pyspark + psycopg2: writing results to the database is slow

I have a Spark job that processes the data very quickly, but when it tries to write the results to a PostgreSQL database it is very slow. Here is most of the relevant code:
import psycopg2
import psycopg2.extras

def save_df_to_db(records):
    # each item in records is a dictionary with 'url', 'tag', 'value' as keys
    db_conn = psycopg2.connect(connect_string)
    db_conn.autocommit = True
    cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    upsert_query = """INSERT INTO mytable (url, tag, value)
                      VALUES (%(url)s, %(tag)s, %(value)s)
                      ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s"""
    try:
        cur.executemany(upsert_query, records)
    except Exception as e:
        print("Error in executing save_df_to_db: %s" % e)

data = [...]  # initial data
rdd = sc.parallelize(data)
rdd = ...  # Some simple RDD transforms...
rdd.foreachPartition(save_df_to_db)
The table also has a unique constraint on url + tag. I am looking for ways to make this code faster. Any suggestions are welcome.
As of this point Psycopg 2.7, which provides execute_values, is still in beta. For now, use a solution that works with 2.6: http://stackoverflow.com/a/30985541/131874
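For reference, a minimal sketch (not the asker's code) of what an execute_values-based upsert could look like once psycopg2 2.7+ is available. It reuses the question's table and column names (mytable, url, tag, value) and assumes connect_string is defined as in the question; the helper name save_partition is made up. execute_values folds many rows into a single multi-row INSERT per round trip, which is typically why it is much faster than executemany:

import psycopg2
import psycopg2.extras

def save_partition(records):
    # hypothetical replacement for save_df_to_db, one call per Spark partition
    db_conn = psycopg2.connect(connect_string)  # connect_string as in the question
    cur = db_conn.cursor()
    upsert_query = """
        INSERT INTO mytable (url, tag, value)
        VALUES %s
        ON CONFLICT (url, tag) DO UPDATE SET value = EXCLUDED.value
    """
    rows = [(r['url'], r['tag'], r['value']) for r in records]
    # execute_values expands the single %s into a multi-row VALUES list and
    # sends the rows in batches of page_size, instead of one INSERT per record
    psycopg2.extras.execute_values(cur, upsert_query, rows, page_size=1000)
    db_conn.commit()  # commit once per partition instead of autocommitting each statement
    cur.close()
    db_conn.close()

rdd.foreachPartition(save_partition)

For psycopg2 2.6, the linked answer achieves a similar effect by hand-building one multi-row INSERT statement (e.g. with cur.mogrify) rather than calling executemany.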