不要怂,就是干,撸起袖子干!

Commit 8cbd6a67 by Jan Aagaard Meier

Refactored .query for all dialects to return promises

1 parent 0c114c36
...@@ -159,54 +159,88 @@ module.exports = (function() { ...@@ -159,54 +159,88 @@ module.exports = (function() {
Utils._.extend(ConnectorManager.prototype, require("../abstract/connector-manager").prototype) Utils._.extend(ConnectorManager.prototype, require("../abstract/connector-manager").prototype)
ConnectorManager.prototype.query = function(sql, callee, options) { ConnectorManager.prototype.query = function(sql, callee, options) {
if (!this.isConnected && !this.pool) { var self = this
this.connect()
} options = options || {}
if (this.useQueue) { if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting
var queueItem = { var queueItem = {
query: new Query(this.client, this.sequelize, callee, options || {}), query: new Query(null, this.sequelize, callee, options),
client: this.client,
sql: sql sql: sql
} };
queueItem.query.options.uuid = this.config.uuid
enqueue.call(this, queueItem, options) enqueue.call(this, queueItem, options)
return queueItem.query return queueItem.query.promise.finally(function () {
afterQuery.call(self, queueItem)
})
} }
var self = this var query = new Query(null, this.sequelize, callee, options);
, query = new Query(this.client, this.sequelize, callee, options || {}) this.pendingQueries++;
this.pendingQueries++
query.options.uuid = this.config.uuid
query.done(function() {
self.pendingQueries-- return this.getConnection(options).then(function (client) {
if (self.pool) { query.client = client
self.pool.release(query.client) return query.run(sql).finally(function () {
} else { self.pendingQueries--;
if (self.pendingQueries === 0) { if (self.pool) {
setTimeout(function() { self.pool.release(query.client);
self.pendingQueries === 0 && self.disconnect.call(self) } else {
}, 100) if (self.pendingQueries === 0) {
setTimeout(function() {
if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100);
}
} }
} });
}) })
};
ConnectorManager.prototype.getConnection = function(options) {
var self = this;
if (!this.pool) { options = options || {}
query.run(sql)
} else {
this.pool.acquire(function(err, client) {
if (err) {
return query.emit('error', err)
}
query.client = client return new Utils.Promise(function (resolve, reject) {
query.run(sql) if (!self.pool) {
return // Regular client caching
}, undefined, options.type) if (self.client) {
} return resolve(self.client);
} else {
// Cache for concurrent queries
if (self._getConnection) {
return resolve(self._getConnection)
}
return query // Set cache and acquire connection
} self._getConnection = this;
connect.call(self, function(err, client) {
if (err) {
return reject(err);
}
// Unset caching, should now be caught by the self.client check
self._getConnection = null;
self.client = client;
resolve(client);
});
}
} else {
// Acquire from pool
self.pool.acquire(function(err, client) {
if (err) {
return reject(err);
}
resolve(client);
}, options.priority, options.type);
}
})
};
ConnectorManager.prototype.connect = function() { ConnectorManager.prototype.connect = function() {
var self = this var self = this
...@@ -228,10 +262,12 @@ module.exports = (function() { ...@@ -228,10 +262,12 @@ module.exports = (function() {
return return
} }
// private // private
var disconnect = function(client) { var disconnect = function(client) {
if (!client) {
return // TODO possible orphaning of clients?
}
var self = this var self = this
if (!this.useQueue) { if (!this.useQueue) {
this.client = null this.client = null
...@@ -257,7 +293,7 @@ module.exports = (function() { ...@@ -257,7 +293,7 @@ module.exports = (function() {
var connect = function(done, config) { var connect = function(done, config) {
config = config || this.config config = config || this.config
var self = this var self = this
, client , client
...@@ -324,7 +360,7 @@ module.exports = (function() { ...@@ -324,7 +360,7 @@ module.exports = (function() {
var self = this var self = this
this.pool.acquire(function(err, client) { this.pool.acquire(function(err, client) {
if (err) { if (err) {
queueItem.query.emit('error', err) queueItem.query.reject(err)
return return
} }
//we set the client here, asynchronously, when getting a pooled connection //we set the client here, asynchronously, when getting a pooled connection
...@@ -367,12 +403,6 @@ module.exports = (function() { ...@@ -367,12 +403,6 @@ module.exports = (function() {
var execQueueItem = function(queueItem) { var execQueueItem = function(queueItem) {
var self = this
queueItem.query
.success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql, queueItem.client) queueItem.query.run(queueItem.sql, queueItem.client)
} }
......
...@@ -12,7 +12,12 @@ module.exports = (function() { ...@@ -12,7 +12,12 @@ module.exports = (function() {
raw: false raw: false
}, options || {}) }, options || {})
var self = this
this.checkLoggingOption() this.checkLoggingOption()
this.promise = new Utils.Promise(function (resolve, reject) {
self.resolve = resolve
self.reject = reject
})
} }
Utils.inherit(Query, AbstractQuery) Utils.inherit(Query, AbstractQuery)
...@@ -87,9 +92,9 @@ module.exports = (function() { ...@@ -87,9 +92,9 @@ module.exports = (function() {
}) })
.on('error', function(err) { .on('error', function(err) {
errorDetected = true errorDetected = true
self.emit('sql', self.sql) self.promise.emit('sql', self.sql)
err.sql = sql err.sql = sql
self.emit('error', err, self.callee) self.reject(err)
}) })
.on('end', function(info) { .on('end', function(info) {
if (alreadyEnded || errorDetected) { if (alreadyEnded || errorDetected) {
...@@ -97,19 +102,18 @@ module.exports = (function() { ...@@ -97,19 +102,18 @@ module.exports = (function() {
} }
alreadyEnded = true alreadyEnded = true
self.emit('sql', self.sql) self.promise.emit('sql', self.sql)
// we need to figure out whether to send the result set // we need to figure out whether to send the result set
// or info depending upon the type of query // or info depending upon the type of query
if (/^call/.test(self.sql.toLowerCase())) { if (/^call/.test(self.sql.toLowerCase())) {
self.emit('success', resultSet) self.resolve(resultSet)
} else if( /^show/.test(self.sql.toLowerCase()) || } else if( /^show/.test(self.sql.toLowerCase()) ||
/^select/.test(self.sql.toLowerCase()) || /^select/.test(self.sql.toLowerCase()) ||
/^describe/.test(self.sql.toLowerCase())) { /^describe/.test(self.sql.toLowerCase())) {
self.emit('success', self.formatResults(resultSet)) self.resolve(self.formatResults(resultSet))
} else { } else {
self.emit('success', self.formatResults(info)) self.resolve(self.formatResults(info))
} }
}) })
}) })
.on('error', function(err) { .on('error', function(err) {
...@@ -117,12 +121,12 @@ module.exports = (function() { ...@@ -117,12 +121,12 @@ module.exports = (function() {
return return
} }
errorDetected = true errorDetected = true
self.emit('sql', self.sql) self.promise.emit('sql', self.sql)
self.emit('error', err, self.callee) self.reject(err)
}) })
.setMaxListeners(100) .setMaxListeners(100)
return this return this.promise
} }
return Query return Query
......
...@@ -2,7 +2,7 @@ var mysql ...@@ -2,7 +2,7 @@ var mysql
, Pooling = require('generic-pool') , Pooling = require('generic-pool')
, Query = require("./query") , Query = require("./query")
, Utils = require("../../utils") , Utils = require("../../utils")
, without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) } , without = function(arr, elem) { return arr.filter(function(e) { return e.query.uuid != elem.query.uuid }) }
module.exports = (function() { module.exports = (function() {
var ConnectorManager = function(sequelize, config) { var ConnectorManager = function(sequelize, config) {
...@@ -160,79 +160,75 @@ module.exports = (function() { ...@@ -160,79 +160,75 @@ module.exports = (function() {
Utils._.extend(ConnectorManager.prototype, require("../abstract/connector-manager").prototype); Utils._.extend(ConnectorManager.prototype, require("../abstract/connector-manager").prototype);
ConnectorManager.prototype.query = function(sql, callee, options) { ConnectorManager.prototype.query = function(sql, callee, options) {
var self = this
options = options || {}
if (this.useQueue) { if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting // If queueing we'll let the execQueueItem method handle connecting
var queueItem = { var queueItem = {
query: new Query(null, this.sequelize, callee, options || {}), query: new Query(null, this.sequelize, callee, options),
sql: sql sql: sql
}; };
queueItem.query.options.uuid = this.config.uuid queueItem.query.options.uuid = this.config.uuid
enqueue.call(this, queueItem, options); enqueue.call(this, queueItem, options)
return queueItem.query; return queueItem.query.promise.finally(function () {
afterQuery.call(self, queueItem)
})
} }
var self = this, query = new Query(null, this.sequelize, callee, options || {}); var query = new Query(null, this.sequelize, callee, options);
this.pendingQueries++; this.pendingQueries++;
query.options.uuid = this.config.uuid query.options.uuid = this.config.uuid
query.done(function() {
self.pendingQueries--;
if (self.pool) {
self.pool.release(query.client);
} else {
if (self.pendingQueries === 0) {
setTimeout(function() {
if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100);
}
}
});
this.getConnection(options, function (err, client) { return this.getConnection(options).then(function (client) {
if (err) {
return query.emit('error', err)
}
query.client = client query.client = client
query.run(sql) return query.run(sql).finally(function () {
}); self.pendingQueries--;
if (self.pool) {
return query; self.pool.release(query.client);
} else {
if (self.pendingQueries === 0) {
setTimeout(function() {
if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100);
}
}
});
})
}; };
ConnectorManager.prototype.getConnection = function(options, callback) { ConnectorManager.prototype.getConnection = function(options) {
var self = this; var self = this;
if (typeof options === "function") { options = options || {}
callback = options;
options = {};
}
return new Utils.CustomEventEmitter(function (emitter) { return new Utils.Promise(function (resolve, reject) {
if (!self.pool) { if (!self.pool) {
// Regular client caching // Regular client caching
if (self.client) { if (self.client) {
return emitter.emit('success', self.client); return resolve(self.client);
} else { } else {
// Cache for concurrent queries // Cache for concurrent queries
if (self._getConnection) { if (self._getConnection) {
self._getConnection.proxy(emitter); return resolve(self._getConnection)
return;
} }
// Set cache and acquire connection // Set cache and acquire connection
self._getConnection = emitter; self._getConnection = this;
connect.call(self, function(err, client) { connect.call(self, function(err, client) {
if (err) { if (err) {
return emitter.emit('error', err); return reject(err);
} }
// Unset caching, should now be caught by the self.client check // Unset caching, should now be caught by the self.client check
self._getConnection = null; self._getConnection = null;
self.client = client; self.client = client;
emitter.emit('success', client); resolve(client);
}); });
} }
} }
...@@ -240,12 +236,12 @@ module.exports = (function() { ...@@ -240,12 +236,12 @@ module.exports = (function() {
// Acquire from pool // Acquire from pool
self.pool.acquire(function(err, client) { self.pool.acquire(function(err, client) {
if (err) { if (err) {
return emitter.emit('error', err); return reject(err);
} }
emitter.emit('success', client); resolve(client);
}, options.priority, options.type); }, options.priority, options.type);
} }
}).run().done(callback); })
}; };
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
...@@ -255,13 +251,15 @@ module.exports = (function() { ...@@ -255,13 +251,15 @@ module.exports = (function() {
return return
}; };
// private // private
var disconnect = function(client) { var disconnect = function(client) {
var self = this; var self = this;
this.client = null; this.client = null;
if (!client) {
return
}
client.end(function() { client.end(function() {
if (!self.useQueue) { if (!self.useQueue) {
return client.destroy(); return client.destroy();
...@@ -269,7 +267,6 @@ module.exports = (function() { ...@@ -269,7 +267,6 @@ module.exports = (function() {
var intervalObj = null var intervalObj = null
var cleanup = function () { var cleanup = function () {
var retryCt = 0
// make sure to let client finish before calling destroy // make sure to let client finish before calling destroy
if (client._queue && (client._queue.length > 0)) { if (client._queue && (client._queue.length > 0)) {
return return
...@@ -287,7 +284,6 @@ module.exports = (function() { ...@@ -287,7 +284,6 @@ module.exports = (function() {
var connect = function(done, config) { var connect = function(done, config) {
config = config || this.config config = config || this.config
var emitter = new (require('events').EventEmitter)()
var connectionConfig = { var connectionConfig = {
host: config.host, host: config.host,
port: config.port, port: config.port,
...@@ -309,22 +305,22 @@ module.exports = (function() { ...@@ -309,22 +305,22 @@ module.exports = (function() {
switch(err.code) { switch(err.code) {
case 'ECONNREFUSED': case 'ECONNREFUSED':
case 'ER_ACCESS_D2ENIED_ERROR': case 'ER_ACCESS_D2ENIED_ERROR':
emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.') done('Failed to authenticate for MySQL. Please double check your settings.')
break break
case 'ENOTFOUND': case 'ENOTFOUND':
case 'EHOSTUNREACH': case 'EHOSTUNREACH':
case 'EINVAL': case 'EINVAL':
emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.') done('Failed to find MySQL server. Please double check your settings.')
break break
default: default:
emitter.emit('error', err); done(err);
break; break;
} }
return; return;
} }
emitter.emit('success', connection); done(null, connection);
}) })
connection.query("SET time_zone = '+0:00'"); connection.query("SET time_zone = '+0:00'");
...@@ -333,13 +329,6 @@ module.exports = (function() { ...@@ -333,13 +329,6 @@ module.exports = (function() {
if (config.pool !== null && config.pool.handleDisconnects) { if (config.pool !== null && config.pool.handleDisconnects) {
handleDisconnect(this.pool, connection) handleDisconnect(this.pool, connection)
} }
emitter.on('error', function (err) {
done(err);
});
emitter.on('success', function (connection) {
done(null, connection);
});
} }
var handleDisconnect = function(pool, client) { var handleDisconnect = function(pool, client) {
...@@ -388,26 +377,17 @@ module.exports = (function() { ...@@ -388,26 +377,17 @@ module.exports = (function() {
disconnectIfNoConnections.call(this) disconnectIfNoConnections.call(this)
} }
var execQueueItem = function(queueItem) { var execQueueItem = function(queueItem) {
var self = this this.getConnection({
self.getConnection({
priority: queueItem.query.options.priority, priority: queueItem.query.options.priority,
type: queueItem.query.options.type type: queueItem.query.options.type
}, function (err, connection) { }).then(function (connection) {
if (err) {
queueItem.query.emit('error', err)
return
}
queueItem.query.client = connection queueItem.query.client = connection
queueItem.client = connection queueItem.client = connection
queueItem.query
.success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql, queueItem.client) queueItem.query.run(queueItem.sql)
}, function (err) {
queueItem.query.reject(err)
}) })
} }
...@@ -421,7 +401,7 @@ module.exports = (function() { ...@@ -421,7 +401,7 @@ module.exports = (function() {
}) })
ConnectorManager.prototype.__defineGetter__('isConnected', function() { ConnectorManager.prototype.__defineGetter__('isConnected', function() {
return this.client != null return this.client !== null
}) })
var disconnectIfNoConnections = function() { var disconnectIfNoConnections = function() {
......
var Utils = require("../../utils") var Utils = require("../../utils")
, AbstractQuery = require('../abstract/query') , AbstractQuery = require('../abstract/query')
, uuid = require('node-uuid')
module.exports = (function() { module.exports = (function() {
var Query = function(client, sequelize, callee, options) { var Query = function(client, sequelize, callee, options) {
this.client = client this.client = client
this.callee = callee this.callee = callee
this.sequelize = sequelize this.sequelize = sequelize
this.uuid = uuid.v4()
this.options = Utils._.extend({ this.options = Utils._.extend({
logging: console.log, logging: console.log,
plain: false, plain: false,
raw: false raw: false
}, options || {}) }, options || {})
var self = this
this.checkLoggingOption() this.checkLoggingOption()
this.promise = new Utils.Promise(function (resolve, reject) {
self.resolve = resolve
self.reject = reject
})
} }
Utils.inherit(Query, AbstractQuery) Utils.inherit(Query, AbstractQuery)
Query.prototype.run = function(sql) { Query.prototype.run = function(sql) {
var self = this
this.sql = sql this.sql = sql
if (this.options.logging !== false) { if (this.options.logging !== false) {
this.sequelize.log('Executing (' + this.options.uuid + '): ' + this.sql) this.sequelize.log('Executing (' + this.options.uuid + '): ' + this.sql)
} }
this.client.query(this.sql, function(err, results, fields) { self.client.query(self.sql, function(err, results, fields) {
this.emit('sql', this.sql, this.options.uuid) self.promise.emit('sql', self.sql, self.options.uuid)
if (err) { if (err) {
err.sql = sql err.sql = sql
this.emit('error', err)
self.reject(err)
} else { } else {
this.emit('success', this.formatResults(results)) self.resolve(self.formatResults(results))
} }
}.bind(this)).setMaxListeners(100) }).setMaxListeners(100)
return this
return this.promise
} }
return Query return Query
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!