不要怂,就是干,撸起袖子干!

Commit f8efb9f9 by Jan Aagaard Meier

Promisified bulk create

1 parent 68620db1
...@@ -1051,7 +1051,7 @@ module.exports = (function() { ...@@ -1051,7 +1051,7 @@ module.exports = (function() {
* @param {Object} [options] * @param {Object} [options]
* @param {Array} [options.fields] Fields to insert (defaults to all fields) * @param {Array} [options.fields] Fields to insert (defaults to all fields)
* @param {Boolean} [options.validate=false] Should each row be subject to validation before it is inserted. The whole insert will fail if one row fails validation * @param {Boolean} [options.validate=false] Should each row be subject to validation before it is inserted. The whole insert will fail if one row fails validation
* @param {Boolean} [options.hooks=false] Run before / after bulkCreate hooks? * @param {Boolean} [options.hooks=false] Run before / after create hooks for each individual DAO? BulkCreate hooks will still be run.
* @param {Boolean} [options.ignoreDuplicates=false] Ignore duplicate values for primary keys? (not supported by postgres) * @param {Boolean} [options.ignoreDuplicates=false] Ignore duplicate values for primary keys? (not supported by postgres)
* *
* @return {Promise} * @return {Promise}
...@@ -1079,81 +1079,40 @@ module.exports = (function() { ...@@ -1079,81 +1079,40 @@ module.exports = (function() {
options = Utils._.extend(options, fieldsOrOptions) options = Utils._.extend(options, fieldsOrOptions)
} }
if(this.daoFactoryManager.sequelize.options.dialect === 'postgres' && options.ignoreDuplicates ) { if(this.sequelize.options.dialect === 'postgres' && options.ignoreDuplicates) {
return new Utils.Promise(function(resolve, reject) { return this.sequelize.Promise.reject(new Error('Postgres does not support the \'ignoreDuplicates\' option.'))
reject(new Error('Postgres does not support the \'ignoreDuplicates\' option.'))
});
} }
var self = this var self = this
, updatedAtAttr = this._timestampAttributes.updatedAt , updatedAtAttr = this._timestampAttributes.updatedAt
, createdAtAttr = this._timestampAttributes.createdAt , createdAtAttr = this._timestampAttributes.createdAt
, errors = [] , errors = []
, daoPromises = []
, daos = records.map(function(v) { , daos = records.map(function(v) {
return self.build(v, { return self.build(v, {
isNewRecord: true isNewRecord: true
}) })
}) })
return new Utils.CustomEventEmitter(function(emitter) { if (options.validate && options.fields.length) {
var done = function() { var skippedFields = Utils._.difference(Object.keys(self.attributes), options.fields);
self.runHooks('afterBulkCreate', daos, options.fields, function(err, newRecords, newFields) { }
if (!!err) {
return emitter.emit('error', err)
}
daos = newRecords || daos var done = function() {
options.fields = newFields || options.fields return self.runHooks('afterBulkCreate', daos, options.fields).then(function(newRecords, newFields) {
daos = newRecords || daos
options.fields = newFields || options.fields
emitter.emit('success', daos, options.fields) return new self.sequelize.Promise.resolve(daos, options.fields)
}) })
} }
var next = function(err) { return self.runHooks('beforeBulkCreate', daos, options.fields).then(function(newRecords, newFields) {
if (err !== undefined && err !== null) { daos = newRecords || daos
return emitter.emit('error', err) options.fields = newFields || options.fields
}
var runHook = function (dao) {
if (options.hooks === false) { if (options.hooks === false) {
return runQuery()
}
var i = 0
var iterate = function(i) {
self.runHooks('beforeCreate', daos[i], function(err, newValues) {
if (!!err) {
return emitter.emit('error', err)
}
daos[i] = newValues || daos[i]
daos[i].save({ transaction: options.transaction }).error(function(err) {
emitter.emit('error', err)
}).success(function() {
self.runHooks('afterCreate', daos[i], function(err, newValues) {
if (!!err) {
return emitter.emit('error', err)
}
daos[i] = newValues || daos[i]
i++
if (i >= daos.length) {
return done()
}
iterate(i)
})
})
})
}
iterate(i)
}
var runQuery = function() {
// we will re-create from DAOs, which may have set up default attributes
records = []
daos.forEach(function(dao) {
var values = options.fields.length > 0 ? {} : dao.dataValues var values = options.fields.length > 0 ? {} : dao.dataValues
options.fields.forEach(function(field) { options.fields.forEach(function(field) {
...@@ -1169,65 +1128,50 @@ module.exports = (function() { ...@@ -1169,65 +1128,50 @@ module.exports = (function() {
} }
records.push(values) records.push(values)
})
self.QueryInterface.bulkInsert(self.getTableName(), records, options, self) return values
.on('sql', function(sql) { }
emitter.emit('sql', sql)
}) return self.runHooks('beforeCreate', dao).then(function(newValues) {
.error(function(err) { dao = newValues || dao
emitter.emit('error', err) return dao.save({ transaction: options.transaction }).then(function() {
}).success(function(rows) { return self.runHooks('afterCreate', dao)
done() })
}) })
} }
self.runHooks('beforeBulkCreate', daos, options.fields, function(err, newRecords, newFields) { var runValidation = function (dao) {
if (!!err) { if (options.validate === false) {
return emitter.emit('error', err) return dao
} }
daos = newRecords || daos
options.fields = newFields || options.fields
if (options.validate === true) { var fn = options.hooks === true ? 'hookValidate' : 'validate'
if (options.fields.length) { return dao[fn]({skip: skippedFields}).then(function (err) {
var skippedFields = Utils._.difference(Object.keys(self.attributes), options.fields); if (!!err) {
errors.push({record: dao, errors: err})
} }
})
}
if (options.hooks === true) { records = []
var iterate = function(i) { daos.forEach(function (dao) {
daos[i].hookValidate({skip: skippedFields}).complete(function (err) { daoPromises.push(runValidation(dao))
if (!!err) { daoPromises.push(runHook(dao))
errors.push({record: v, errors: err}) })
}
i++
if (i > daos.length) {
next(errors.length > 0 ? errors : null)
}
iterate(i)
})
}
} else {
var afterDaos = Utils._.after(daos.length, function() {
next(errors.length > 0 ? errors : null)
})
daos.forEach(function(v) { return self.sequelize.Promise.all(daoPromises).then(function() {
v.validate({skip: skippedFields}).success(function(err) { if (errors.length) {
if (!!err) { // Validation or hooks failed
errors.push({record: v, errors: err}) return self.sequelize.Promise.reject(errors)
} } else if (records.length) {
afterDaos() // Insert all records at once
}) return self.QueryInterface.bulkInsert(self.getTableName(), records, options, self).then(done)
})
}
} else { } else {
next() // Records were already saved while running create / update hooks
return done()
} }
}) })
}).run() })
} }
/** /**
......
var Validator = require("validator") var Validator = require("validator")
, Utils = require("./utils") , Utils = require("./utils")
, sequelizeError = require("./errors") , sequelizeError = require("./errors")
, Promise = require("bluebird") , Promise = require("./promise")
, DataTypes = require("./data-types") , DataTypes = require("./data-types")
, _ = require('lodash') , _ = require('lodash')
...@@ -141,18 +141,18 @@ DaoValidator.prototype.validate = function() { ...@@ -141,18 +141,18 @@ DaoValidator.prototype.validate = function() {
this.errors = new sequelizeError.ValidationError('Validation error') this.errors = new sequelizeError.ValidationError('Validation error')
var self = this var self = this
return new Utils.CustomEventEmitter(function(emitter) { return Promise.settle([
Promise.settle([ self._builtinValidators(),
self._builtinValidators(), self._customValidators(),
self._customValidators(), ]).then(function () {
]).then(function () { if (Object.keys(self.errors).length) {
if (Object.keys(self.errors).length) { return self.errors
emitter.emit('success', self.errors) }
} else {
emitter.emit('success') return new Promise(function (resolve) {
} resolve()
}) })
}).run() })
} }
/** /**
...@@ -161,11 +161,10 @@ DaoValidator.prototype.validate = function() { ...@@ -161,11 +161,10 @@ DaoValidator.prototype.validate = function() {
* - Validation * - Validation
* - After Validation Model Hooks * - After Validation Model Hooks
* *
* @return {EventEmitter} * @return {Promise}
*/ */
DaoValidator.prototype.hookValidate = function() { DaoValidator.prototype.hookValidate = function() {
var self = this var self = this
return self.modelInstance.Model.runHooks('beforeValidate', self.modelInstance).then(function () { return self.modelInstance.Model.runHooks('beforeValidate', self.modelInstance).then(function () {
return self.validate().then(function (error) { return self.validate().then(function (error) {
if (error) { if (error) {
...@@ -296,8 +295,6 @@ DaoValidator.prototype._invokeCustomValidator = Promise.method(function(validato ...@@ -296,8 +295,6 @@ DaoValidator.prototype._invokeCustomValidator = Promise.method(function(validato
isAsync = true; isAsync = true;
} }
if (isAsync) { if (isAsync) {
if (optAttrDefined) { if (optAttrDefined) {
validatorFunction = Promise.promisify(validator.bind(this.modelInstance, invokeArgs)) validatorFunction = Promise.promisify(validator.bind(this.modelInstance, invokeArgs))
...@@ -347,7 +344,6 @@ DaoValidator.prototype._invokeBuiltinValidator = Promise.method(function(value, ...@@ -347,7 +344,6 @@ DaoValidator.prototype._invokeBuiltinValidator = Promise.method(function(value,
} }
}); });
/** /**
* Will validate a single field against its schema definition (isnull). * Will validate a single field against its schema definition (isnull).
* *
......
...@@ -46,23 +46,101 @@ Promise = require("bluebird") ...@@ -46,23 +46,101 @@ Promise = require("bluebird")
util.inherits(SequelizePromise, Promise) util.inherits(SequelizePromise, Promise)
Utils._.extend(SequelizePromise, Promise) Utils._.extend(SequelizePromise, Promise)
SequelizePromise.is = function (obj) {
if (obj === void 0) return false;
return obj instanceof Promise || obj instanceof SequelizePromise;
}
SequelizePromise.all = function(promises) { SequelizePromise.all = function(promises) {
var resolved = SequelizePromise.resolve(Promise.all(promises)); var resolved = SequelizePromise.resolve(Promise.all(promises));
// Propagate sql events // Propagate sql events
promises.forEach(function (promise) { promises.forEach(function (promise) {
promise.on('sql', function (sql) { if (SequelizePromise.is(promise)) {
resolved.emit('sql', sql); promise.on('sql', function (sql) {
}); resolved.emit('sql', sql);
});
promise.$sql.forEach(function (sql) {
resolved.emit('sql', sql); promise.$sql.forEach(function (sql) {
}); resolved.emit('sql', sql);
});
}
}); });
return resolved; return resolved;
}; };
SequelizePromise.settle = function (promises) {
var settled = SequelizePromise.resolve(Promise.settle(promises))
// Propagate sql events
promises.forEach(function (promise) {
if (SequelizePromise.is(promise)) {
promise.on('sql', function (sql) {
settled.emit('sql', sql);
});
promise.$sql.forEach(function (sql) {
settled.emit('sql', sql);
});
}
});
return settled;
}
function tryCatchApply(fn, args, receiver) {
try {
return fn.apply(receiver, args);
}
catch (e) {
return {e: e};
}
}
SequelizePromise.method = function (fn) {
if (typeof fn !== "function") {
throw new TypeError(NOT_FUNCTION_ERROR);
}
return function Promise$_method() {
var value = tryCatchApply(fn, Array.prototype.slice.apply(arguments), this);
var ret = new SequelizePromise(INTERNAL);
ret._setTrace(void 0);
ret._resolveFromSyncValue(value);
return ret;
};
};
Promise.prototype._resolveFromSyncValue = function(value) {
if (value && value.hasOwnProperty('e')) {
this._cleanValues();
this._setRejected();
this._settledValue = value.e;
this._ensurePossibleRejectionHandled();
}
else {
var maybePromise = Promise._cast(value, void 0);
if (maybePromise instanceof Promise) {
this._follow(maybePromise);
}
else {
this._cleanValues();
this._setFulfilled();
this._settledValue = value;
}
}
};
SequelizePromise.attempt = SequelizePromise["try"] = function (fn, args, ctx) {
var value = tryCatchApply(fn, args, ctx)
var ret = new SequelizePromise(INTERNAL);
ret._setTrace(void 0);
ret._resolveFromSyncValue(value);
return ret;
};
// Need to hack resolve cause we can't hack all directrly // Need to hack resolve cause we can't hack all directrly
SequelizePromise.resolve = SequelizePromise.fulfilled = function(value) { SequelizePromise.resolve = SequelizePromise.fulfilled = function(value) {
var ret = new SequelizePromise(INTERNAL); var ret = new SequelizePromise(INTERNAL);
...@@ -76,6 +154,53 @@ SequelizePromise.resolve = SequelizePromise.fulfilled = function(value) { ...@@ -76,6 +154,53 @@ SequelizePromise.resolve = SequelizePromise.fulfilled = function(value) {
return ret; return ret;
}; };
SequelizePromise.promisify = function (callback, receiver) {
function promisified() {
var _receiver = receiver;
var promise = new SequelizePromise(INTERNAL);
promise._setTrace(void 0);
var fn = function PromiseResolver$_callback(err, value) {
if (err) {
// var wrapped = wrapAsRejectionError(maybeWrapAsError(err));
var wrapped = err
promise._attachExtraTrace(wrapped);
promise._reject(wrapped);
}
else {
if (arguments.length > 2) {
promise._fulfill(Array.prototype.slice.call(arguments, 1));
}
else {
promise._fulfill(value);
}
}
}
try {
callback.apply(_receiver, withAppended(arguments, fn));
}
catch(e) {
// var wrapped = maybeWrapAsError(e);
var wrapped = e
promise._attachExtraTrace(wrapped);
promise._reject(wrapped);
}
return promise;
}
promisified.__isPromisified__ = true;
return promisified;
}
function withAppended(target, appendee) {
var len = target.length;
var ret = new Array(len + 1);
var i;
for (i = 0; i < len; ++i) {
ret[i] = target[i];
}
ret[i] = appendee;
return ret;
}
// Need to hack _then to make sure our promise is chainable // Need to hack _then to make sure our promise is chainable
SequelizePromise.prototype._then = function ( SequelizePromise.prototype._then = function (
didFulfill, didFulfill,
......
...@@ -480,7 +480,7 @@ describe(Support.getTestDialectTeaser("DAOFactory"), function () { ...@@ -480,7 +480,7 @@ describe(Support.getTestDialectTeaser("DAOFactory"), function () {
expect(str2.str).to.equal('http://sequelizejs.org') expect(str2.str).to.equal('http://sequelizejs.org')
StringIsNullOrUrl.create({ str: '' }).error(function(err) { StringIsNullOrUrl.create({ str: '' }).error(function(err) {
expect(err).to.exist expect(err).to.exist
expect(err.str[0].message).to.match(/Validation isURL failed/) expect(err.str[0].message).to.match(/Validation isURL failed/)
done() done()
}) })
......
...@@ -219,8 +219,8 @@ describe(Support.getTestDialectTeaser("DaoValidator"), function() { ...@@ -219,8 +219,8 @@ describe(Support.getTestDialectTeaser("DaoValidator"), function() {
} }
}) })
var successfulUser = UserSuccess.build({ name: succeedingValue }) var successfulUser = UserSuccess.build({ name: succeedingValue })
successfulUser.validate().success( function() { successfulUser.validate().success(function(errors) {
expect(arguments).to.have.length(0) expect(errors).to.be.undefined
done() done()
}).error(function(err) { }).error(function(err) {
expect(err).to.deep.equal({}) expect(err).to.deep.equal({})
...@@ -539,11 +539,8 @@ describe(Support.getTestDialectTeaser("DaoValidator"), function() { ...@@ -539,11 +539,8 @@ describe(Support.getTestDialectTeaser("DaoValidator"), function() {
expect(error.name[0].message).to.equal("name should equal '2'") expect(error.name[0].message).to.equal("name should equal '2'")
var successfulUser = User.build({ name : "2" }) var successfulUser = User.build({ name : "2" })
successfulUser.validate().success(function() { successfulUser.validate().success(function(err) {
expect(arguments).to.have.length(0) expect(err).not.to.be.defined
done()
}).error(function(err) {
expect(err[0].message).to.equal()
done() done()
}) })
}) })
...@@ -572,8 +569,8 @@ describe(Support.getTestDialectTeaser("DaoValidator"), function() { ...@@ -572,8 +569,8 @@ describe(Support.getTestDialectTeaser("DaoValidator"), function() {
expect(error).to.be.an.instanceOf(self.sequelize.ValidationError) expect(error).to.be.an.instanceOf(self.sequelize.ValidationError)
expect(error.name[0].message).to.equal("Invalid username") expect(error.name[0].message).to.equal("Invalid username")
User.build({ name : "no error" }).validate().success(function() { User.build({ name : "no error" }).validate().success(function(errors) {
expect(arguments).to.have.length(0) expect(errors).not.to.be.defined
done() done()
}) })
}) })
......
...@@ -104,6 +104,7 @@ describe(Support.getTestDialectTeaser("Hooks"), function () { ...@@ -104,6 +104,7 @@ describe(Support.getTestDialectTeaser("Hooks"), function () {
{username: 'Bob', mood: 'cold'}, {username: 'Bob', mood: 'cold'},
{username: 'Tobi', mood: 'hot'} {username: 'Tobi', mood: 'hot'}
], { fields: [], hooks: true }).success(function(bulkUsers) { ], { fields: [], hooks: true }).success(function(bulkUsers) {
console.log(bulkUsers)
expect(beforeBulkCreate).to.be.true expect(beforeBulkCreate).to.be.true
expect(afterBulkCreate).to.be.true expect(afterBulkCreate).to.be.true
expect(bulkUsers).to.be.instanceof(Array) expect(bulkUsers).to.be.instanceof(Array)
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!