2017-02-14 331 views
0

我有一個Spark作業,它可以非常快速地處理數據,但是當它試圖將結果寫入postgresql數據庫時,它非常緩慢。以下是大部分相關代碼:pyspark + psycopg2將結果寫入數據庫的速度很慢

import psycopg2 

def save_df_to_db(records): 
    # each item in record is a dictionary with 'url', 'tag', 'value' as keys 
    db_conn = psycopg2.connect(connect_string) 
    db_conn.autocommit = True 
    cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) 
    upsert_query = """INSERT INTO mytable (url, tag, value) 
         VALUES (%(url)s, %(tag)s, %(value)s) ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s""" 

    try: 
     cursor.executemany(upsert_query, records) 
    except Exception as e: 
     print "Error in executing save_df_to_db: ", e.message 

data = [...] # initial data 
rdd = sc.parallelize(data) 
rdd = ... # Some simple RDD transforms... 
rdd.foreachPartition(save_df_to_db) 

該表還對url +標記是唯一的約束。我正在尋找解決方案來提高此代碼的速度。任何建議或建議是受歡迎的。

+0

作爲此時Psycopg 2.7,它提供'execute_values',仍處於測試階段。現在使用適合2.6的解決方案:http://stackoverflow.com/a/30985541/131874 –

回答

0

感謝您的回覆。由於我使用的psycopg2版本不支持批處理執行,因此我不得不依賴使用copy命令的稍微不同的方法。我寫下了一個小功能,幫助將保存時間從20分鐘縮短到30秒左右。這是功能。它採用熊貓數據幀作爲輸入,並將其寫入一個表(CURSO):

import StringIO 
import pandas as pd 

def write_dataframe_to_table(cursor, table, dataframe, batch_size=100, null='None'): 
    """ 
    Write a pandas dataframe into a postgres table. 
    It only works if the table columns have the same name as the dataframe columns. 
    :param cursor: the psycopg2 cursor object 
    :param table: the table name 
    :param dataframe: the dataframe 
    :param batch_size: batch size 
    :param null: textual representation of NULL in the file. The default is the string None. 
    """ 
    for i in range(0, len(dataframe), batch_size): 
     chunk_df = dataframe[i: batch_size + i] 
     content = "\n".join(chunk_df.apply(lambda x: "\t".join(map(str, x)), axis=1)) 
     cursor.copy_from(StringIO.StringIO(content), table, columns=list(chunk_df.columns), null=null) 
1

我相信主要的瓶頸是cursor.executemanyconnection.autocommit的組合。因爲它是executemany

官方文檔中解釋在目前的實現這個方法是不是比執行漢在一個循環中執行​​更快。

既然你把它和connection.autocommit結合起來,你可以在每次插入後有效地提交。

Psycopg提供fast execution helpers

可以被用於執行批處理操作

。手動處理提交也更有意義。

另外還有可能會使用大量的併發寫入和索引更新來限制數據庫服務器。通常我會建議寫入磁盤並使用COPY執行批量導入,但不能保證在此處提供幫助。

由於您使用不帶時間戳的可變記錄,因此您不能只刪除索引並在導入後重新創建它作爲提高性能的另一種方式。