不要怂,就是干,撸起袖子干!

You need to sign in or sign up before continuing.
Commit ca62cf10 by Mick Hansen

feat(promises): properly propagate errors

1 parent 57b6f92a
...@@ -667,11 +667,9 @@ module.exports = (function() { ...@@ -667,11 +667,9 @@ module.exports = (function() {
identifier = self.__options.hasPrimaryKeys ? self.primaryKeyValues : { id: self.id }; identifier = self.__options.hasPrimaryKeys ? self.primaryKeyValues : { id: self.id };
query = self.QueryInterface.delete(self, self.QueryInterface.QueryGenerator.addSchema(self.Model), identifier, options) query = self.QueryInterface.delete(self, self.QueryInterface.QueryGenerator.addSchema(self.Model), identifier, options)
} }
return query return query;
}).then(function (results) { }).then(function (results) {
return self.Model.runHooks(self.Model.options.hooks.afterDestroy, self).then(function () { return self.Model.runHooks(self.Model.options.hooks.afterDestroy, self).return(results);
return results;
});
}); });
} }
......
var Utils = require('../../utils') var Utils = require('../../utils')
, CustomEventEmitter = require("../../emitters/custom-event-emitter") , CustomEventEmitter = require("../../emitters/custom-event-emitter")
, Promise = require("../../promise")
, Dot = require('dottie') , Dot = require('dottie')
, _ = require('lodash') , _ = require('lodash')
, QueryTypes = require('../../query-types') , QueryTypes = require('../../query-types')
......
...@@ -47,21 +47,15 @@ module.exports = (function() { ...@@ -47,21 +47,15 @@ module.exports = (function() {
self.pendingQueries++ self.pendingQueries++
self.clientDrained = false self.clientDrained = false
return new Utils.CustomEventEmitter(function(emitter) { return self.connect().then(function(done) {
self.connect() var query = new Query(self.client, self.sequelize, callee, options || {})
.on('error', function(err) {
emitter.emit('error', err) // We return the query regardless of error or success in the query
}) return query.run(sql).finally(function () {
.on('success', function(done) { self.endQuery.call(self);
var query = new Query(self.client, self.sequelize, callee, options || {}) done && done();
});
return query.run(sql) });
.complete(function(err) {
self.endQuery.call(self)
done && done(err) })
.proxy(emitter)
})
}).run()
} }
ConnectorManager.prototype.afterTransactionSetup = function(callback) { ConnectorManager.prototype.afterTransactionSetup = function(callback) {
...@@ -70,7 +64,7 @@ module.exports = (function() { ...@@ -70,7 +64,7 @@ module.exports = (function() {
ConnectorManager.prototype.connect = function(callback) { ConnectorManager.prototype.connect = function(callback) {
var self = this var self = this
var emitter = new (require('events').EventEmitter)() var emitter = new Utils.Promise();
// in case database is slow to connect, prevent orphaning the client // in case database is slow to connect, prevent orphaning the client
// TODO: We really need some sort of queue/flush/drain mechanism // TODO: We really need some sort of queue/flush/drain mechanism
......
...@@ -3,6 +3,7 @@ var Utils = require("../../utils") ...@@ -3,6 +3,7 @@ var Utils = require("../../utils")
, DataTypes = require('../../data-types') , DataTypes = require('../../data-types')
, hstore = require('./hstore') , hstore = require('./hstore')
, QueryTypes = require('../../query-types') , QueryTypes = require('../../query-types')
, Promise = require('../../promise')
module.exports = (function() { module.exports = (function() {
var Query = function(client, sequelize, callee, options) { var Query = function(client, sequelize, callee, options) {
...@@ -31,138 +32,138 @@ module.exports = (function() { ...@@ -31,138 +32,138 @@ module.exports = (function() {
this.sequelize.log('Executing (' + this.options.uuid + '): ' + this.sql) this.sequelize.log('Executing (' + this.options.uuid + '): ' + this.sql)
} }
query.on('row', function(row) { return new Promise(function (resolve, reject) {
rows.push(row) var promise = this;
})
query.on('error', function(err) {
receivedError = true
err.sql = sql
self.emit('sql', sql, self.options.uuid)
self.emit('error', err, self.callee)
})
query.on('end', function(result) {
self.emit('sql', self.sql, self.options.uuid)
if (receivedError) {
return
}
onSuccess.call(self, rows, sql, result) query.on('row', function(row) {
}) rows.push(row)
})
return this query.on('error', function(err) {
} receivedError = true
err.sql = sql
promise.emit('sql', sql, self.options.uuid)
reject(err);
})
Query.prototype.getInsertIdField = function() { query.on('end', function(result) {
return 'id' if (receivedError) {
} return;
}
var onSuccess = function(rows, sql, result) { promise.emit('sql', self.sql, self.options.uuid)
var results = rows resolve([rows, sql, result]);
, self = this })
, isTableNameQuery = (sql.indexOf('SELECT table_name FROM information_schema.tables') === 0) }).spread(function (rows, sql, result) {
, isRelNameQuery = (sql.indexOf('SELECT relname FROM pg_class WHERE oid IN') === 0) var results = rows
, isTableNameQuery = (sql.indexOf('SELECT table_name FROM information_schema.tables') === 0)
if (isTableNameQuery || isRelNameQuery) { , isRelNameQuery = (sql.indexOf('SELECT relname FROM pg_class WHERE oid IN') === 0)
if (isRelNameQuery) {
results = rows.map(function(row) { if (isTableNameQuery || isRelNameQuery) {
return { if (isRelNameQuery) {
name: row.relname, results = rows.map(function(row) {
tableName: row.relname.split('_')[0] return {
} name: row.relname,
}) tableName: row.relname.split('_')[0]
} else { }
results = rows.map(function(row) { return Utils._.values(row) }) })
} else {
results = rows.map(function(row) { return Utils._.values(row) })
}
return results
} }
return this.emit('success', results)
}
if (this.send('isSelectQuery')) { if (self.send('isSelectQuery')) {
if (this.sql.toLowerCase().indexOf('select c.column_name') === 0) { if (self.sql.toLowerCase().indexOf('select c.column_name') === 0) {
var result = {} result = {}
rows.forEach(function(_result) { rows.forEach(function(_result) {
result[_result.Field] = { result[_result.Field] = {
type: _result.Type.toUpperCase(), type: _result.Type.toUpperCase(),
allowNull: (_result.Null === 'YES'), allowNull: (_result.Null === 'YES'),
defaultValue: _result.Default, defaultValue: _result.Default,
special: (!!_result.special ? self.sequelize.queryInterface.QueryGenerator.fromArray(_result.special) : []) special: (!!_result.special ? self.sequelize.queryInterface.QueryGenerator.fromArray(_result.special) : [])
} }
if (result[_result.Field].type === 'BOOLEAN') { if (result[_result.Field].type === 'BOOLEAN') {
result[_result.Field].defaultValue = { 'false': false, 'true': true }[result[_result.Field].defaultValue] result[_result.Field].defaultValue = { 'false': false, 'true': true }[result[_result.Field].defaultValue]
if (result[_result.Field].defaultValue === undefined) { if (result[_result.Field].defaultValue === undefined) {
result[_result.Field].defaultValue = null result[_result.Field].defaultValue = null
}
} }
}
if (typeof result[_result.Field].defaultValue === 'string') { if (typeof result[_result.Field].defaultValue === 'string') {
result[_result.Field].defaultValue = result[_result.Field].defaultValue.replace(/'/g, "") result[_result.Field].defaultValue = result[_result.Field].defaultValue.replace(/'/g, "")
if (result[_result.Field].defaultValue.indexOf('::') > -1) { if (result[_result.Field].defaultValue.indexOf('::') > -1) {
var split = result[_result.Field].defaultValue.split('::') var split = result[_result.Field].defaultValue.split('::')
if (split[1].toLowerCase() !== "regclass)") { if (split[1].toLowerCase() !== "regclass)") {
result[_result.Field].defaultValue = split[0] result[_result.Field].defaultValue = split[0]
}
} }
} }
} })
})
this.emit('success', result) return result
} else { } else {
// Postgres will treat tables as case-insensitive, so fix the case // Postgres will treat tables as case-insensitive, so fix the case
// of the returned values to match attributes // of the returned values to match attributes
if (this.options.raw === false && this.sequelize.options.quoteIdentifiers === false) { if (self.options.raw === false && self.sequelize.options.quoteIdentifiers === false) {
var attrsMap = Utils._.reduce(this.callee.attributes, function(m, v, k) { m[k.toLowerCase()] = k; return m}, {}) var attrsMap = Utils._.reduce(self.callee.attributes, function(m, v, k) { m[k.toLowerCase()] = k; return m}, {})
rows.forEach(function(row) { rows.forEach(function(row) {
Utils._.keys(row).forEach(function(key) { Utils._.keys(row).forEach(function(key) {
var targetAttr = attrsMap[key] var targetAttr = attrsMap[key]
if (targetAttr != key) { if (targetAttr != key) {
row[targetAttr] = row[key] row[targetAttr] = row[key]
delete row[key] delete row[key]
} }
})
}) })
}) }
}
// Parse hstore fields if the model has any hstore fields. // Parse hstore fields if the model has any hstore fields.
// This cannot be done in the 'pg' lib because hstore is a UDT. // This cannot be done in the 'pg' lib because hstore is a UDT.
if (!!this.callee && !!this.callee._hasHstoreAttributes) { if (!!self.callee && !!self.callee._hasHstoreAttributes) {
rows.forEach(function(row) { rows.forEach(function(row) {
Utils._.keys(row).forEach(function(key) { Utils._.keys(row).forEach(function(key) {
if (self.callee._isHstoreAttribute(key)) { if (self.callee._isHstoreAttribute(key)) {
row[key] = hstore.parse(row[key]) row[key] = hstore.parse(row[key])
} }
})
}) })
}) }
}
this.emit('success', this.send('handleSelectQuery', rows)) return self.send('handleSelectQuery', rows)
} }
} else if (this.send('isShowOrDescribeQuery')) { } else if (self.send('isShowOrDescribeQuery')) {
this.emit('success', results) return results
} else if ([QueryTypes.BULKUPDATE, QueryTypes.BULKDELETE].indexOf(this.options.type) !== -1) { } else if ([QueryTypes.BULKUPDATE, QueryTypes.BULKDELETE].indexOf(self.options.type) !== -1) {
this.emit('success', result.rowCount) return result.rowCount
} else if (this.send('isInsertQuery') || this.send('isUpdateQuery')) { } else if (self.send('isInsertQuery') || self.send('isUpdateQuery')) {
if (this.callee !== null) { // may happen for bulk inserts or bulk updates if (self.callee !== null) { // may happen for bulk inserts or bulk updates
for (var key in rows[0]) { for (var key in rows[0]) {
if (rows[0].hasOwnProperty(key)) { if (rows[0].hasOwnProperty(key)) {
var record = rows[0][key] var record = rows[0][key]
if (!!this.callee.Model && !!this.callee.Model.rawAttributes && !!this.callee.Model.rawAttributes[key] && !!this.callee.Model.rawAttributes[key].type && this.callee.Model.rawAttributes[key].type.toString() === DataTypes.HSTORE.toString()) { if (!!self.callee.Model && !!self.callee.Model.rawAttributes && !!self.callee.Model.rawAttributes[key] && !!self.callee.Model.rawAttributes[key].type && self.callee.Model.rawAttributes[key].type.toString() === DataTypes.HSTORE.toString()) {
record = hstore.parse(record) record = hstore.parse(record)
}
self.callee.dataValues[key] = record
} }
this.callee.dataValues[key] = record
} }
} }
return self.callee
} else {
return results
} }
})
this.emit('success', this.callee) return this
} else { }
this.emit('success', results)
} Query.prototype.getInsertIdField = function() {
return 'id'
} }
return Query return Query
......
var util = require("util") var util = require("util")
, EventEmitter = require("events").EventEmitter , EventEmitter = require("events").EventEmitter
, Promise = require("bluebird") , Promise = require("../promise")
, proxyEventKeys = ['success', 'error', 'sql'] , proxyEventKeys = ['success', 'error', 'sql']
, Utils = require('../utils') , Utils = require('../utils')
......
...@@ -20,7 +20,7 @@ var SequelizePromise = function(resolver) { ...@@ -20,7 +20,7 @@ var SequelizePromise = function(resolver) {
this._boundTo = void 0; this._boundTo = void 0;
// Intercept the resolver so we can resolve with emit's // Intercept the resolver so we can resolve with emit's
this._resolveFromResolver(function (resolve, reject) { this._resolveFromResolver(function resolverIntercept(resolve, reject) {
self.seqResolve = resolve; self.seqResolve = resolve;
self.seqReject = reject; self.seqReject = reject;
...@@ -55,7 +55,7 @@ SequelizePromise.prototype._then = function ( ...@@ -55,7 +55,7 @@ SequelizePromise.prototype._then = function (
* Start of sequelize specific * Start of sequelize specific
* Needed to transfer sql events accross .then() calls * Needed to transfer sql events accross .then() calls
*/ */
if (this.proxySql && ret.emit) { if (this.proxySql && ret && ret.emit) {
this.proxySql(ret); this.proxySql(ret);
} }
/* /*
...@@ -81,9 +81,7 @@ SequelizePromise.prototype._settlePromiseAt = function (index) { ...@@ -81,9 +81,7 @@ SequelizePromise.prototype._settlePromiseAt = function (index) {
if (this.$sql && receiver && receiver.emit) { if (this.$sql && receiver && receiver.emit) {
this.$sql.forEach(function (sql) { this.$sql.forEach(function (sql) {
if (receiver && receiver.emit) { receiver.emit('sql', sql);
receiver.emit('sql', sql);
}
}); });
} }
...@@ -109,7 +107,7 @@ SequelizePromise.prototype.emit = function(evt) { ...@@ -109,7 +107,7 @@ SequelizePromise.prototype.emit = function(evt) {
if (evt === 'success') { if (evt === 'success') {
this.seqResolve.apply(this, args); this.seqResolve.apply(this, args);
} else if (evt === 'error') { } else if (evt === 'error') {;
this.seqReject.apply(this, args); this.seqReject.apply(this, args);
} else { } else {
// Needed to transfer sql across .then() calls // Needed to transfer sql across .then() calls
......
...@@ -473,10 +473,12 @@ module.exports = (function() { ...@@ -473,10 +473,12 @@ module.exports = (function() {
} }
QueryInterface.prototype.insert = function(dao, tableName, values, options) { QueryInterface.prototype.insert = function(dao, tableName, values, options) {
var sql = this.QueryGenerator.insertQuery(tableName, values, dao.Model.rawAttributes) var sql = this.QueryGenerator.insertQuery(tableName, values, dao.Model.rawAttributes);
return queryAndEmit.call(this, [sql, dao, options], 'insert', {
success: function(obj) { obj.isNewRecord = false } return queryAndEmit.call(this, [sql, dao, options], 'insert').then(function (result) {
}) result.isNewRecord = false;
return result;
});
} }
QueryInterface.prototype.bulkInsert = function(tableName, records, options, Model) { QueryInterface.prototype.bulkInsert = function(tableName, records, options, Model) {
...@@ -533,7 +535,7 @@ module.exports = (function() { ...@@ -533,7 +535,7 @@ module.exports = (function() {
} }
} }
var emitter = new Promise(); var emitter;
var tick = 0 var tick = 0
var iterate = function(err, i) { var iterate = function(err, i) {
if (!!err || i >= cascades.length) { if (!!err || i >= cascades.length) {
...@@ -574,33 +576,22 @@ module.exports = (function() { ...@@ -574,33 +576,22 @@ module.exports = (function() {
var run = function(err) { var run = function(err) {
if (!!err) { if (!!err) {
return emitter.emit('error', err) return emitter.reject(err);
} }
var chainer = new Utils.QueryChainer() if (emitter) {
self.queryAndEmit([sql, dao, options], 'delete').proxy(emitter);
chainer.add(self, 'queryAndEmit', [[sql, dao, options], 'delete']) } else {
return self.queryAndEmit([sql, dao, options], 'delete');
chainer.runSerially() }
.success(function(results){
emitter.query = { sql: sql }
emitter.emit('sql', sql)
emitter.emit('success', results[1])
})
.error(function(err) {
emitter.query = { sql: sql }
emitter.emit('sql', sql)
emitter.emit('error', err)
})
} }
if (cascades.length > 0) { if (cascades.length > 0) {
emitter = new Promise();
iterate(null, tick) iterate(null, tick)
} else { } else {
run() return run()
} }
return emitter;
} }
QueryInterface.prototype.bulkDelete = function(tableName, identifier, options) { QueryInterface.prototype.bulkDelete = function(tableName, identifier, options) {
...@@ -867,6 +858,7 @@ module.exports = (function() { ...@@ -867,6 +858,7 @@ module.exports = (function() {
}, options || {}) }, options || {})
var execQuery = function(emitter) { var execQuery = function(emitter) {
var query;
if (Array.isArray(sqlOrQueryParams)) { if (Array.isArray(sqlOrQueryParams)) {
if (sqlOrQueryParams.length === 1) { if (sqlOrQueryParams.length === 1) {
sqlOrQueryParams.push(null) sqlOrQueryParams.push(null)
...@@ -876,11 +868,22 @@ module.exports = (function() { ...@@ -876,11 +868,22 @@ module.exports = (function() {
sqlOrQueryParams.push(typeof options === "object" ? options : {}) sqlOrQueryParams.push(typeof options === "object" ? options : {})
} }
emitter.query = this.sequelize.query.apply(this.sequelize, sqlOrQueryParams) query = this.sequelize.query.apply(this.sequelize, sqlOrQueryParams)
} else { } else {
emitter.query = this.sequelize.query(sqlOrQueryParams, null, options) query = this.sequelize.query(sqlOrQueryParams, null, options)
} }
if (!emitter) {
return query.then(function (result) {
if (options.success) options.success(result);
return result;
}/*, function (err) {
if (options.error) options.error(err);
}*/);
}
emitter.query = query;
emitter emitter
.query .query
.success(function(obj) { .success(function(obj) {
...@@ -905,7 +908,7 @@ module.exports = (function() { ...@@ -905,7 +908,7 @@ module.exports = (function() {
if (!!emitter) { if (!!emitter) {
execQuery(emitter) execQuery(emitter)
} else { } else {
return execQuery(new Promise()); return execQuery();
} }
} }
......
...@@ -509,7 +509,7 @@ module.exports = (function() { ...@@ -509,7 +509,7 @@ module.exports = (function() {
type: (sql.toLowerCase().indexOf('select') === 0) ? QueryTypes.SELECT : false type: (sql.toLowerCase().indexOf('select') === 0) ? QueryTypes.SELECT : false
}) })
return this.transactionManager.query(sql, callee, options) return this.transactionManager.query(sql, callee, options);
} }
/** /**
......
...@@ -606,5 +606,6 @@ Utils.col.prototype.toString = function (queryGenerator, parentModel) { ...@@ -606,5 +606,6 @@ Utils.col.prototype.toString = function (queryGenerator, parentModel) {
} }
Utils.CustomEventEmitter = require(__dirname + "/emitters/custom-event-emitter") Utils.CustomEventEmitter = require(__dirname + "/emitters/custom-event-emitter")
Utils.Promise = require(__dirname + "/promise")
Utils.QueryChainer = require(__dirname + "/query-chainer") Utils.QueryChainer = require(__dirname + "/query-chainer")
Utils.Lingo = require("lingo") Utils.Lingo = require("lingo")
...@@ -4,6 +4,7 @@ var chai = require('chai') ...@@ -4,6 +4,7 @@ var chai = require('chai')
, Support = require(__dirname + '/../support') , Support = require(__dirname + '/../support')
, DataTypes = require(__dirname + "/../../lib/data-types") , DataTypes = require(__dirname + "/../../lib/data-types")
, Sequelize = require('../../index') , Sequelize = require('../../index')
, assert = require('assert')
chai.config.includeStack = true chai.config.includeStack = true
...@@ -359,14 +360,17 @@ describe(Support.getTestDialectTeaser("BelongsTo"), function() { ...@@ -359,14 +360,17 @@ describe(Support.getTestDialectTeaser("BelongsTo"), function() {
User.create({ username: 'foo' }).success(function(user) { User.create({ username: 'foo' }).success(function(user) {
Task.create({ title: 'task' }).success(function(task) { Task.create({ title: 'task' }).success(function(task) {
task.setUser(user).success(function() { task.setUser(user).success(function() {
user.destroy().error(function() { // Should fail due to FK restriction
// Should fail due to FK restriction user.destroy().then(function () {
assert(false);
}, function(err) {
expect(err).to.be.ok;
Task.findAll().success(function(tasks) { Task.findAll().success(function(tasks) {
expect(tasks).to.have.length(1) expect(tasks).to.have.length(1)
done() done()
}) })
}) })
}) });
}) })
}) })
}) })
......
...@@ -6,9 +6,10 @@ var fs = require('fs') ...@@ -6,9 +6,10 @@ var fs = require('fs')
, Config = require(__dirname + "/config/config") , Config = require(__dirname + "/config/config")
// Make sure errors get thrown when testing // Make sure errors get thrown when testing
Sequelize.Promise.onPossiblyUnhandledRejection(function(e, promise){ Sequelize.Promise.onPossiblyUnhandledRejection(function(e, promise) {
throw e; throw e;
}); });
Sequelize.Promise.longStackTraces();
var Support = { var Support = {
Sequelize: Sequelize, Sequelize: Sequelize,
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!