2016-07-15

I am using pandas to read very large csv files, which are also gzipped. I decompress them into csv files of roughly 30-50GB each. I read the files in chunks and process/manipulate them, and finally I append the relevant data to an HDF5 file that I compress. I chunk the files rather than buying enough memory to hold a 30-50GB file in one go.

It works fine, but it is slow because I have to process one file per day and there are several years of data (600TB of uncompressed CSV).

Would buying more memory (64GB/128GB) be a good way to avoid chunking and speed up the HDF5 process, or would that just make pandas slow and clunky? Am I right in thinking that switching to C++ would speed things up, but that I would still be stuck with the read step and with processing the data in chunks? Finally, does anyone have any ideas on the best way to approach this?

By the way, once the job is done I will not have to go back over the data, so I just want it to run in a reasonable time. Writing something that processes in parallel might be good, but my limited experience in that area means it would take me a while to build, so I would rather not unless it is the only option.

Update: I think it will be easier to show the code. In any case, I do not believe the code itself is particularly slow; I think the technique/approach may be.

import os
import gzip
import datetime

import pandas as pd


def txttohdf(path, contract): 
    #create dataframes for trade and quote 
    dftrade = pd.DataFrame(columns = ["datetime", "Price", "Volume"]) 
    dfquote = pd.DataFrame(columns = ["datetime", "BidPrice", "BidSize","AskPrice", "AskSize"]) 
    #create an hdf5 file with high compression and table so we can append 
    hdf = pd.HDFStore(path + contract + '.h5', complevel=9, complib='blosc') 
    hdf.put('trade', dftrade, format='table', data_columns=True) 
    hdf.put('quote', dfquote, format='table', data_columns=True) 
    #date1 = date(start).strftime('%Y%m%d') 
    #date2 = date(end).strftime('%Y%m%d') 
    #dd = [date1 + timedelta(days=x) for x in range((date2-date1).days + 1)] 
    #walkthrough directories 
    for subdir, dirs, files in os.walk(path): 
     for file in files: 
      #check if contract has name 
      #print(file) 
       #create filename from directory and file 

      filename = os.path.join(subdir, file) 
       #read in csv 
      if filename.endswith('.gz'): 

       df = pd.read_csv(gzip.open(filename),header=0,iterator=True,chunksize = 10000, low_memory =False, names = ['RIC','Date','Time','GMTOffset','Type','ExCntrbID','LOC','Price','Volume','MarketVWAP','BuyerID','BidPrice','BidSize','NoBuyers','SellerID','AskPrice','AskSize','NoSellers','Qualifiers','SeqNo','ExchTime','BlockTrd','FloorTrd','PERatio','Yield','NewPrice','NewVol','NewSeqNo','BidYld','AskYld','ISMABidYld','ISMAAskYld','Duration','ModDurtn','BPV','AccInt','Convexity','BenchSpd','SwpSpd','AsstSwpSpd','SwapPoint','BasePrice','UpLimPrice','LoLimPrice','TheoPrice','StockPrice','ConvParity','Premium','BidImpVol','AskImpVol','ImpVol','PrimAct','SecAct','GenVal1','GenVal2','GenVal3','GenVal4','GenVal5','Crack','Top','FreightPr','1MnPft','3MnPft','PrYrPft','1YrPft','3YrPft','5YrPft','10YrPft','Repurch','Offer','Kest','CapGain','Actual','Prior','Revised','Forecast','FrcstHigh','FrcstLow','NoFrcts','TrdQteDate','QuoteTime','BidTic','TickDir','DivCode','AdjClose','PrcTTEFlag','IrgTTEFlag','PrcSubMktId','IrgSubMktId','FinStatus','DivExDate','DivPayDate','DivAmt','Open','High','Low','Last','OpenYld','HighYld','LowYld','ShortPrice','ShortVol','ShortTrdVol','ShortTurnnover','ShortWeighting','ShortLimit','AccVolume','Turnover','ImputedCls','ChangeType','OldValue','NewValue','Volatility','Strike','Premium','AucPrice','Auc Vol','MidPrice','FinEvalPrice','ProvEvalPrice','AdvancingIssues','DecliningIssues','UnchangedIssues','TotalIssues','AdvancingVolume','DecliningVolume','UnchangedVolume','TotalVolume','NewHighs','NewLows','TotalMoves','PercentageChange','AdvancingMoves','DecliningMoves','UnchangedMoves','StrongMarket','WeakMarket','ChangedMarket','MarketVolatility','OriginalDate','LoanAskVolume','LoanAskAmountTradingPrice','PercentageShortVolumeTradedVolume','PercentageShortPriceTradedPrice','ForecastNAV','PreviousDaysNAV','FinalNAV','30DayATMIVCall','60DayATMIVCall','90DayATMIVCall','30DayATMIVPut','60DayATMIVPut','90DayATMIVPut','BackgroundReference','DataSource','BidSpread','AskSpread','ContractPhysicalUnits','Miniumumquantity','NumberPhysicals','ClosingReferencePrice','ImbalanceQuantity','FarClearingPrice','NearClearingPrice','OptionAdjustedSpread','ZSpread','ConvexityPremium','ConvexityRatio','PercentageDailyReturn','InterpolatedCDSBasis','InterpolatedCDSSpread','ClosesttoMaturityCDSBasis','SettlementDate','EquityPrice','Parity','CreditSpread','Delta','InputVolatility','ImpliedVolatility','FairPrice','BondFloor','Edge','YTW','YTB','SimpleMargin','DiscountMargin','12MonthsEPS','UpperTradingLimit','LowerTradingLimit','AmountOutstanding','IssuePrice','GSpread','MiscValue','MiscValueDescription']) 
       #parse date time this is quicker than doing it while we read it in 
       for chunk in df: 
        chunk['datetime'] = chunk.apply(lambda row: datetime.datetime.strptime(row['Date']+ ':' + row['Time'],'%d-%b-%Y:%H:%M:%S.%f'), axis=1) 
        #df = df[~df.comment.str.contains('ALIAS')] 
       #drop uneeded columns inc date and time 
        chunk = chunk.drop(['Date','Time','GMTOffset','ExCntrbID','LOC','MarketVWAP','BuyerID','NoBuyers','SellerID','NoSellers','Qualifiers','SeqNo','ExchTime','BlockTrd','FloorTrd','PERatio','Yield','NewPrice','NewVol','NewSeqNo','BidYld','AskYld','ISMABidYld','ISMAAskYld','Duration','ModDurtn','BPV','AccInt','Convexity','BenchSpd','SwpSpd','AsstSwpSpd','SwapPoint','BasePrice','UpLimPrice','LoLimPrice','TheoPrice','StockPrice','ConvParity','Premium','BidImpVol','AskImpVol','ImpVol','PrimAct','SecAct','GenVal1','GenVal2','GenVal3','GenVal4','GenVal5','Crack','Top','FreightPr','1MnPft','3MnPft','PrYrPft','1YrPft','3YrPft','5YrPft','10YrPft','Repurch','Offer','Kest','CapGain','Actual','Prior','Revised','Forecast','FrcstHigh','FrcstLow','NoFrcts','TrdQteDate','QuoteTime','BidTic','TickDir','DivCode','AdjClose','PrcTTEFlag','IrgTTEFlag','PrcSubMktId','IrgSubMktId','FinStatus','DivExDate','DivPayDate','DivAmt','Open','High','Low','Last','OpenYld','HighYld','LowYld','ShortPrice','ShortVol','ShortTrdVol','ShortTurnnover','ShortWeighting','ShortLimit','AccVolume','Turnover','ImputedCls','ChangeType','OldValue','NewValue','Volatility','Strike','Premium','AucPrice','Auc Vol','MidPrice','FinEvalPrice','ProvEvalPrice','AdvancingIssues','DecliningIssues','UnchangedIssues','TotalIssues','AdvancingVolume','DecliningVolume','UnchangedVolume','TotalVolume','NewHighs','NewLows','TotalMoves','PercentageChange','AdvancingMoves','DecliningMoves','UnchangedMoves','StrongMarket','WeakMarket','ChangedMarket','MarketVolatility','OriginalDate','LoanAskVolume','LoanAskAmountTradingPrice','PercentageShortVolumeTradedVolume','PercentageShortPriceTradedPrice','ForecastNAV','PreviousDaysNAV','FinalNAV','30DayATMIVCall','60DayATMIVCall','90DayATMIVCall','30DayATMIVPut','60DayATMIVPut','90DayATMIVPut','BackgroundReference','DataSource','BidSpread','AskSpread','ContractPhysicalUnits','Miniumumquantity','NumberPhysicals','ClosingReferencePrice','ImbalanceQuantity','FarClearingPrice','NearClearingPrice','OptionAdjustedSpread','ZSpread','ConvexityPremium','ConvexityRatio','PercentageDailyReturn','InterpolatedCDSBasis','InterpolatedCDSSpread','ClosesttoMaturityCDSBasis','SettlementDate','EquityPrice','Parity','CreditSpread','Delta','InputVolatility','ImpliedVolatility','FairPrice','BondFloor','Edge','YTW','YTB','SimpleMargin','DiscountMargin','12MonthsEPS','UpperTradingLimit','LowerTradingLimit','AmountOutstanding','IssuePrice','GSpread','MiscValue','MiscValueDescription'], axis=1) 
       # convert to datetime explicitly and add nanoseconds to same time stamps 
        chunk['datetime'] = pd.to_datetime(chunk.datetime) 
       #nanoseconds = df.groupby(['datetime']).cumcount() 
       #df['datetime'] += np.array(nanoseconds, dtype='m8[ns]') 
       # drop empty prints and make sure all prices are valid 
        dfRic = chunk[(chunk["RIC"] == contract)] 
        if len(dfRic)>0: 
         print(dfRic) 
        #~chunk.empty is always truthy; test the filtered frame instead
        if not dfRic.empty: 
         dft = dfRic[(dfRic["Type"] == "Trade")] 
         dft.dropna(subset = ["Volume"], inplace =True) 
         dft = dft.drop(["RIC","Type","BidPrice", "BidSize", "AskPrice", "AskSize"], axis=1) 
         dft = dft[(dft["Price"] > 0)] 

        # clean up bid and ask 
         dfq = dfRic[(dfRic["Type"] == "Quote")] 
         dfq.dropna(how = 'all', subset = ["BidSize","AskSize"], inplace =True) 
         dfq = dfq.drop(["RIC","Type","Price", "Volume"], axis=1) 
         dfq = dfq[(dfq["BidSize"] > 0) | (dfq["AskSize"] > 0)] 
         dfq = dfq.ffill() 
         #append to the HDF5 store only when dft/dfq were built for this chunk
         hdf.append('trade', dft, format='table', data_columns=True) 
         hdf.append('quote', dfq, format='table', data_columns=True) 
        else: 
         print("Empty")  
    #close the store once the directory walk has finished
    hdf.close() 

Can you explain what is slow and why it is slow? Without more detail it is hard to guess what would help speed this up. –


You should try profiling and measuring your program's performance to work out which parts are slowest and whether memory or CPU is the limiting factor. That will help narrow down which specific changes will help you. You could then also post the slowest parts of the source code as a question on http://codereview.stackexchange.com/ and ask for suggestions on improving their performance. – gfv
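
A minimal sketch of that profiling step using the standard library's cProfile/pstats (the path and contract are placeholder arguments for the txttohdf function from the question):

import cProfile
import pstats

# profile one representative call to the conversion function
cProfile.run("txttohdf('/data/ticks/', 'SOMECONTRACT')", 'txttohdf.prof')

# list the 20 most expensive calls by cumulative time
stats = pstats.Stats('txttohdf.prof')
stats.sort_stats('cumulative').print_stats(20)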


I would try reading the compressed CSVs in chunks rather than decompressing them first - that way you should have less IO (usually one of the slowest parts). Apart from that, having more memory should let you use bigger chunks, or even skip chunking entirely if your RAM is roughly twice the size of the resulting DF. Parallel processing on the same server/machine (if you mean Dask) would only make the overhead worse. If you need real power, look at Apache PySpark SQL, but that would mean a bigger investment in a Hadoop cluster - just my 2 cents... – MaxU
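
A minimal sketch of that first point - streaming the gzipped file straight into pandas in chunks - assuming a placeholder file name and a placeholder process() helper for the per-chunk work:

import pandas as pd

# pandas decompresses gzip on the fly, so the file never needs unpacking to disk
reader = pd.read_csv('ticks_20160715.csv.gz', compression='gzip', chunksize=10**5)

for chunk in reader:
    process(chunk)  # stand-in for the per-chunk trade/quote handling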

Answer


I think there are quite a few things you could optimize:

  • First, read only the columns you actually need instead of reading everything and then dropping columns - use the usecols=list_of_needed_columns parameter

  • Increase the chunksize - experiment with different values - I would start with 10**5

  • Don't use chunk.apply(...) for the datetime conversion - it is very slow - use pd.to_datetime(column, format='...') instead

  • You can filter your data more efficiently by combining multiple conditions in one step instead of applying them one by one, as in the sketch below:
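
A rough sketch of how those points could look when applied to the loop from the question (the column subset and the format string are illustrative, not a drop-in replacement):

needed_cols = ['RIC', 'Date', 'Time', 'Type', 'Price', 'Volume',
               'BidPrice', 'BidSize', 'AskPrice', 'AskSize']

reader = pd.read_csv(gzip.open(filename), header=0,
                     usecols=needed_cols,   # read only the needed columns
                     chunksize=10**5)       # larger chunks, fewer iterations

for chunk in reader:
    # vectorised datetime parsing instead of chunk.apply(...)
    chunk['datetime'] = pd.to_datetime(chunk['Date'] + ':' + chunk['Time'],
                                       format='%d-%b-%Y:%H:%M:%S.%f')
    chunk = chunk.drop(['Date', 'Time'], axis=1)

    # several filter conditions combined in a single step
    dft = chunk[(chunk['RIC'] == contract) &
                (chunk['Type'] == 'Trade') &
                (chunk['Price'] > 0) &
                chunk['Volume'].notnull()]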


That's great, I will apply the changes - you are absolutely right about apply, I had forgotten about that. – azuric