2016-12-29

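RxPy: read a csv file and process its lines

I want to use RxPy to open a (csv) file and process it line by line. Specifically, I envision the following steps: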

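  1. Supply a filename to the stream
  2. Open the file
  3. Read the file line by line
  4. Remove comment lines (e.g. lines starting with #)
  5. Apply a CSV reader
  6. Filter records that satisfy some condition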
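The six steps map naturally onto a chain of lazy stages. As a plain-Python sketch (generators, not RxPy), assuming an in-memory sample in place of the file named in step 1 and a hypothetical keep predicate for step 6, each step becomes one small function, and the CSV reader is applied to the already-filtered lines:

```python
import csv
import io

# Assumption: an in-memory sample stands in for the file named in step 1.
SAMPLE = "# comment line\nname,qty\n\napple,3\npear,0\n"

def read_lines(source):
    # Steps 2-3: iterate the open source line by line.
    for line in source:
        yield line

def drop_comments(lines):
    # Step 4: skip lines whose first non-blank character is '#'.
    return (line for line in lines if not line.lstrip().startswith("#"))

def drop_empty(lines):
    # Skip blank lines so csv.reader does not emit empty rows for them.
    return (line for line in lines if line.strip())

def pipeline(source, keep):
    lines = drop_empty(drop_comments(read_lines(source)))
    rows = csv.reader(lines)  # Step 5: csv.reader accepts any iterable of strings.
    return (row for row in rows if keep(row))  # Step 6: hypothetical predicate.

result = list(pipeline(io.StringIO(SAMPLE), keep=lambda row: row[0] != "pear"))
print(result)  # [['name', 'qty'], ['apple', '3']]
```

In RxPy terms each stage corresponds roughly to a filter or map operator; the part worth carrying over is that step 5 receives lines, not a file handle.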

So far I have:

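import csv
from rx import Observable
from rx.disposables import AnonymousDisposable

def to_file(filename):
    f = open(filename)
    # Close the file when the subscription is disposed.
    return Observable.using(
        lambda: AnonymousDisposable(lambda: f.close()),
        lambda d: Observable.just(f)
    )

def to_reader(f):
    return csv.reader(f)

def print_rows(reader):
    for row in reader:
        print(row)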

This works, using map:

(Observable.from_(["filename.csv", "filename2.csv"])
    .flat_map(to_file).map(to_reader).subscribe(print_rows))

This does not, using flat_map instead; it fails with ValueError: I/O operation on closed file:

(Observable.from_(["filename.csv", "filename2.csv"])
    .flat_map(to_file).flat_map(to_rows).subscribe(print))

The second one doesn't work because of the following (see https://github.com/ReactiveX/RxPY/issues/69):

When the observables from the first flatmap is merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.
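The closed-file error can be reproduced without RxPy. The sketch below is an analogy rather than RxPy's actual scheduling: it models the inner using() stream as a generator that emits the file object and closes it when it completes. Draining that stream before reading from the handle, which is roughly what happens when the second flat_map gets to the handle after the inner stream has completed, leaves a closed handle. The temporary file is a hypothetical stand-in for filename.csv.

```python
import os
import tempfile

def emit_handle_then_dispose(path):
    # Analogy for the inner Observable.using(...) stream: emit the file object,
    # then complete, and completion disposes the resource (closes the file).
    f = open(path)
    try:
        yield f
    finally:
        f.close()

# Hypothetical temporary file standing in for filename.csv.
fd, path = tempfile.mkstemp(suffix=".csv")
with os.fdopen(fd, "w") as out:
    out.write("a,b\n1,2\n")

# The inner stream runs to completion before anyone reads from the handle.
handles = list(emit_handle_then_dispose(path))
try:
    handles[0].readline()
    error = None
except ValueError as exc:
    error = str(exc)  # the closed-file ValueError from the question
os.remove(path)
print(error)
```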

Any ideas how I could achieve something like this:

(Observable.from_(["filename.csv", "filename2.csv"])
    .flat_map(to_file)
    .filter(comment_lines)
    .filter(empty_lines)
    .map(to_csv_reader)
    .filter(filter_by..)
    # ... and then do whatever
)
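One way out, following the explanation quoted above, is to never let the file handle leave the stream that owns it: open the file, emit its lines, and close it only after the last line. In RxPy 1.x a likely adaptation (untested here) is to return Observable.from_(f) instead of Observable.just(f) from the observable factory in to_file, so that lines rather than the handle are emitted before the using() disposal runs. The stdlib sketch below shows the same shape with generators, with itertools.chain.from_iterable standing in for flat_map; the file contents and the keep predicate are hypothetical.

```python
import csv
import itertools
import os
import tempfile

opened = []  # records every file object so the sketch can check they get closed

def read_lines(filename):
    # Open and read inside the same stream: the file stays open while lines are
    # being emitted and is closed once the last line has been consumed.
    with open(filename, newline="") as f:
        opened.append(f)
        yield from f

def is_comment(line):
    return line.lstrip().startswith("#")

def process(filenames, keep):
    # chain.from_iterable plays the role of flat_map: one stream of lines from all files.
    lines = itertools.chain.from_iterable(read_lines(name) for name in filenames)
    lines = (line for line in lines if line.strip() and not is_comment(line))
    return [row for row in csv.reader(lines) if keep(row)]

# Two hypothetical temporary files standing in for filename.csv and filename2.csv.
paths = []
for text in ("# header comment\nx,1\ny,2\n", "\nz,3\n# trailing\n"):
    fd, path = tempfile.mkstemp(suffix=".csv")
    with os.fdopen(fd, "w") as out:
        out.write(text)
    paths.append(path)

rows = process(paths, keep=lambda row: int(row[1]) >= 2)
for path in paths:
    os.remove(path)
print(rows)  # [['y', '2'], ['z', '3']]
print(all(f.closed for f in opened))  # True
```

Because each file is closed by the same generator that reads it, no stage downstream ever holds a handle whose lifetime it does not control.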

Many thanks for your help,

Klinsmann

Answer


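I only recently started working with RxPy and needed to do the same thing. I'm surprised nobody has answered your question yet, so I decided to answer in case someone else needs to know. Suppose you have a CSV file like this: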

$ cat datafile.csv 
"iata","airport","city","state","country","lat","long" 
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472 
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778 
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933 
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056 
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389 

Here is one solution:

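from rx import Observable
from csv import DictReader

# Observable.from_ iterates the DictReader, emitting one dict per CSV row.
# The with block closes the file once the (synchronous) subscription finishes.
with open('datafile.csv', 'r', newline='') as f:
    Observable.from_(DictReader(f)) \
        .subscribe(lambda row:
            print("{0:3}\t{1:<35}".format(row['iata'], row['airport'][:35]))
        )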
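This covers reading and parsing but not the question's last step, filtering records. Since DictReader yields plain dicts whose values are all strings, that step is an ordinary predicate that would go into a .filter(...) before .subscribe(...). A stdlib-only sketch of the predicate on the sample rows above; the function name north_of and the latitude threshold are hypothetical examples:

```python
import csv
import io

# The sample rows from datafile.csv above, inlined so the sketch runs on its own.
DATA = '''"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389
'''

def north_of(text, min_lat):
    reader = csv.DictReader(io.StringIO(text))
    # DictReader returns every field as a string, so convert before comparing.
    return [row["iata"] for row in reader if float(row["lat"]) >= min_lat]

print(north_of(DATA, 35.0))  # ['00V', '01G']
```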