不要怂,就是干,撸起袖子干!

Commit bb40f280 by Mick Hansen

Merge pull request #1723 from sequelize/queryInterfacePromises

Query interface promises
2 parents c33046b9 a8f23bd8
......@@ -6,6 +6,7 @@ Notice: All 1.7.x changes are present in 2.0.x aswell
- Sequelize now returns promises instead of its custom event emitter from most calls. This affects methods that return multiple values (like `findOrCreate` or `findOrInitialize`). If your current callbacks do not accept the 2nd success parameter you might be seeing an array as the first param. Either use `.spread()` for these methods or add another argument to your callback: `.success(instance)` -> `.success(instance, created)`.
- `.success()`/`.done()` and any other non promise methods are now deprecated (we will keep the codebase around for a few versions though). on('sql') persists for debugging purposes.
- Model association calls (belongsTo/hasOne/hasMany) are no longer chainable. (this is to support being able to pass association references to include rather than model/as combinations)
- `QueryInterface` no longer emits global events. This means you can no longer do things like `QueryInterface.on('showAllSchemas', function ... `
- `define()` stores models in `sequelize.models` Object e.g. `sequelize.models.MyModel`
# v2.0.0-dev11
......
......@@ -39,6 +39,20 @@ module.exports = (function() {
throwMethodUndefined('createTableQuery')
},
describeTableQuery: function (tableName, schema, schemaDelimiter) {
var table = this.quoteTable(
this.addSchema({
tableName: tableName,
options: {
schema: schema,
schemaDelimiter: schemaDelimiter
}
})
)
return 'DESCRIBE ' + table + ';'
},
/*
Returns a query for dropping a table.
*/
......
......@@ -159,54 +159,88 @@ module.exports = (function() {
Utils._.extend(ConnectorManager.prototype, require("../abstract/connector-manager").prototype)
ConnectorManager.prototype.query = function(sql, callee, options) {
if (!this.isConnected && !this.pool) {
this.connect()
}
var self = this
options = options || {}
if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting
var queueItem = {
query: new Query(this.client, this.sequelize, callee, options || {}),
client: this.client,
query: new Query(null, this.sequelize, callee, options),
sql: sql
}
};
queueItem.query.options.uuid = this.config.uuid
enqueue.call(this, queueItem, options)
return queueItem.query
return queueItem.query.promise.finally(function () {
afterQuery.call(self, queueItem)
})
}
var self = this
, query = new Query(this.client, this.sequelize, callee, options || {})
this.pendingQueries++
query.done(function() {
self.pendingQueries--
if (self.pool) {
self.pool.release(query.client)
} else {
if (self.pendingQueries === 0) {
setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self)
}, 100)
var query = new Query(null, this.sequelize, callee, options);
this.pendingQueries++;
query.options.uuid = this.config.uuid
return this.getConnection(options).then(function (client) {
query.client = client
return query.run(sql).finally(function () {
self.pendingQueries--;
if (self.pool) {
self.pool.release(query.client);
} else {
if (self.pendingQueries === 0) {
setTimeout(function() {
if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100);
}
}
}
});
})
};
ConnectorManager.prototype.getConnection = function(options) {
var self = this;
if (!this.pool) {
query.run(sql)
} else {
this.pool.acquire(function(err, client) {
if (err) {
return query.emit('error', err)
}
options = options || {}
query.client = client
query.run(sql)
return
}, undefined, options.type)
}
return new Utils.Promise(function (resolve, reject) {
if (!self.pool) {
// Regular client caching
if (self.client) {
return resolve(self.client);
} else {
// Cache for concurrent queries
if (self._getConnection) {
return resolve(self._getConnection)
}
return query
}
// Set cache and acquire connection
self._getConnection = this;
connect.call(self, function(err, client) {
if (err) {
return reject(err);
}
// Unset caching, should now be caught by the self.client check
self._getConnection = null;
self.client = client;
resolve(client);
});
}
} else {
// Acquire from pool
self.pool.acquire(function(err, client) {
if (err) {
return reject(err);
}
resolve(client);
}, options.priority, options.type);
}
})
};
ConnectorManager.prototype.connect = function() {
var self = this
......@@ -228,10 +262,12 @@ module.exports = (function() {
return
}
// private
var disconnect = function(client) {
if (!client) {
return // TODO possible orphaning of clients?
}
var self = this
if (!this.useQueue) {
this.client = null
......@@ -257,7 +293,7 @@ module.exports = (function() {
var connect = function(done, config) {
config = config || this.config
var self = this
, client
......@@ -324,7 +360,7 @@ module.exports = (function() {
var self = this
this.pool.acquire(function(err, client) {
if (err) {
queueItem.query.emit('error', err)
queueItem.query.reject(err)
return
}
//we set the client here, asynchronously, when getting a pooled connection
......@@ -367,12 +403,6 @@ module.exports = (function() {
var execQueueItem = function(queueItem) {
var self = this
queueItem.query
.success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql, queueItem.client)
}
......
......@@ -12,7 +12,12 @@ module.exports = (function() {
raw: false
}, options || {})
var self = this
this.checkLoggingOption()
this.promise = new Utils.Promise(function (resolve, reject) {
self.resolve = resolve
self.reject = reject
})
}
Utils.inherit(Query, AbstractQuery)
......@@ -87,9 +92,9 @@ module.exports = (function() {
})
.on('error', function(err) {
errorDetected = true
self.emit('sql', self.sql)
self.promise.emit('sql', self.sql)
err.sql = sql
self.emit('error', err, self.callee)
self.reject(err)
})
.on('end', function(info) {
if (alreadyEnded || errorDetected) {
......@@ -97,19 +102,18 @@ module.exports = (function() {
}
alreadyEnded = true
self.emit('sql', self.sql)
self.promise.emit('sql', self.sql)
// we need to figure out whether to send the result set
// or info depending upon the type of query
if (/^call/.test(self.sql.toLowerCase())) {
self.emit('success', resultSet)
self.resolve(resultSet)
} else if( /^show/.test(self.sql.toLowerCase()) ||
/^select/.test(self.sql.toLowerCase()) ||
/^describe/.test(self.sql.toLowerCase())) {
self.emit('success', self.formatResults(resultSet))
self.resolve(self.formatResults(resultSet))
} else {
self.emit('success', self.formatResults(info))
self.resolve(self.formatResults(info))
}
})
})
.on('error', function(err) {
......@@ -117,12 +121,12 @@ module.exports = (function() {
return
}
errorDetected = true
self.emit('sql', self.sql)
self.emit('error', err, self.callee)
self.promise.emit('sql', self.sql)
self.reject(err)
})
.setMaxListeners(100)
return this
return this.promise
}
return Query
......
......@@ -2,7 +2,7 @@ var mysql
, Pooling = require('generic-pool')
, Query = require("./query")
, Utils = require("../../utils")
, without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
, without = function(arr, elem) { return arr.filter(function(e) { return e.query.uuid != elem.query.uuid }) }
module.exports = (function() {
var ConnectorManager = function(sequelize, config) {
......@@ -160,79 +160,75 @@ module.exports = (function() {
Utils._.extend(ConnectorManager.prototype, require("../abstract/connector-manager").prototype);
ConnectorManager.prototype.query = function(sql, callee, options) {
var self = this
options = options || {}
if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting
var queueItem = {
query: new Query(null, this.sequelize, callee, options || {}),
query: new Query(null, this.sequelize, callee, options),
sql: sql
};
queueItem.query.options.uuid = this.config.uuid
enqueue.call(this, queueItem, options);
return queueItem.query;
enqueue.call(this, queueItem, options)
return queueItem.query.promise.finally(function () {
afterQuery.call(self, queueItem)
})
}
var self = this, query = new Query(null, this.sequelize, callee, options || {});
var query = new Query(null, this.sequelize, callee, options);
this.pendingQueries++;
query.options.uuid = this.config.uuid
query.done(function() {
self.pendingQueries--;
if (self.pool) {
self.pool.release(query.client);
} else {
if (self.pendingQueries === 0) {
setTimeout(function() {
if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100);
}
}
});
this.getConnection(options, function (err, client) {
if (err) {
return query.emit('error', err)
}
return this.getConnection(options).then(function (client) {
query.client = client
query.run(sql)
});
return query;
return query.run(sql).finally(function () {
self.pendingQueries--;
if (self.pool) {
self.pool.release(query.client);
} else {
if (self.pendingQueries === 0) {
setTimeout(function() {
if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100);
}
}
});
})
};
ConnectorManager.prototype.getConnection = function(options, callback) {
ConnectorManager.prototype.getConnection = function(options) {
var self = this;
if (typeof options === "function") {
callback = options;
options = {};
}
options = options || {}
return new Utils.CustomEventEmitter(function (emitter) {
return new Utils.Promise(function (resolve, reject) {
if (!self.pool) {
// Regular client caching
if (self.client) {
return emitter.emit('success', self.client);
return resolve(self.client);
} else {
// Cache for concurrent queries
if (self._getConnection) {
self._getConnection.proxy(emitter);
return;
return resolve(self._getConnection)
}
// Set cache and acquire connection
self._getConnection = emitter;
self._getConnection = this;
connect.call(self, function(err, client) {
if (err) {
return emitter.emit('error', err);
return reject(err);
}
// Unset caching, should now be caught by the self.client check
self._getConnection = null;
self.client = client;
emitter.emit('success', client);
resolve(client);
});
}
}
......@@ -240,12 +236,12 @@ module.exports = (function() {
// Acquire from pool
self.pool.acquire(function(err, client) {
if (err) {
return emitter.emit('error', err);
return reject(err);
}
emitter.emit('success', client);
resolve(client);
}, options.priority, options.type);
}
}).run().done(callback);
})
};
ConnectorManager.prototype.disconnect = function() {
......@@ -255,13 +251,15 @@ module.exports = (function() {
return
};
// private
var disconnect = function(client) {
var self = this;
this.client = null;
if (!client) {
return // TODO possible orphaning of clients?
}
client.end(function() {
if (!self.useQueue) {
return client.destroy();
......@@ -269,7 +267,6 @@ module.exports = (function() {
var intervalObj = null
var cleanup = function () {
var retryCt = 0
// make sure to let client finish before calling destroy
if (client._queue && (client._queue.length > 0)) {
return
......@@ -287,7 +284,6 @@ module.exports = (function() {
var connect = function(done, config) {
config = config || this.config
var emitter = new (require('events').EventEmitter)()
var connectionConfig = {
host: config.host,
port: config.port,
......@@ -309,22 +305,22 @@ module.exports = (function() {
switch(err.code) {
case 'ECONNREFUSED':
case 'ER_ACCESS_D2ENIED_ERROR':
emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.')
done('Failed to authenticate for MySQL. Please double check your settings.')
break
case 'ENOTFOUND':
case 'EHOSTUNREACH':
case 'EINVAL':
emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.')
done('Failed to find MySQL server. Please double check your settings.')
break
default:
emitter.emit('error', err);
done(err);
break;
}
return;
}
emitter.emit('success', connection);
done(null, connection);
})
connection.query("SET time_zone = '+0:00'");
......@@ -333,13 +329,6 @@ module.exports = (function() {
if (config.pool !== null && config.pool.handleDisconnects) {
handleDisconnect(this.pool, connection)
}
emitter.on('error', function (err) {
done(err);
});
emitter.on('success', function (connection) {
done(null, connection);
});
}
var handleDisconnect = function(pool, client) {
......@@ -388,26 +377,17 @@ module.exports = (function() {
disconnectIfNoConnections.call(this)
}
var execQueueItem = function(queueItem) {
var self = this
self.getConnection({
this.getConnection({
priority: queueItem.query.options.priority,
type: queueItem.query.options.type
}, function (err, connection) {
if (err) {
queueItem.query.emit('error', err)
return
}
}).then(function (connection) {
queueItem.query.client = connection
queueItem.client = connection
queueItem.query
.success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql, queueItem.client)
queueItem.query.run(queueItem.sql)
}, function (err) {
queueItem.query.reject(err)
})
}
......@@ -421,7 +401,7 @@ module.exports = (function() {
})
ConnectorManager.prototype.__defineGetter__('isConnected', function() {
return this.client != null
return this.client !== null
})
var disconnectIfNoConnections = function() {
......
var Utils = require("../../utils")
, AbstractQuery = require('../abstract/query')
, uuid = require('node-uuid')
module.exports = (function() {
var Query = function(client, sequelize, callee, options) {
this.client = client
this.callee = callee
this.sequelize = sequelize
this.uuid = uuid.v4()
this.options = Utils._.extend({
logging: console.log,
plain: false,
raw: false
}, options || {})
var self = this
this.checkLoggingOption()
this.promise = new Utils.Promise(function (resolve, reject) {
self.resolve = resolve
self.reject = reject
})
}
Utils.inherit(Query, AbstractQuery)
Query.prototype.run = function(sql) {
var self = this
this.sql = sql
if (this.options.logging !== false) {
this.sequelize.log('Executing (' + this.options.uuid + '): ' + this.sql)
}
this.client.query(this.sql, function(err, results, fields) {
this.emit('sql', this.sql, this.options.uuid)
self.client.query(self.sql, function(err, results, fields) {
self.promise.emit('sql', self.sql, self.options.uuid)
if (err) {
err.sql = sql
this.emit('error', err)
self.reject(err)
} else {
this.emit('success', this.formatResults(results))
self.resolve(self.formatResults(results))
}
}.bind(this)).setMaxListeners(100)
return this
}).setMaxListeners(100)
return this.promise
}
return Query
......
......@@ -22,15 +22,19 @@ var QueryInterface = module.exports = {
@since 1.6.0
*/
removeColumn: function(tableName, attributeName, emitter, queryAndEmit) {
removeColumn: function(tableName, attributeName) {
var self = this
return this.describeTable(tableName).then(function(fields) {
delete fields[attributeName]
var sql = this.QueryGenerator.removeColumnQuery(tableName, fields)
var sql = self.QueryGenerator.removeColumnQuery(tableName, fields)
, subQueries = sql.split(';').filter(function(q) { return q !== '' })
return QueryInterface.execMultiQuery.call(this, subQueries, 'removeColumn', null, queryAndEmit)
}.bind(this))
subQueries.unshift(null)
return Utils.Promise.reduce(subQueries, function (total, subQuery) {
return self.sequelize.query(subQuery + ';', null, { raw: true})
})
})
},
/**
......@@ -48,17 +52,21 @@ var QueryInterface = module.exports = {
@since 1.6.0
*/
changeColumn: function(tableName, attributes, emitter, queryAndEmit) {
changeColumn: function(tableName, attributes) {
var attributeName = Utils._.keys(attributes)[0]
, self = this
return this.describeTable(tableName).then(function(fields) {
fields[attributeName] = attributes[attributeName]
var sql = this.QueryGenerator.removeColumnQuery(tableName, fields)
var sql = self.QueryGenerator.removeColumnQuery(tableName, fields)
, subQueries = sql.split(';').filter(function(q) { return q !== '' })
return QueryInterface.execMultiQuery.call(this, subQueries, 'changeColumn', emitter, queryAndEmit)
}.bind(this))
subQueries.unshift(null)
return Utils.Promise.reduce(subQueries, function (total, subQuery) {
return self.sequelize.query(subQuery + ';', null, { raw: true})
})
})
},
/**
......@@ -77,93 +85,19 @@ var QueryInterface = module.exports = {
@since 1.6.0
*/
renameColumn: function(tableName, attrNameBefore, attrNameAfter, emitter, queryAndEmit) {
renameColumn: function(tableName, attrNameBefore, attrNameAfter) {
var self = this
return this.describeTable(tableName).then(function(fields) {
fields[attrNameAfter] = Utils._.clone(fields[attrNameBefore])
delete fields[attrNameBefore]
var sql = this.QueryGenerator.renameColumnQuery(tableName, attrNameBefore, attrNameAfter, fields)
var sql = self.QueryGenerator.renameColumnQuery(tableName, attrNameBefore, attrNameAfter, fields)
, subQueries = sql.split(';').filter(function(q) { return q !== '' })
return QueryInterface.execMultiQuery.call(this, subQueries, 'renameColumn', emitter, queryAndEmit)
}.bind(this))
subQueries.unshift(null)
return Utils.Promise.reduce(subQueries, function (total, subQuery) {
return self.sequelize.query(subQuery + ';', null, { raw: true})
})
})
},
execMultiQuery: function(queries, methodName, emitter, queryAndEmit) {
var chainer = new Utils.QueryChainer()
queries.splice(0, queries.length - 1).forEach(function(query) {
chainer.add(this.sequelize, 'query', [query + ";", null, { raw: true }])
}.bind(this))
return chainer
.runSerially()
.then(function() {
return queryAndEmit.call(this, queries.splice(queries.length - 1)[0], methodName, {}, emitter)
}.bind(this))
},
dropAllTables: function(options) {
var self = this
if (!options) {
options = {}
}
var skip = options.skip || [];
return new Utils.CustomEventEmitter(function(dropAllTablesEmitter) {
var events = []
, chainer = new Utils.QueryChainer()
, onError = function(err) {
self.emit('dropAllTables', err)
dropAllTablesEmitter.emit('error', err)
}
self
.showAllTables()
.error(onError)
.success(function(tableNames) {
self
.sequelize
.query('PRAGMA foreign_keys;')
.proxy(dropAllTablesEmitter, { events: ['sql'] })
.error(onError)
.success(function(result) {
var foreignKeysAreEnabled = result.foreign_keys === 1
if (foreignKeysAreEnabled) {
var queries = []
queries.push('PRAGMA foreign_keys = OFF')
tableNames.forEach(function(tableName) {
// if tableName is not in the Array of tables names then dont drop it
if (skip.indexOf(tableName) === -1) {
queries.push(self.QueryGenerator.dropTableQuery(tableName).replace(';', ''))
}
})
queries.push('PRAGMA foreign_keys = ON')
QueryInterface.execMultiQuery.call(self, queries, 'dropAllTables', dropAllTablesEmitter, self.queryAndEmit)
} else {
// add the table removal query to the chainer
tableNames.forEach(function(tableName) {
chainer.add(self, 'dropTable', [ tableName, { cascade: true } ])
})
chainer
.runSerially()
.proxy(dropAllTablesEmitter, { events: ['sql'] })
.error(onError)
.success(function() {
self.emit('dropAllTables', null)
dropAllTablesEmitter.emit('success', null)
})
}
})
})
}).run()
}
}
......@@ -112,7 +112,7 @@ Hooks.runHooks = function() {
})
if (fn) {
promise.spread(function () {
promise = promise.spread(function () {
fn.apply(self, [null].concat(Array.prototype.slice.apply(arguments)));
}, fn);
}
......
......@@ -648,17 +648,15 @@ module.exports = (function() {
// This semi awkward syntax where we can't return the chain directly but have to return the last .then() call is to allow sql proxying
return self.Model.runHooks(self.Model.options.hooks.beforeDestroy, self).then(function () {
var query
, identifier
var identifier
if (self.Model._timestampAttributes.deletedAt && options.force === false) {
self.dataValues[self.Model._timestampAttributes.deletedAt] = new Date()
query = self.save(options)
return self.save(options)
} else {
identifier = self.__options.hasPrimaryKeys ? self.primaryKeyValues : { id: self.id };
query = self.QueryInterface.delete(self, self.QueryInterface.QueryGenerator.addSchema(self.Model), identifier, options)
return self.QueryInterface.delete(self, self.QueryInterface.QueryGenerator.addSchema(self.Model), identifier, options)
}
return query;
}).then(function (results) {
return self.Model.runHooks(self.Model.options.hooks.afterDestroy, self).return(results);
});
......
......@@ -170,7 +170,7 @@ module.exports = (function() {
type: QueryTypes.SELECT
}, options || {})
return self.QueryInterface.queryAndEmit([result.toSql(), self, options], 'snafu')
return self.sequelize.query(result.toSql(), self, options)
}
return result
......@@ -393,7 +393,7 @@ module.exports = (function() {
if (options.force) {
return self.drop(options).then(function () {
return doQuery().return(self);
return doQuery().return(self)
});
} else {
return doQuery().return(this)
......@@ -1143,7 +1143,7 @@ module.exports = (function() {
}
if(this.sequelize.options.dialect === 'postgres' && options.ignoreDuplicates) {
return this.sequelize.Promise.reject(new Error('Postgres does not support the \'ignoreDuplicates\' option.'))
return Utils.Promise.reject(new Error('Postgres does not support the \'ignoreDuplicates\' option.'))
}
var self = this
......
......@@ -127,10 +127,6 @@ module.exports = (function() {
if (options.skipOnError && (self.fails.length > 0)) {
onError('Skipped due to earlier error!')
} else {
if (typeof serial.options === "object" && Object.keys(serial.options).length > 0 && serial.method === "queryAndEmit") {
serial.params.push(serial.options)
}
var emitter = serial.klass[serial.method].apply(serial.klass, serial.params)
emitter.success(function(result) {
......
......@@ -52,9 +52,6 @@ var Utils = module.exports = {
return _
})(),
addEventEmitter: function(_class) {
util.inherits(_class, require('events').EventEmitter)
},
format: function(arr, dialect) {
var timeZone = null;
// Make a clone of the array beacuse format modifies the passed args
......
......@@ -361,9 +361,7 @@ describe(Support.getTestDialectTeaser("BelongsTo"), function() {
Task.create({ title: 'task' }).success(function(task) {
task.setUser(user).success(function() {
// Should fail due to FK restriction
user.destroy().then(function () {
assert(false);
}, function(err) {
user.destroy().catch(function(err) {
expect(err).to.be.ok;
Task.findAll().success(function(tasks) {
expect(tasks).to.have.length(1)
......
......@@ -98,10 +98,10 @@ describe(Support.getTestDialectTeaser("Sequelize"), function () {
if (dialect === 'mariadb') {
expect(err.message).to.match(/Access denied for user/)
} else if (dialect === 'postgres') {
// When the test is run with only it produces:
// Error: Error: Failed to authenticate for PostgresSQL. Please double check your settings.
// When its run with all the other tests it produces:
// Error: invalid port number: "99999"
// When the test is run with only it produces:
// Error: Error: Failed to authenticate for PostgresSQL. Please double check your settings.
// When its run with all the other tests it produces:
// Error: invalid port number: "99999"
expect(err.message).to.match(/invalid port number/)
} else {
expect(err.message).to.match(/Failed to authenticate/)
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!