2017-09-13 114 views
1

我試圖使用node.js將csv文件導入neo4j。我必須將數據插入多個collection/table,所以我必須使用node.js腳本插入數據。但我的問題是,插入CSV數據時無法防止數據重複。使用node.js在neo4j中導入CSV

樣品CSV數據:

name 
------------- 
Afghanistan 
Afghanistan 
Aland 
Albania 
Albania 
Bangladesh 
Bangladesh 

index.js

cp = require('child_process'); 
child = cp.fork(__dirname + "/background-import-csv-file.js"); 
child.on('message', function(msg) { 
    console.log("background-insert-process said : ", msg); 
}); 
file = path.resolve(__dirname, `./file/simplemaps.csv`); 
child.send(file); 

background-import-csv-file.js,我有兩種不同的方式來編寫代碼。

首先無極基於background-import-csv-file.js):

cp = require('child_process'); 
csv = require('fast-csv'); 
Q = require('q'); 
DB = require("./common/driver"); 
Country = require('./collection/country'); 
process.on("message", (file) => { 
    stream = fs.createReadStream(file); 
    csv 
    .fromStream(stream, { headers: true }) 
    .on("data", function(data) { 
     let countryData = { "name": data.name }; 
     neo = new DB(); 
     country = new Country(neo); 
     country.insert(countryData) 
      .then(resp => process.send(resp.msg)) 
      .catch(err => process.send(err)) 
    }) 
    .on("end",() => process.send("file read complete")); 
}); 

./collection/country.js

Q = require('q'); 
    Country = function Country(neo) { 
    this.country = "Country"; this.neo = neo; 
    }; 

    Country.prototype.find = function find(filters) { 
    query = `MATCH (a:Country { name: '${filters.name}' }) RETURN {country:properties(a)}`; 
    return this.neo.run(query, filters).then(resp => resp); 
    } 

    Country.prototype.create = function create(data) { 
    query = `CREATE (ax:Country { name: '${data.name}' }) RETURN ax `; 
    return this.neo.run(query, {}).then(resp => resp[0].properties).catch(err => err) 
    } 

    Country.prototype.insert = function insert(country) { 
     filter = { name: country.name }; 
     return Q(this.find(filter)) 
     .then(resp => resp.length > 0 ? Q.resolve({ msg: `country: [${country.name}] is already exist` }) : Q.resolve(this.create(country)) ) 
    .then(resp => resp) 
    .catch(e => Q.reject(e)); 
    } 

    module.exports = Country; 

./common/driver.js

neo4j = require('neo4j-driver').v1; 
function DB() { 
    this.driver = neo4j.driver(); this.session = this.driver.session(); 
} 

DB.prototype.run = function run(query, data) { 
    return this.session.run(query, data) 
    .then(response => response.records.map(
      record => record._fields[0] ? 
      record._fields.length ? record._fields[0] : {} : {} 
     )).catch(err => new Error(err)); 
} 

module.exports = DB; 

當我在終端運行index.js,在databas e,我有2 Afghanistan,1 Aland,2 Albania和2 Bangladesh。但我需要在我的數據庫中有1 Afghanistan,1 Aland,1 Albania和1 Bangladesh。當我分析代碼時,發現在插入數據之前,我正在檢查數據(Country.prototype.find = function find(filters)),如果它已經存在與否,但它總是返回空結果。這就是爲什麼它插入多個數據。如果我再次運行index.js,則不會將新數據插入到數據庫中。爲了解決這個問題,我已經試過以下CQL

MERGE (c:Country { name: '${data.name}' }) RETURN c 

它插入唯一的數據,但它殺了這麼多的時間。然後,我寫了下面的代碼:

事件驅動background-import-csv-file.js):

process.on("message", (file) => { 
    stream = fs.createReadStream(file); 
    csv 
    .fromStream(stream, { headers: true }) 
    .on("data", function(data) { 
     countryData = { "name": data.name }; 
     neo = new DB(); 
     country = new Country(neo); 
     country.find(countryData); 
     country.on('find', resp => resp.length > 0 ? Q.resolve({ msg: `country: [${country.name}] is already exist` }) : Q.resolve(country.create(countryData)) ); 

     country.on('create', resp => console.log(resp)); 
    }) 
    .on("end",() => process.send("file read complete")); 
}); 

./collection/country.js

EventEmitter = require('events').EventEmitter; 
util = require('util'); 

Country = function Country(neo) { 
    this.neo = neo; EventEmitter.call(this); 
}; 
util.inherits(Country, EventEmitter); 

Country.prototype.find = function find(filters) { 
    query = `MATCH (a:Country { name: '${filters.name}' }) RETURN {country:properties(a)}`; 
    return this.neo.run(query, {}).then(resp => this.emit('find', resp)); 
} 

Country.prototype.create = function create(data) { 
    query = `CREATE (ax:Country { name: '${data.name}' }) RETURN ax `; 
    return this.neo.run(query, {}).then(resp => this.emit('create', resp[0].properties)).catch(err => err) 
} 

而這一次,它顯示了同樣的結果。我錯過了什麼?任何建議都將非常有用。

注意:我正在使用fast-csv進行csv解析,並使用Q作爲承諾。

+0

什麼是 「勒貝爾」 是什麼意思?我沒有看到一個顯而易見的理由,爲什麼用一個簡單的Cypher查詢就無法做到這一點。 – cybersam

回答

1

我的問題是,在csv文件解析,它是如此之快(事件驅動),它不等待完成插入數據到數據庫。所以我必須暫停文件解析然後重新開始。

我解決使用下面的代碼我的問題:

承諾基礎(背景進口的CSV file.js):

cp = require('child_process'); 
csv = require('fast-csv'); 
Q = require('q'); 
DB = require("./common/driver"); 
Country = require('./collection/country'); 

process.on("message", (file) => { 
    stream = fs.createReadStream(file); 
    csvstream = csv 
    .fromStream(stream, { headers: true }) 
    .on("data", function(data) { 
     csvstream.pause(); // pause the csv file parsing 
     countryData = { "name": data.name }; 
     neo = new DB(); 
     country = new Country(neo); 
     country.insert(countryData) 
     .then(resp => { 
      process.send(resp.msg); 
      neo.close(); 
      return csvstream.resume(); // after completing db process, resume 
     }) 
     .catch(err => { 
      process.send(err); 
      return csvstream.resume(); // if failed, then resume 
      }); 
    }) 
    .on("end",() => process.send("file read complete")); 
}); 
1

其實我可以想象以下解決方案:

  1. 修改CSV文件本身的編程語言(如node.js中)具有相同名稱中刪除重複行。
  2. 添加的Neo4j unique constrains CREATE CONSTRAINT ON (c:Country) ASSERT c.name IS UNIQUE
  3. 涉及中間件,像排隊要防止重複項,對於這一點,你需要定義你自己的消息結構和重複運算。

以上。