不要怂,就是干,撸起袖子干!

Commit 8921b137 by sdepold

Merge branch 'mysql-pooling'

2 parents 1e641c88 ad7c3f08
......@@ -5,6 +5,7 @@
- [FEATURE] When instantiating the sequelize object, you can now pass a function to logging. This allows you to customize the logging behavior. Default is now: console.log (thanks to kenperkins)
- [BUG] The default logging is still console.log but is wrapped after initialization as it crashes node < 0.6.x.
- [FEATURE] postgresql support. (thanks to swoodtke)
- [FEATURE] connection-pooling for mysql. (thanks to megshark)
# v1.3.7 #
- [BUG] fixed issue where multiple belongsTo or hasOne associations to the same table overwrite each other
......
var Query = require("./query")
var Pooling = require('generic-pool')
, Query = require("./query")
, Utils = require("../../utils")
, without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
......@@ -11,13 +12,43 @@ module.exports = (function() {
this.queue = []
this.activeQueue = []
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
this.poolCfg = this.config.pool
var self = this
if (this.poolCfg) {
//the user has requested pooling, so create our connection pool
if (!this.poolCfg.maxConnections) {
this.poolCfg.maxConnections = 10
}
this.pool = Pooling.Pool({
name: 'sequelize-mysql',
create: function (done) {
connect.call(self, done)
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
})
}
process.on('exit', function () {
//be nice & close our connections on exit
if (self.pool) {
self.pool.drain()
} else if (self.client) {
disconnect(self.client)
}
return
})
}
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
var isConnecting = false
ConnectorManager.prototype.query = function(sql, callee, options) {
if(!this.isConnected) this.connect()
if(!this.isConnected && !this.pool) this.connect()
var queueItem = {
query: new Query(this.client, callee, options || {}),
......@@ -32,53 +63,95 @@ module.exports = (function() {
ConnectorManager.prototype.connect = function() {
var self = this
// in case database is slow to connect, prevent orphaning the client
if (this.isConnecting) {
if (this.isConnecting || this.pool) {
return
}
this.client = require("mysql").createClient({
user: this.config.username,
password: this.config.password,
host: this.config.host,
port: this.config.port,
database: this.config.database
connect.call(self, function(err, client) {
self.client = client
return
})
this.client.setMaxListeners(this.maxConcurrentQueries)
this.isConnecting = false
return
}
ConnectorManager.prototype.disconnect = function() {
if (this.client)
disconnect.call(this, this.client)
return
}
// private
var disconnect = function(client) {
var self = this
this.client.end(function() {
client.end(function() {
var intervalObj = null
var cleanup = function () {
var retryCt = 0
// make sure to let client finish before calling destroy
if (!self.hasNoConnections) {
if (self && !self.hasNoConnections) {
return
}
// needed to prevent mysql connection leak
self.client.destroy()
client.destroy()
if (self && self.client) {
self.client = null
}
clearInterval(intervalObj)
}
intervalObj = setInterval(cleanup, 10)
cleanup()
return
})
}
// private
var connect = function(done) {
var self = this
var client = require("mysql").createClient({
user: self.config.username,
password: self.config.password,
host: self.config.host,
port: self.config.port,
database: self.config.database
})
client.setMaxListeners(self.maxConcurrentQueries)
self.isConnecting = false
done(null, client)
return
}
var enqueue = function(queueItem) {
if(this.activeQueue.length < this.maxConcurrentQueries) {
this.activeQueue.push(queueItem)
if (this.pool) {
var self = this
this.pool.acquire(function(err, client) {
if (err) {
queueItem.query.emit('failure', err)
return
}
//we set the client here, asynchronously, when getting a pooled connection
//allowing the ConnectorManager.query method to remain synchronous
queueItem.query.client = client
queueItem.client = client
execQueueItem.call(self, queueItem)
return
})
}
else
{
execQueueItem.call(this, queueItem)
}
} else {
this.queue.push(queueItem)
}
}
var dequeue = function(queueItem) {
//return the item's connection to the pool
if (this.pool) {
this.pool.release(queueItem.client)
}
this.activeQueue = without(this.activeQueue, queueItem)
}
......@@ -108,11 +181,11 @@ module.exports = (function() {
.success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql)
queueItem.query.run(queueItem.sql, queueItem.client)
}
ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() {
return (this.queue.length == 0) && (this.activeQueue.length == 0) && this.client._queue && (this.client._queue.length == 0)
return (this.queue.length == 0) && (this.activeQueue.length == 0) && (this.client == null || (this.client._queue && (this.client._queue.length == 0)))
})
ConnectorManager.prototype.__defineGetter__('isConnected', function() {
......@@ -130,3 +203,5 @@ module.exports = (function() {
return ConnectorManager
})()
......@@ -32,7 +32,8 @@ module.exports = (function() {
username: username,
password: (( (["", null, false].indexOf(password) > -1) || (typeof password == 'undefined')) ? null : password),
host : this.options.host,
port : this.options.port
port : this.options.port,
pool : this.options.pool
}
var ConnectorManager = require("./dialects/" + this.options.dialect + "/connector-manager")
......
......@@ -15,7 +15,8 @@
"lingo": "0.0.x",
"validator": "0.3.x",
"moment": "1.1.x",
"commander": "0.5.x"
"commander": "0.5.x",
"generic-pool": "1.0.9"
},
"devDependencies": {
"jasmine-node": "1.0.22",
......
......@@ -3,12 +3,14 @@ module.exports = {
return parseInt(Math.random() * 999)
},
//make maxIdleTime small so that tests exit promptly
mysql: {
username: "root",
password: null,
database: 'sequelize_test',
host: '127.0.0.1',
port: 3306
port: 3306,
pool: { maxConnections: 5, maxIdleTime: 30}
},
sqlite: {
......
var config = require("../config/config")
, Sequelize = require("../../index")
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { logging: false })
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { pool: config.mysql.pool, logging: false })
, Helpers = new (require("../config/helpers"))(sequelize)
describe('HasMany', function() {
......
var config = require("../config/config")
, Sequelize = require("../../index")
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { logging: false })
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { pool: config.mysql.pool, logging: false })
, Helpers = new (require("../config/helpers"))(sequelize)
describe('Associations', function() {
......
var config = require("../config/config")
, Sequelize = require("../../index")
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { logging: false })
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { pool: config.mysql.pool, logging: false })
, Helpers = new (require("../config/helpers"))(sequelize)
describe('ConnectorManager', function() {
......
var config = require("../config/config")
, Sequelize = require("../../index")
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { logging: false })
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { pool: config.mysql.pool, logging: false })
, Helpers = new (require("../config/helpers"))(sequelize)
describe('DAOFactory', function() {
......
var config = require("../config/config")
, Sequelize = require("../../index")
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { logging: false })
, sequelize = new Sequelize(config.mysql.database, config.mysql.username, config.mysql.password, { pool: config.mysql.pool, logging: false })
, Helpers = new (require("../config/helpers"))(sequelize)
, QueryGenerator = require("../../lib/dialects/mysql/query-generator")
, util = require("util")
......
......@@ -2,5 +2,6 @@ module.exports = {
username: "root",
password: null,
database: 'sequelize_test',
host: '127.0.0.1'
host: '127.0.0.1',
pool: { maxConnections: 5, maxIdleTime: 30000}
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!