我創建了一個腳本來將數據從Dynamo遷移到Mysql數據庫。 首先我沒有使用異步,但我開始在SQL方面遇到瓶頸,所以我決定使用異步庫「節流」dymano部分。 問題:我在路徑中間有一個遞歸,只要發電機有數據我必須繼續這個過程(超簡單的ETL),但我不知道如何在瀑布內執行遞歸。 我的代碼:通過nodejs中的瀑布和遞歸使用異步
function main() {
async.waterfall([getMaxTimestamp, scanDynamoDB, printout, saveToMySQL], function(err, result) {
if(err) console.log(err)
console.log(result)
});
}
function getMaxTimestamp(callback) {
console.time("max query");
connection.query("SELECT MAX(created_at) as start_date from Tracking;", function(err, data) {
console.timeEnd("max query");
callback(err, data);
})
}
function scanDynamoDB(data, callback) {
if (data[0].start_date != null && data[0].start_date)
query.ExpressionAttributeValues[':v_ca'].N = data[0].start_date;
console.time("dynamo read");
dynamoDB.scan(query, function(err, data) {
console.timeEnd("dynamo read");
callback(err, data);
// if (!err) {
// if (data != undefined && data.Count > 0) {
// printout(data.Items) // Print out the subset of results.
// if (data.LastEvaluatedKey) { // Result is incomplete; there is more to come.
// query.ExclusiveStartKey = data.LastEvaluatedKey;
// scanDynamoDB(query);
// }
// } else {
// console.log('No fresh data found on Dynamo')
// } else console.dir(err);
});
};
function assembleSql() {
insertSql = "insert into Tracking (";
for (var i = 0; i < headers.length; i++) {
insertSql += headers[i];
if (i < headers.length - 1)
insertSql += ",";
}
insertSql += ") values ?;"
previousInsertSql = insertSql;
}
function saveToMySQL(items, callback) {
assembleSql();
//connection.connect();
console.time("insert sql")
connection.query(insertSql, [items], function(err, result) {
console.timeEnd("insert sql")
if (err){
callback(err, null)
return;
}
totalInserts += result.affectedRows;
callback(err, totalInserts)
//connection.end();
})
}
function printout(items, callback) {
var headersMap = {};
var values;
var header;
var value;
var out = [];
if (headers.length == 0) {
if (items.length > 0) {
for (var i = 0; i < items.length; i++) {
for (var key in items[i]) {
headersMap[key] = true;
}
}
}
for (var key in headersMap) {
headers.push(key);
}
}
for (index in items) {
values = [];
for (i = 0; i < headers.length; i++) {
value = "";
header = headers[i];
// Loop through the header rows, adding values if they exist
if (items[index].hasOwnProperty(header)) {
if (items[index][header].N) {
value = items[index][header].N;
} else if (items[index][header].S) {
value = items[index][header].S;
} else if (items[index][header].SS) {
value = items[index][header].SS.toString();
} else if (items[index][header].NS) {
value = items[index][header].NS.toString();
} else if (items[index][header].B) {
value = items[index][header].B.toString('base64');
} else if (items[index][header].M) {
value = JSON.stringify(items[index][header].M);
} else if (items[index][header].L) {
value = JSON.stringify(items[index][header].L);
} else if (items[index][header].BOOL !== undefined) {
value = items[index][header].BOOL.toString();
}
}
values.push(value)
}
out.push(values)
}
callback(null, out);
}
main();
註釋的部分就是遞歸發生,但我不知道在哪裏放置這是我流的內!
任何幫助,將不勝感激!
我在這裏可能是錯的,但通過這樣做,我會切斷瀑布鏈,對吧?我的意思是,一旦我讀完所有的數據,我將繼續保存到MySQL,是這樣嗎?並感謝您的幫助! – Leonardo
是的,瀑布會等待所有數據。如果你需要部分過程數據,那麼'readNext'必須做到這一點。 –
嗯,那會怎麼樣?使用readNext?有點新異步 – Leonardo