不要怂,就是干,撸起袖子干!

Commit ff57af63 by Daniel Durante

postgres' pools will now work correctly.

1 parent db577ddb
...@@ -6,11 +6,12 @@ module.exports = (function() { ...@@ -6,11 +6,12 @@ module.exports = (function() {
var pgModule = config.dialectModulePath || 'pg' var pgModule = config.dialectModulePath || 'pg'
this.sequelize = sequelize this.sequelize = sequelize
this.client = null this.client = null
this.config = config || {} this.config = config || {}
this.config.port = this.config.port || 5432 this.config.port = this.config.port || 5432
this.pooling = (!!this.config.poolCfg && (this.config.poolCfg.maxConnections > 0)) this.pooling = (!!this.config.pool && (this.config.pool.maxConnections > 0))
this.pg = this.config.native ? require(pgModule).native : require(pgModule) this.pg = this.config.native ? require(pgModule).native : require(pgModule)
this.poolIdentifier = null
// Better support for BigInts // Better support for BigInts
// https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935 // https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
...@@ -18,52 +19,60 @@ module.exports = (function() { ...@@ -18,52 +19,60 @@ module.exports = (function() {
// set pooling parameters if specified // set pooling parameters if specified
if (this.pooling) { if (this.pooling) {
this.pg.defaults.poolSize = this.config.poolCfg.maxConnections this.pg.defaults.poolSize = this.config.pool.maxConnections
this.pg.defaults.poolIdleTimeout = this.config.poolCfg.maxIdleTime this.pg.defaults.poolIdleTimeout = this.config.pool.maxIdleTime
} }
this.disconnectTimeoutId = null this.disconnectTimeoutId = null
this.pendingQueries = 0 this.pendingQueries = 0
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50) this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
process.on('exit', function() {
this.disconnect()
}.bind(this))
} }
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype) Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
var isConnecting = false ConnectorManager.prototype.endQuery = function() {
var isConnected = false
ConnectorManager.prototype.query = function(sql, callee, options) {
var self = this var self = this
if (this.client === null) { if (!self.pooling && self.pendingQueries === 0) {
this.connect() setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self)
}, 100)
} }
var query = new Query(this.client, this.sequelize, callee, options || {})
self.pendingQueries += 1
return query.run(sql)
.success(function() { self.endQuery.call(self) })
.error(function() { self.endQuery.call(self) })
} }
ConnectorManager.prototype.endQuery = function() { ConnectorManager.prototype.query = function(sql, callee, options) {
var self = this var self = this
self.pendingQueries -= 1
if (self.pendingQueries == 0) { self.pendingQueries++
setTimeout(function() {
self.pendingQueries == 0 && self.disconnect.call(self) return new Utils.CustomEventEmitter(function(emitter) {
}, 100) self.connect()
} .on('error', function(err) {
emitter.emit('error', err)
})
.on('success', function(done) {
var query = new Query(self.client, self.sequelize, callee, options || {})
done = done || null
query.run(sql, done)
.success(function(results) { emitter.emit('success', results); self.endQuery.call(self) })
.error(function(err) { emitter.emit('error', err); self.endQuery.call(self) })
.on('sql', function(sql) { emitter.emit('sql', sql) })
})
}).run().complete(function() { self.pendingQueries-- })
} }
ConnectorManager.prototype.connect = function() { ConnectorManager.prototype.connect = function(callback) {
var self = this var self = this
var emitter = new (require('events').EventEmitter)() var emitter = new (require('events').EventEmitter)()
// in case database is slow to connect, prevent orphaning the client // in case database is slow to connect, prevent orphaning the client
if (this.isConnecting) { if (this.isConnecting) {
return emitter.emit('success')
return emitter
} }
this.isConnecting = true this.isConnecting = true
...@@ -71,36 +80,44 @@ module.exports = (function() { ...@@ -71,36 +80,44 @@ module.exports = (function() {
var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config) var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
var connectCallback = function(err, client) { var connectCallback = function(err, client, done) {
self.isConnecting = false self.isConnecting = false
if (!!err) { if (!!err) {
switch(err.code) { // release the pool immediately, very important.
case 'ECONNREFUSED': done && done(err)
emitter.emit('error', 'Failed to authenticate for PostgresSQL. Please double check your settings.')
break if (err.code) {
case 'ENOTFOUND': switch(err.code) {
case 'EHOSTUNREACH': case 'ECONNREFUSED':
case 'EINVAL': emitter.emit('error', new Error("Failed to authenticate for PostgresSQL. Please double check your settings."))
emitter.emit('error', 'Failed to find PostgresSQL server. Please double check your settings.') break
break case 'ENOTFOUND':
default: case 'EHOSTUNREACH':
emitter.emit('error', err) case 'EINVAL':
emitter.emit('error', new Error("Failed to find PostgresSQL server. Please double check your settings."))
break
default:
emitter.emit('error', err)
break
}
} }
} else if (client) { } else if (client) {
client.query("SET TIME ZONE 'UTC'") client.query("SET TIME ZONE 'UTC'").on('end', function() {
.on('end', function() {
self.isConnected = true self.isConnected = true
this.client = client self.client = client
emitter.emit('success', done)
}); });
} else { } else {
this.client = null self.client = null
emitter.emit('success', done)
} }
} }
if (this.pooling) { if (this.pooling) {
// acquire client from pool // acquire client from pool
this.pg.connect(uri, connectCallback) this.poolIdentifier = this.pg.pools.getOrCreate(this.sequelize.config)
this.poolIdentifier.connect(connectCallback)
} else { } else {
//create one-off client //create one-off client
this.client = new this.pg.Client(uri) this.client = new this.pg.Client(uri)
...@@ -111,8 +128,14 @@ module.exports = (function() { ...@@ -111,8 +128,14 @@ module.exports = (function() {
} }
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
var self = this if (this.poolIdentifier) {
if (this.client) this.client.end() this.poolIdentifier.destroyAllNow()
}
if (this.client) {
this.client.end()
}
this.client = null this.client = null
this.isConnecting = false this.isConnecting = false
this.isConnected = false this.isConnected = false
......
...@@ -18,16 +18,17 @@ module.exports = (function() { ...@@ -18,16 +18,17 @@ module.exports = (function() {
} }
Utils.inherit(Query, AbstractQuery) Utils.inherit(Query, AbstractQuery)
Query.prototype.run = function(sql) { Query.prototype.run = function(sql, done) {
this.sql = sql this.sql = sql
var self = this
if (this.options.logging !== false) { if (this.options.logging !== false) {
this.options.logging('Executing: ' + this.sql) this.options.logging('Executing: ' + this.sql)
} }
var receivedError = false var receivedError = false
, query = this.client.query(sql) , query = this.client.query(sql)
, rows = [] , rows = []
query.on('row', function(row) { query.on('row', function(row) {
rows.push(row) rows.push(row)
...@@ -39,13 +40,14 @@ module.exports = (function() { ...@@ -39,13 +40,14 @@ module.exports = (function() {
}.bind(this)) }.bind(this))
query.on('end', function() { query.on('end', function() {
done && done()
this.emit('sql', this.sql) this.emit('sql', this.sql)
if (receivedError) { if (receivedError) {
return return
} }
onSuccess.call(this, rows) onSuccess.call(this, rows, sql)
}.bind(this)) }.bind(this))
return this return this
...@@ -55,11 +57,11 @@ module.exports = (function() { ...@@ -55,11 +57,11 @@ module.exports = (function() {
return 'id' return 'id'
} }
var onSuccess = function(rows) { var onSuccess = function(rows, sql) {
var results = [] var results = []
, self = this , self = this
, isTableNameQuery = (this.sql.indexOf('SELECT table_name FROM information_schema.tables') === 0) , isTableNameQuery = (sql.indexOf('SELECT table_name FROM information_schema.tables') === 0)
, isRelNameQuery = (this.sql.indexOf('SELECT relname FROM pg_class WHERE oid IN') === 0) , isRelNameQuery = (sql.indexOf('SELECT relname FROM pg_class WHERE oid IN') === 0)
if (isTableNameQuery || isRelNameQuery) { if (isTableNameQuery || isRelNameQuery) {
if (isRelNameQuery) { if (isRelNameQuery) {
...@@ -70,7 +72,7 @@ module.exports = (function() { ...@@ -70,7 +72,7 @@ module.exports = (function() {
} }
}) })
} else { } else {
results = rows.map(function(row) { return Utils._.values(row) }) results = rows.map(function(row) { return Utils._.values(row) })
} }
return this.emit('success', results) return this.emit('success', results)
} }
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!