2015-11-20 111 views
5

我有74個相對較大的熊貓數據框(大約34,600行和8列),我試圖儘可能快地插入到SQL Server數據庫中。在做了一些研究之後,我瞭解到這個功能對於插入SQL Server數據庫的這種大型數據庫並不好,這是我採用的最初方法(非常慢 - 應用程序完成大約需要一個小時,大約4分鐘。使用MySQL數據庫時)將大熊貓數據框寫入SQL Server數據庫

This article,和許多其他StackOverflow的帖子一直在指着我在正確的方向有幫助的,但是我已經打了一個路障:

我嘗試使用SQLAlchemy的核心,而不是ORM的原因在上面的鏈接中解釋。所以,我的數據幀轉換爲字典,使用pandas.to_dict,然後做一個​​和insert()

self._session_factory.engine.execute(
    TimeSeriesResultValues.__table__.insert(), 
    data) 
# 'data' is a list of dictionaries. 

的問題是,插入沒有得到任何價值 - 他們似乎是一堆空括號和我的得到這個錯誤:

(pyodbc.IntegretyError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot 
insert the value NULL into the column... 

我在傳遞的字典列表中有值,所以我不明白爲什麼值沒有顯示出來。

編輯:

這裏是我要去關的例子:

def test_sqlalchemy_core(n=100000): 
    init_sqlalchemy() 
    t0 = time.time() 
    engine.execute(
     Customer.__table__.insert(), 
     [{"name": 'NAME ' + str(i)} for i in range(n)] 
    ) 
    print("SQLAlchemy Core: Total time for " + str(n) + 
     " records " + str(time.time() - t0) + " secs") 
+0

*約4分鐘,同時使用mysql數據庫* ...這樣的'to_sql()'是一個可行的解決方案時,只是連接MSSQL比MySQL更慢?你正在使用哪種ODBC API?數據庫服務器是本地還是遠程?考慮臨時表導入,然後遷移到最終表。 – Parfait

+0

@Parfait:使用'''to_sql()''可以在MySQL中產生可以接受的性能,但不會產生MSSQL。我正在使用pyodbc。數據庫是遠程的,因此寫入CSV文件然後通過原始的sql代碼進行批量插入在這種情況下也不會真正起作用。此外,用戶需要批量管理權限才能這樣做,這對於此應用程序的用戶來說並不總是可能的。 – denvaar

+1

考慮繞過odbc驅動程序並嚴格使用Python API - [pmyssl](http://www.pymssql.org/en/latest/)和MySQL ODBC API? pymysql?兩者中的表結構和數據類型相同?相同數量的記錄?真的調查這一點。兩者都是高級企業RDMS,不應該執行那麼廣的範圍(4分鐘比~60分鐘)。 – Parfait

回答

7

我有一個壞消息給你,其實SQLAlchemy的未實現的SQL Server批量導入,這是實際上只是做與to_sql正在做的相同的緩慢的單個INSERT語句。我會說你最好的選擇是嘗試使用bcp命令行工具編寫腳本。下面是我在過去使用的腳本,但不保證:

from subprocess import check_output, call 
import pandas as pd 
import numpy as np 
import os 

pad = 0.1 
tablename = 'sandbox.max.pybcp_test' 
overwrite=True 
raise_exception = True 
server = 'P01' 
trusted_connection= True 
username=None 
password=None 
delimiter='|' 
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False) 



def get_column_def_sql(col): 
    if col.dtype == object: 
     width = col.str.len().max() * (1+pad) 
     return '[{}] varchar({})'.format(col.name, int(width)) 
    elif np.issubdtype(col.dtype, float): 
     return'[{}] float'.format(col.name) 
    elif np.issubdtype(col.dtype, int): 
     return '[{}] int'.format(col.name) 
    else: 
     if raise_exception: 
     raise NotImplementedError('data type {} not implemented'.format(col.dtype)) 
     else: 
     print('Warning: cast column {} as varchar; data type {} not implemented'.format(col, col.dtype)) 
     width = col.str.len().max() * (1+pad) 
     return '[{}] varchar({})'.format(col.name, int(width)) 

def create_table(df, tablename, server, trusted_connection, username, password, pad):   
    if trusted_connection: 
     login_string = '-E' 
    else: 
     login_string = '-U {} -P {}'.format(username, password) 

    col_defs = [] 
    for col in df: 
     col_defs += [get_column_def_sql(df[col])] 

    query_string = 'CREATE TABLE {}\n({})\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))  
    if overwrite == True: 
     query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};".format(tablename, tablename) + query_string 


    query_file = 'c:\\pybcp_tempqueryfile.sql' 
    with open (query_file,'w') as f: 
     f.write(query_string) 

    if trusted_connection: 
     login_string = '-E' 
    else: 
     login_string = '-U {} -P {}'.format(username, password) 

    o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True) 
    if o != 0: 
     raise BaseException("Failed to create table") 
    # o = call('del {}'.format(query_file), shell=True) 


def call_bcp(df, tablename): 
    if trusted_connection: 
     login_string = '-T' 
    else: 
     login_string = '-U {} -P {}'.format(username, password) 
    temp_file = 'c:\\pybcp_tempqueryfile.csv' 

    #remove the delimiter and change the encoding of the data frame to latin so sql server can read it 
    df.loc[:,df.dtypes == object] = df.loc[:,df.dtypes == object].apply(lambda col: col.str.replace(delimiter,'').str.encode('latin')) 
    df.to_csv(temp_file, index = False, sep = '|', errors='ignore') 
    o = call('bcp sandbox.max.pybcp_test2 in c:\pybcp_tempqueryfile.csv -S "localhost" -T -t^| -r\n -c') 
+0

感謝您的迴應 - 我不知道是否涉及創建文件的任何內容都適用於這種特殊情況。 – denvaar

+0

關於爲什麼不支持它,你有更多的信息嗎? – denvaar