2016-11-11 64 views
0

我創建了一個腳本來將數據從Dynamo遷移到Mysql數據庫。 首先我沒有使用異步,但我開始在SQL方面遇到瓶頸,所以我決定使用異步庫「節流」dymano部分。 問題:我在路徑中間有一個遞歸,只要發電機有數據我必須繼續這個過程(超簡單的ETL),但我不知道如何在瀑布內執行遞歸。 我的代碼:通過nodejs中的瀑布和遞歸使用異步

function main() { 
    async.waterfall([getMaxTimestamp, scanDynamoDB, printout, saveToMySQL], function(err, result) { 
     if(err) console.log(err) 
     console.log(result) 
    }); 
} 

function getMaxTimestamp(callback) { 
    console.time("max query"); 
    connection.query("SELECT MAX(created_at) as start_date from Tracking;", function(err, data) { 
     console.timeEnd("max query"); 
     callback(err, data); 
    }) 
} 

function scanDynamoDB(data, callback) { 
    if (data[0].start_date != null && data[0].start_date) 
     query.ExpressionAttributeValues[':v_ca'].N = data[0].start_date; 

    console.time("dynamo read"); 
    dynamoDB.scan(query, function(err, data) { 
     console.timeEnd("dynamo read"); 
     callback(err, data); 
     // if (!err) { 
     //  if (data != undefined && data.Count > 0) { 
     //   printout(data.Items) // Print out the subset of results. 
     //   if (data.LastEvaluatedKey) { // Result is incomplete; there is more to come. 
     //    query.ExclusiveStartKey = data.LastEvaluatedKey; 
     //    scanDynamoDB(query); 
     //   } 
     //  } else { 
     //   console.log('No fresh data found on Dynamo') 
     // } else console.dir(err); 
    }); 
}; 

function assembleSql() { 
    insertSql = "insert into Tracking ("; 
    for (var i = 0; i < headers.length; i++) { 
     insertSql += headers[i]; 
     if (i < headers.length - 1) 
      insertSql += ","; 
    } 

    insertSql += ") values ?;" 
    previousInsertSql = insertSql; 
} 

function saveToMySQL(items, callback) { 
    assembleSql(); 
    //connection.connect(); 
    console.time("insert sql") 
    connection.query(insertSql, [items], function(err, result) { 
     console.timeEnd("insert sql") 
     if (err){ 
      callback(err, null) 
      return; 
     } 

     totalInserts += result.affectedRows; 
     callback(err, totalInserts) 
     //connection.end(); 
    }) 
} 

function printout(items, callback) { 
    var headersMap = {}; 
    var values; 
    var header; 
    var value; 

    var out = []; 

    if (headers.length == 0) { 
     if (items.length > 0) { 
      for (var i = 0; i < items.length; i++) { 
       for (var key in items[i]) { 
        headersMap[key] = true; 
       } 
      } 
     } 
     for (var key in headersMap) { 
      headers.push(key); 
     } 
    } 

    for (index in items) { 
     values = []; 
     for (i = 0; i < headers.length; i++) { 
      value = ""; 
      header = headers[i]; 
      // Loop through the header rows, adding values if they exist 
      if (items[index].hasOwnProperty(header)) { 
       if (items[index][header].N) { 
        value = items[index][header].N; 
       } else if (items[index][header].S) { 
        value = items[index][header].S; 
       } else if (items[index][header].SS) { 
        value = items[index][header].SS.toString(); 
       } else if (items[index][header].NS) { 
        value = items[index][header].NS.toString(); 
       } else if (items[index][header].B) { 
        value = items[index][header].B.toString('base64'); 
       } else if (items[index][header].M) { 
        value = JSON.stringify(items[index][header].M); 
       } else if (items[index][header].L) { 
        value = JSON.stringify(items[index][header].L); 
       } else if (items[index][header].BOOL !== undefined) { 
        value = items[index][header].BOOL.toString(); 
       } 
      } 
      values.push(value) 
     } 
     out.push(values) 
    } 
    callback(null, out); 
} 
main(); 

註釋的部分就是遞歸發生,但我不知道在哪裏放置這是我流的內!

任何幫助,將不勝感激!

回答

0

其實我能夠通過自己看着辦吧。

async.whilst(function() { return canInsert}, function (callback){ 
      scanDynamoDB(query, callback) 
     }, function(err, res) {} 
function scanDynamoDB(data, callback) { 
    console.time("dynamo read"); 

    dynamoDB.scan(query, function(err, data) { 
     console.timeEnd("dynamo read"); 
     if (!err) { 
      if (data != undefined && data.Count > 0) { 
       canInsert = data.LastEvaluatedKey; 
       if (data.LastEvaluatedKey) // Result is incomplete; there is more to come. 
        query.ExclusiveStartKey = data.LastEvaluatedKey; 
      } 
     } else console.dir(err); 
    }); 
}; 

我可能只是一個while(canInsert)做到了。無論如何,我避免遞歸和內存使用方式較低。

0

在獲取數據時,請不要在scanDynamoDB內調用回調函數。您可以實現附加功能和遞歸錯誤時不出現調用它,就像下面

function scanDynamoDB(data, callback) { 
    if (data[0].start_date != null && data[0].start_date) 
     query.ExpressionAttributeValues[':v_ca'].N = data[0].start_date; 

    console.time("dynamo read"); 

    var result = []; // for accumulate data of each query 

    function readNext(err, data) { 
     if (err) 
      return callback(err); 

     if (!data || !data.Count) 
      return callback(null, result); 

     // add data to result 

     dynamoDB.scan(query, readNext); 
    } 

    dynamoDB.scan(query, readNext); 
}; 
+0

我在這裏可能是錯的,但通過這樣做,我會切斷瀑布鏈,對吧?我的意思是,一旦我讀完所有的數據,我將繼續保存到MySQL,是這樣嗎?並感謝您的幫助! – Leonardo

+0

是的,瀑布會等待所有數據。如果你需要部分過程數據,那麼'readNext'必須做到這一點。 –

+0

嗯,那會怎麼樣?使用readNext?有點新異步 – Leonardo