2017-08-02 86 views
0

我在單進程和多進程中嘗試了批量插入,但他們使用同一時間。我沒有得到任何改善。 cassandra的keyspace是SimpleStrategy,我認爲它只有一個節點。做這些影響嗎?python多處理bach在Cassandra中插入,性能沒有提高

這是我的多處理代碼,你能幫我找到哪裏出錯嗎?

lock = Lock() 
ID = Value('i', 0) 

def copy(x): 

    cluster = Cluster() 
    session = cluster.connect('test') 
    global lock, row_ID 
    count = 0 

    insertt = session.prepare("INSERT INTO table2(id, age, gender, name) values(?, ?, ?, ?)") 
    batch = BatchStatement() 

    for i in x: 
     with open(files[i]) as csvfile: 
      reader = csv.reader(csvfile, delimiter=',') 
      for row in tqdm(reader): 
       if count <= 59: 
        with lock: 
         ID.value += 1 
        name_ID = row[1] 
        gender_ID = row[2] 
        age_ID = int(row[3]) 
        batch.add(insertt, (ID.value, age_ID, gender_ID, name_ID)) 
        count += 1 
       else: 
        count = 0 
        with lock: 
         ID.value += 1 
        name_ID = row[1] 
        gender_ID = row[2] 
        age_ID = int(row[3]) 
        batch.add(insertt, (ID.value, age_ID, gender_ID, name_ID)) 
        session.execute(batch) 
        batch = BatchStatement() 

if __name__ == '__main__': 
    start = time.time() 
    with Pool() as p: 
     p.map(copy, [range(0,6),range(6,12),range(12,18),range(18,24)]) 
     end = time.time() 
     t = end - start 
     print(t) 

回答

1

批次不是爲了提高性能,而是真的相反。特別記錄的批次(您在此使用的)是正常寫入成本的兩倍多。未記錄的批次可能會略微提高性能如果批次中的所有數據都屬於同一分區。

在這個特定的例子中,你的吞吐量也會受限於你的csv閱讀器可以從磁盤中獲取多快。由於其阻塞可能是吞吐量的主要影響之一。您也可以使用executeAsync,以便在完成前一個批處理時不阻塞下一個批處理的構建(儘管在這裏再次使用批處理)。

+0

謝謝,我是卡桑德拉的新手。我比較了在一個過程中使用批處理(記錄)和不使用批處理所消耗的時間,它實際上節省了大約4/5時間。但是當我添加多處理時,它具有相同的性能。所以另一個問題是「我是否正確使用多處理」?你能否給我一些相關的教程鏈接? – Oak

+0

如果可能,你能直接糾正我的代碼嗎?提前致謝。 – Oak

+0

如果您從執行更改爲executeasync,您將看到更大的差異。您看到批處理性能提高的原因是因爲吞吐量在使用同步請求時被延遲阻塞。通過減少請求的數量,減少阻塞延遲(littles law fwiw)。但是這個請求實際上更加昂貴且速度更慢。 –