不要怂,就是干,撸起袖子干!

Commit c76d53f3 by Mick Hansen

Handle all cases

1 parent 532754f6
Showing with 71 additions and 35 deletions
...@@ -152,21 +152,22 @@ module.exports = (function() { ...@@ -152,21 +152,22 @@ module.exports = (function() {
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype); Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
ConnectorManager.prototype.query = function(sql, callee, options) { ConnectorManager.prototype.query = function(sql, callee, options) {
if (!this.isConnected && !this.pool) {
this.connect()
}
if (this.useQueue) { if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting
var queueItem = { var queueItem = {
query: new Query(this.client, this.sequelize, callee, options || {}), query: new Query(null, this.sequelize, callee, options || {}),
sql: sql sql: sql
}; };
queueItem.query.options.uuid = this.config.uuid queueItem.query.options.uuid = this.config.uuid
enqueue.call(this, queueItem, options) enqueue.call(this, queueItem, options);
return queueItem.query; return queueItem.query;
} }
if (!this.isConnected && !this.pool) {
this.connect()
}
var self = this, query = new Query(this.client, this.sequelize, callee, options || {}); var self = this, query = new Query(this.client, this.sequelize, callee, options || {});
this.pendingQueries++; this.pendingQueries++;
...@@ -178,30 +179,23 @@ module.exports = (function() { ...@@ -178,30 +179,23 @@ module.exports = (function() {
} else { } else {
if (self.pendingQueries === 0) { if (self.pendingQueries === 0) {
setTimeout(function() { setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self) if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100); }, 100);
} }
} }
}); });
if (!this.pool) { this.getConnection(options, function (err, client) {
query.run(sql);
} else {
this.pool.acquire(function(err, client) {
if (err) {
return query.emit('error', err)
}
query.client = client query.client = client
query.run(sql) query.run(sql)
return; });
}, undefined, options.type)
}
return query; return query;
}; };
ConnectorManager.prototype.connect = function() { ConnectorManager.prototype.connect = function(callback) {
var self = this; var self = this;
// in case database is slow to connect, prevent orphaning the client // in case database is slow to connect, prevent orphaning the client
if (this.isConnecting || this.pool) { if (this.isConnecting || this.pool) {
...@@ -209,11 +203,57 @@ module.exports = (function() { ...@@ -209,11 +203,57 @@ module.exports = (function() {
} }
connect.call(self, function(err, client) { connect.call(self, function(err, client) {
self.client = client; self.client = client;
callback(err, client);
return; return;
}); });
return; return;
}; };
ConnectorManager.prototype.getConnection = function(options, callback) {
var self = this;
if (typeof options === "function") {
callback = options;
options = {};
}
return new Utils.CustomEventEmitter(function (emitter) {
if (!self.pool) {
if (self.client) {
return emitter.emit('success', self.client);
} else {
// Cache for concurrency
if (self._getConnection) {
self._getConnection.proxy(emitter);
return;
}
// Set cache and acquire connection
self._getConnection = emitter;
connect.call(self, function(err, client) {
if (err) {
return emitter.emit('error', err);
}
// Unset caching, should now be caught by the self.client check
self._getConnection = null;
self.client = client;
emitter.emit('success', client);
});
}
}
if (self.pool) {
// Acquire from pool
self.pool.acquire(function(err, client) {
if (err) {
return emitter.emit('error', err);
}
emitter.emit('success', client);
}, options.priority, options.type);
}
}).run().done(callback);
};
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
if (this.client) { if (this.client) {
disconnect.call(this, this.client) disconnect.call(this, this.client)
...@@ -326,23 +366,7 @@ module.exports = (function() { ...@@ -326,23 +366,7 @@ module.exports = (function() {
options = options || {} options = options || {}
if (this.activeQueue.length < this.maxConcurrentQueries) { if (this.activeQueue.length < this.maxConcurrentQueries) {
this.activeQueue.push(queueItem) this.activeQueue.push(queueItem)
if (this.pool) {
var self = this
this.pool.acquire(function(err, client) {
if (err) {
queueItem.query.emit('error', err)
return
}
//we set the client here, asynchronously, when getting a pooled connection
//allowing the ConnectorManager.query method to remain synchronous
queueItem.query.client = client
queueItem.client = client
execQueueItem.call(self, queueItem)
return
}, undefined, options.type)
} else {
execQueueItem.call(this, queueItem) execQueueItem.call(this, queueItem)
}
} else { } else {
this.queue.push(queueItem) this.queue.push(queueItem)
} }
...@@ -375,11 +399,23 @@ module.exports = (function() { ...@@ -375,11 +399,23 @@ module.exports = (function() {
var execQueueItem = function(queueItem) { var execQueueItem = function(queueItem) {
var self = this var self = this
self.getConnection({
priority: queueItem.query.options.priority,
type: queueItem.query.options.type
}, function (err, connection) {
if (err) {
queueItem.query.emit('error', err)
return
}
queueItem.query.client = connection
queueItem.client = connection
queueItem.query queueItem.query
.success(function(){ afterQuery.call(self, queueItem) }) .success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) }) .error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql, queueItem.client) queueItem.query.run(queueItem.sql, queueItem.client)
})
} }
ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() { ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!