2017-08-01 74 views
1

我需要從CSV文件(大約20K行)將數據導入到我的數據庫。有些行可能已經存在於數據庫中,因此只需要更新它們,但必須插入新行。如果任何操作失敗,交易必須取消。如何在書架上批量「插入」?

我該怎麼做?這是我使用的代碼:

var vehicle = { 
        id: row.id, 
        lastUpdate: moment(row.lastUpdate, 'DD/MM/YYYY - HH:mm').toDate(), 
        version: row.version, 
        color: row.color, 
        location: row.location, 
        status: row.status 
       }; 

       Vehicle.forge({ 
        id: row.id 
       }) 
        .save(vehicle, { transacting: t, patch: true }) 
        .then(model => { 
         console.log('*************' + vehicle.id); 
        }) 
        .catch(Vehicle.NoRowsUpdatedError, err => { 
         // There are no rows for this chassis, so let's insert it 
         Vehicle.forge(vehicle) 
          .save(null, { transacting: t, method: 'insert' }) 
          .then(model => { 
           console.log('++++++++++++++' + vehicle.id); 
          }) 
          .catch(err => { 
           console.log(`INSERT ERROR: ${err.message}`); 
           t.rollback(); 
           return res.json({ status: false, count: 0, error: err.message }); 
          }); 
        }) 
        .catch(err => { 
         console.log(`UPDATE ERROR: ${err.message}`); 
         t.rollback(); 
         return res.json({ status: false, count: 0, error: err.message }); 
        }); 

此代碼是在for循環中,但在第二次迭代失敗可能是因爲promises之間的併發。

我也嘗試添加一個自定義函數到我的模型文件,但它說該函數不存在。

let bookshelf = require('./base'); 

var Vehicle, 
    Vehicles; 

Vehicle = bookshelf.Model.extend({ 
    tableName: 'vehicles', 

    /** 
    * Insert a model based on data 
    * @param {Object} data 
    * @param {Object} [options] Options for model.save 
    * @return {Promise(bookshelf.Model)} 
    */ 
    create: function (data, options) { 
     return this.forge(data).save(null, options); 
    }, 

    /** 
    * Select a model based on a query 
    * @param {Object} [query] 
    * @param {Object} [options] Options for model.fetch 
    * @param {Boolean} [options.require=false] 
    * @return {Promise(bookshelf.Model)} 
    */ 
    findOne: function (query, options) { 
     options = extend({ require: true }, options); 
     return this.forge(query).fetch(options); 
    }, 

    /** 
    * Select a model based on data and update if found, insert if not found 
    * @param {Object} selectData Data for select 
    * @param {Object} updateData Data for update 
    * @param {Object} [options] Options for model.save 
    */ 
    upsert: function (selectData, updateData, options) { 
     return this.findOne(selectData, extend(options, { require: false })) 
      .bind(this) 
      .then(function (model) { 
       return model 
        ? model.save(
         updateData, 
         extend({ patch: true, method: 'update' }, options) 
        ) 
        : this.create(
         extend(selectData, updateData), 
         extend(options, { method: 'insert' }) 
        ) 
      }); 
    } 
}); 

Vehicles = bookshelf.Collection.extend({ 
    model: Vehicle 
}); 

module.exports = { 
    Vehicle: bookshelf.model('Vehicle', Vehicle), 
    Vehicles: bookshelf.collection('Vehicles', Vehicles) 
}; 

回答

2

除了使用書架,您可以直接使用knex來做到這一點。只要抓住你傳遞我的書架knex的情況下,你可以使用它像這樣:

knex.transaction((trx) => { 
    return Bluebird.map(vehicles, vehicle => { 
    const insert = knex('vehicles').insert(vehicle).toString(); 
    delete vehicle.id; 
    const update = knex('vehicles').update(vehicle).toString(); 
    const set = update.substring(18); 
    return trx.raw(`${insert} ON CONFLICT (id) DO UPDATE ${set}`); 
    }); 
}); 

我們可以利用Knex的方便toString方法來生成最原始我們查詢了我們;通過這種方式,即使Knex不直接支持,我們也可以做一個補充。 Bluebird的map函數非常適合乾淨地處理這樣的數據,並讓您等待完全循環。

+2

這很好,除了在處理先前的查詢(驅動程序必須緩衝它們導致額外的內存消耗)之前,應該使用Bluebird.mapSeries來同時防止對DB驅動程序的併發查詢溢出。其他的事情是,不應該使用knex'toString()'來呈現串聯查詢,否則會導致很多類型的問題,例如。在插入的數據中帶有'?'標記。正確的方法是使用綁定並將查詢作爲參數傳遞,比如'trx.raw('?ON CONFLICT(id)DO UPDATE?',[insertQuery,setQuery])'以保持值綁定正確發送到db驅動程序。 –