不要怂,就是干,撸起袖子干!

Commit 811177eb by Daniel Durante

Reverted back to my original postgres pooling mechanism.

1 parent 34b69bfe
...@@ -6,11 +6,12 @@ module.exports = (function() { ...@@ -6,11 +6,12 @@ module.exports = (function() {
var pgModule = config.dialectModulePath || 'pg' var pgModule = config.dialectModulePath || 'pg'
this.sequelize = sequelize this.sequelize = sequelize
this.client = null this.client = null
this.config = config || {} this.config = config || {}
this.config.port = this.config.port || 5432 this.config.port = this.config.port || 5432
this.pooling = (!!this.config.poolCfg && (this.config.poolCfg.maxConnections > 0)) this.pooling = (!!this.config.pool && (this.config.pool.maxConnections > 0))
this.pg = this.config.native ? require(pgModule).native : require(pgModule) this.pg = this.config.native ? require(pgModule).native : require(pgModule)
this.poolIdentifier = null
// Better support for BigInts // Better support for BigInts
// https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935 // https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
...@@ -25,49 +26,55 @@ module.exports = (function() { ...@@ -25,49 +26,55 @@ module.exports = (function() {
this.disconnectTimeoutId = null this.disconnectTimeoutId = null
this.pendingQueries = 0 this.pendingQueries = 0
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50) this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
process.on('exit', function() {
this.disconnect()
}.bind(this))
} }
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype) Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
var isConnecting = false ConnectorManager.prototype.endQuery = function() {
var isConnected = false var self = this
self.pendingQueries--
if (!self.pooling && self.pendingQueries === 0) {
setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self)
}, 100)
}
}
ConnectorManager.prototype.query = function(sql, callee, options) { ConnectorManager.prototype.query = function(sql, callee, options) {
var self = this var self = this
self.pendingQueries += 1 self.pendingQueries++
return new Utils.CustomEventEmitter(function(emitter) { return new Utils.CustomEventEmitter(function(emitter) {
self.connect(function(err, client) { self.connect()
if (!!err) { .on('error', function(err) {
return emitter.emit('error', err) emitter.emit('error', err)
} })
.on('success', function(done) {
var query = new Query(client, self.sequelize, callee, options || {}) var query = new Query(self.client, self.sequelize, callee, options || {})
done = done || null
return query.run(sql) return query.run(sql, done)
.success(function() { self.endQuery.call(self) }) .success(function(results) { self.endQuery.call(self) })
.error(function() { self.endQuery.call(self) }) .error(function(err) { self.endQuery.call(self) })
.proxy(emitter) .proxy(emitter)
}) })
}).run() }).run()
} }
ConnectorManager.prototype.endQuery = function() {
var self = this
self.pendingQueries--
if (self.pendingQueries === 0) {
setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self)
}, 100)
}
}
ConnectorManager.prototype.connect = function(callback) { ConnectorManager.prototype.connect = function(callback) {
var self = this var self = this
var emitter = new (require('events').EventEmitter)()
// in case database is slow to connect, prevent orphaning the client // in case database is slow to connect, prevent orphaning the client
if (this.isConnecting) { if (this.isConnecting) {
return callback(null) emitter.emit('success')
return emitter
} }
this.isConnecting = true this.isConnecting = true
...@@ -75,55 +82,66 @@ module.exports = (function() { ...@@ -75,55 +82,66 @@ module.exports = (function() {
var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config) var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
var connectCallback = function(err, client) { var connectCallback = function(err, client, done) {
self.isConnecting = false self.isConnecting = false
if (!!err) { if (!!err) {
switch(err.code) { // release the pool immediately, very important.
case 'ECONNREFUSED': done && done(err)
return callback(new Error('Failed to authenticate for PostgresSQL. Please double check your settings.'))
break if (err.code) {
case 'ENOTFOUND': switch(err.code) {
case 'EHOSTUNREACH': case 'ECONNREFUSED':
case 'EINVAL': emitter.emit('error', new Error("Failed to authenticate for PostgresSQL. Please double check your settings."))
return callback(new Error('Failed to find PostgresSQL server. Please double check your settings.')) break
break case 'ENOTFOUND':
default: case 'EHOSTUNREACH':
return callback(err) case 'EINVAL':
emitter.emit('error', new Error("Failed to find PostgresSQL server. Please double check your settings."))
break
default:
emitter.emit('error', err)
break
}
} }
} else if (client) { } else if (client) {
client.query("SET TIME ZONE 'UTC'").on('end', function() { client.query("SET TIME ZONE 'UTC'").on('end', function() {
self.isConnected = true self.isConnected = true
self.client = client self.client = client
callback(null, client) emitter.emit('success', done)
}); });
} else { } else {
self.client = null self.client = null
callback(null) emitter.emit('success', done)
} }
} }
if (this.pooling) { if (this.pooling) {
// acquire client from pool // acquire client from pool
this.pg.connect(uri, connectCallback) this.poolIdentifier = this.pg.pools.getOrCreate(this.sequelize.config)
this.poolIdentifier.connect(connectCallback)
} else { } else {
//create one-off client //create one-off client
if (this.client === null) { this.client = new this.pg.Client(uri)
this.client = new this.pg.Client(uri) this.client.connect(connectCallback)
this.client.connect(connectCallback)
} else {
connectCallback(null, this.client)
}
} }
return emitter
} }
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
var self = this if (this.poolIdentifier) {
if (this.client) this.client.end() this.poolIdentifier.destroyAllNow()
}
if (this.client) {
this.client.end()
}
this.client = null this.client = null
this.isConnecting = false this.isConnecting = false
this.isConnected = false this.isConnected = false
} }
return ConnectorManager return ConnectorManager
})() })()
\ No newline at end of file
...@@ -18,7 +18,7 @@ module.exports = (function() { ...@@ -18,7 +18,7 @@ module.exports = (function() {
} }
Utils.inherit(Query, AbstractQuery) Utils.inherit(Query, AbstractQuery)
Query.prototype.run = function(sql) { Query.prototype.run = function(sql, done) {
this.sql = sql this.sql = sql
var self = this var self = this
...@@ -40,6 +40,7 @@ module.exports = (function() { ...@@ -40,6 +40,7 @@ module.exports = (function() {
}.bind(this)) }.bind(this))
query.on('end', function() { query.on('end', function() {
done && done()
this.emit('sql', this.sql) this.emit('sql', this.sql)
if (receivedError) { if (receivedError) {
...@@ -56,11 +57,11 @@ module.exports = (function() { ...@@ -56,11 +57,11 @@ module.exports = (function() {
return 'id' return 'id'
} }
var onSuccess = function(rows) { var onSuccess = function(rows, sql) {
var results = [] var results = []
, self = this , self = this
, isTableNameQuery = (this.sql.indexOf('SELECT table_name FROM information_schema.tables') === 0) , isTableNameQuery = (sql.indexOf('SELECT table_name FROM information_schema.tables') === 0)
, isRelNameQuery = (this.sql.indexOf('SELECT relname FROM pg_class WHERE oid IN') === 0) , isRelNameQuery = (sql.indexOf('SELECT relname FROM pg_class WHERE oid IN') === 0)
if (isTableNameQuery || isRelNameQuery) { if (isTableNameQuery || isRelNameQuery) {
if (isRelNameQuery) { if (isRelNameQuery) {
...@@ -162,4 +163,4 @@ module.exports = (function() { ...@@ -162,4 +163,4 @@ module.exports = (function() {
} }
return Query return Query
})() })()
\ No newline at end of file
...@@ -91,25 +91,25 @@ module.exports = (function() { ...@@ -91,25 +91,25 @@ module.exports = (function() {
var chainer = new Utils.QueryChainer() var chainer = new Utils.QueryChainer()
self.showAllTables().success(function(tableNames) { self.showAllTables().success(function(tableNames) {
chainer.add(self, 'disableForeignKeyConstraints', [])
chainer.add(self, 'disableForeignKeyConstraints', []) tableNames.forEach(function(tableName) {
chainer.add(self, 'dropTable', [tableName, {cascade: true}])
})
tableNames.forEach(function(tableName) { chainer.add(self, 'enableForeignKeyConstraints', [])
chainer.add(self, 'dropTable', [tableName, {cascade: true}])
})
chainer.add(self, 'enableForeignKeyConstraints', []) chainer
.runSerially()
.success(function() {
self.emit('dropAllTables', null)
emitter.emit('success', null)
})
.error(function(err) {
self.emit('dropAllTables', err)
emitter.emit('error', err)
})
chainer
.runSerially()
.success(function() {
self.emit('dropAllTables', null)
emitter.emit('success', null)
})
.error(function(err) {
self.emit('dropAllTables', err)
emitter.emit('error', err)
})
}).error(function(err) { }).error(function(err) {
self.emit('dropAllTables', err) self.emit('dropAllTables', err)
emitter.emit('error', err) emitter.emit('error', err)
......
...@@ -116,12 +116,8 @@ var Support = { ...@@ -116,12 +116,8 @@ var Support = {
var sequelize = Support.createSequelizeInstance({ dialect: Support.getTestDialect() }) var sequelize = Support.createSequelizeInstance({ dialect: Support.getTestDialect() })
before(function(done) {
this.sequelize = sequelize
done()
})
beforeEach(function(done) { beforeEach(function(done) {
this.sequelize = sequelize
Support.clearDatabase(this.sequelize, function() { Support.clearDatabase(this.sequelize, function() {
done() done()
}) })
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!