不要怂,就是干,撸起袖子干!

Commit d8484e92 by Jan Aagaard Meier

Removed semicolons and a bit of unnessecary code in mariadb connectormanager

1 parent 7be7eb9c
...@@ -29,17 +29,19 @@ module.exports = (function() { ...@@ -29,17 +29,19 @@ module.exports = (function() {
minConnections: 0, minConnections: 0,
maxIdleTime: 1000, maxIdleTime: 1000,
handleDisconnects: false, handleDisconnects: false,
validate: validateConnection validate: function(client) {
}); return client && client.connected
this.pendingQueries = 0; }
this.useReplicaton = !!config.replication; })
this.useQueue = config.queue !== undefined ? config.queue : true; this.pendingQueries = 0
this.useReplicaton = !!config.replication
this.useQueue = config.queue !== undefined ? config.queue : true
var self = this var self = this
if (this.useReplicaton) { if (this.useReplicaton) {
var reads = 0, var reads = 0
writes = 0; , writes = 0
// Init configs with options from config if not present // Init configs with options from config if not present
for (var i in config.replication.read) { for (var i in config.replication.read) {
...@@ -49,7 +51,7 @@ module.exports = (function() { ...@@ -49,7 +51,7 @@ module.exports = (function() {
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
database: this.config.database database: this.config.database
}); })
} }
config.replication.write = Utils._.defaults(config.replication.write, { config.replication.write = Utils._.defaults(config.replication.write, {
host: this.config.host, host: this.config.host,
...@@ -57,27 +59,27 @@ module.exports = (function() { ...@@ -57,27 +59,27 @@ module.exports = (function() {
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
database: this.config.database database: this.config.database
}); })
// I'll make my own pool, with blackjack and hookers! // I'll make my own pool, with blackjack and hookers!
this.pool = { this.pool = {
release: function (client) { release: function (client) {
if (client.queryType == 'read') { if (client.queryType == 'read') {
return this.read.release(client); return this.read.release(client)
} else { } else {
return this.write.release(client); return this.write.release(client)
} }
}, },
acquire: function (callback, priority, queryType) { acquire: function (callback, priority, queryType) {
if (queryType == 'SELECT') { if (queryType == 'SELECT') {
this.read.acquire(callback, priority); this.read.acquire(callback, priority)
} else { } else {
this.write.acquire(callback, priority); this.write.acquire(callback, priority)
} }
}, },
drain: function () { drain: function () {
this.read.drain(); this.read.drain()
this.write.drain(); this.write.drain()
}, },
read: Pooling.Pool({ read: Pooling.Pool({
name: 'sequelize-read', name: 'sequelize-read',
...@@ -85,12 +87,12 @@ module.exports = (function() { ...@@ -85,12 +87,12 @@ module.exports = (function() {
if (reads >= self.config.replication.read.length) { if (reads >= self.config.replication.read.length) {
reads = 0 reads = 0
} }
var config = self.config.replication.read[reads++]; var config = self.config.replication.read[reads++]
connect.call(self, function (err, connection) { connect.call(self, function (err, connection) {
connection.queryType = 'read' connection.queryType = 'read'
done(null, connection) done(null, connection)
}, config); }, config)
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
...@@ -106,7 +108,7 @@ module.exports = (function() { ...@@ -106,7 +108,7 @@ module.exports = (function() {
connect.call(self, function (err, connection) { connect.call(self, function (err, connection) {
connection.queryType = 'write' connection.queryType = 'write'
done(null, connection) done(null, connection)
}, self.config.replication.write); }, self.config.replication.write)
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
...@@ -146,9 +148,7 @@ module.exports = (function() { ...@@ -146,9 +148,7 @@ module.exports = (function() {
}) })
} }
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype); Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
var isConnecting = false;
ConnectorManager.prototype.query = function(sql, callee, options) { ConnectorManager.prototype.query = function(sql, callee, options) {
if (!this.isConnected && !this.pool) { if (!this.isConnected && !this.pool) {
...@@ -160,30 +160,31 @@ module.exports = (function() { ...@@ -160,30 +160,31 @@ module.exports = (function() {
query: new Query(this.client, this.sequelize, callee, options || {}), query: new Query(this.client, this.sequelize, callee, options || {}),
client: this.client, client: this.client,
sql: sql sql: sql
}; }
enqueue.call(this, queueItem, options); enqueue.call(this, queueItem, options)
return queueItem.query; return queueItem.query
} }
var self = this, query = new Query(this.client, this.sequelize, callee, options || {}); var self = this
this.pendingQueries++; , query = new Query(this.client, this.sequelize, callee, options || {})
this.pendingQueries++
query.done(function() { query.done(function() {
self.pendingQueries--; self.pendingQueries--
if (self.pool) { if (self.pool) {
self.pool.release(query.client); self.pool.release(query.client)
} else { } else {
if (self.pendingQueries === 0) { if (self.pendingQueries === 0) {
setTimeout(function() { setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self) self.pendingQueries === 0 && self.disconnect.call(self)
}, 100); }, 100)
} }
} }
}); })
if (!this.pool) { if (!this.pool) {
query.run(sql); query.run(sql)
} else { } else {
this.pool.acquire(function(err, client) { this.pool.acquire(function(err, client) {
if (err) { if (err) {
...@@ -192,72 +193,64 @@ module.exports = (function() { ...@@ -192,72 +193,64 @@ module.exports = (function() {
query.client = client query.client = client
query.run(sql) query.run(sql)
return; return
}, undefined, options.type) }, undefined, options.type)
} }
return query; return query
}; }
ConnectorManager.prototype.connect = function() { ConnectorManager.prototype.connect = function() {
var self = this; var self = this
// in case database is slow to connect, prevent orphaning the client // in case database is slow to connect, prevent orphaning the client
if (this.isConnecting || this.pool) { if (this.isConnecting || this.pool) {
return; return
} }
connect.call(self, function(err, client) { connect.call(self, function(err, client) {
self.client = client; self.client = client
return; return
}); })
return; return
}; }
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
if (this.client) { if (this.client) {
disconnect.call(this, this.client) disconnect.call(this, this.client)
} }
return return
}; }
// private // private
var disconnect = function(client) { var disconnect = function(client) {
var self = this; var self = this
if (!this.useQueue) { if (!this.useQueue) {
this.client = null; this.client = null
client.end()
return
} }
client.end(function() { var intervalObj = null
if (!self.useQueue) { var cleanup = function () {
return client.destroy(); // make sure to let queued items be finish before calling end
if (self && self.hasQueuedItems) {
return
} }
client.end()
var intervalObj = null if (self && self.client) {
var cleanup = function () { self.client = null
var retryCt = 0
// make sure to let client finish before calling destroy
if (self && self.hasQueuedItems) {
return
}
// needed to prevent mysql connection leak
client.destroy()
if (self && self.client) {
self.client = null
}
clearInterval(intervalObj)
} }
intervalObj = setInterval(cleanup, 10) clearInterval(intervalObj)
cleanup() }
return intervalObj = setInterval(cleanup, 10)
}) cleanup()
} }
var connect = function(done, config) { var connect = function(done, config) {
config = config || this.config config = config || this.config
var emitter = new (require('events').EventEmitter)() var self = this
, self = this
, client , client
this.isConnecting = true this.isConnecting = true
...@@ -268,12 +261,12 @@ module.exports = (function() { ...@@ -268,12 +261,12 @@ module.exports = (function() {
password: config.password, password: config.password,
db: config.database, db: config.database,
metadata: true metadata: true
}; }
if (config.dialectOptions) { if (config.dialectOptions) {
Object.keys(config.dialectOptions).forEach(function (key) { Object.keys(config.dialectOptions).forEach(function (key) {
connectionConfig[key] = config.dialectOptions[key]; connectionConfig[key] = config.dialectOptions[key];
}); })
} }
client = new mariadb() client = new mariadb()
...@@ -310,10 +303,6 @@ module.exports = (function() { ...@@ -310,10 +303,6 @@ module.exports = (function() {
}) })
} }
var validateConnection = function(client) {
return client && client.state !== 'disconnected'
}
var enqueue = function(queueItem, options) { var enqueue = function(queueItem, options) {
options = options || {} options = options || {}
if (this.activeQueue.length < this.maxConcurrentQueries) { if (this.activeQueue.length < this.maxConcurrentQueries) {
...@@ -350,7 +339,7 @@ module.exports = (function() { ...@@ -350,7 +339,7 @@ module.exports = (function() {
var transferQueuedItems = function(count) { var transferQueuedItems = function(count) {
for(var i = 0; i < count; i++) { for(var i = 0; i < count; i++) {
var queueItem = this.queue.shift(); var queueItem = this.queue.shift()
if (queueItem) { if (queueItem) {
enqueue.call(this, queueItem) enqueue.call(this, queueItem)
} }
......
...@@ -58,7 +58,7 @@ module.exports = (function() { ...@@ -58,7 +58,7 @@ module.exports = (function() {
case "TIMESTAMP": case "TIMESTAMP":
case "DATETIME": case "DATETIME":
row[prop] = new Date(row[prop] + 'Z') row[prop] = new Date(row[prop] + 'Z')
break; break
case "BIT": case "BIT":
case "BLOB": case "BLOB":
case "TINYBLOB": case "TINYBLOB":
...@@ -67,7 +67,7 @@ module.exports = (function() { ...@@ -67,7 +67,7 @@ module.exports = (function() {
if (metadata.charsetNrs[prop] === 63) { // binary if (metadata.charsetNrs[prop] === 63) { // binary
row[prop] = new Buffer(row[prop]) row[prop] = new Buffer(row[prop])
} }
break; break
case "TIME": case "TIME":
case "CHAR": case "CHAR":
case "VARCHAR": case "VARCHAR":
...@@ -75,7 +75,7 @@ module.exports = (function() { ...@@ -75,7 +75,7 @@ module.exports = (function() {
case "ENUM": case "ENUM":
case "GEOMETRY": case "GEOMETRY":
case "NULL": case "NULL":
break; break
default: default:
// blank // blank
} }
...@@ -85,41 +85,39 @@ module.exports = (function() { ...@@ -85,41 +85,39 @@ module.exports = (function() {
}) })
.on('error', function(err) { .on('error', function(err) {
errorDetected = true errorDetected = true
self.emit('sql', this.sql) self.emit('sql', self.sql)
self.emit('error', err, this.callee) self.emit('error', err, self.callee)
}.bind(this)) })
.on('end', function(info) { .on('end', function(info) {
if (alreadyEnded || errorDetected) { if (alreadyEnded || errorDetected) {
return return
} }
alreadyEnded = true alreadyEnded = true
self.emit('sql', this.sql) self.emit('sql', self.sql)
// we need to figure out whether to send the result set // we need to figure out whether to send the result set
// or info depending upon the type of query // or info depending upon the type of query
if (/^call/.test(this.sql.toLowerCase())) { if (/^call/.test(self.sql.toLowerCase())) {
self.emit('success', resultSet) self.emit('success', resultSet)
} else if( /^show/.test(this.sql.toLowerCase()) || } else if( /^show/.test(self.sql.toLowerCase()) ||
/^select/.test(this.sql.toLowerCase()) || /^select/.test(self.sql.toLowerCase()) ||
/^describe/.test(this.sql.toLowerCase())) { /^describe/.test(self.sql.toLowerCase())) {
self.emit('success', this.formatResults(resultSet)) self.emit('success', self.formatResults(resultSet))
} else { } else {
self.emit('success', this.formatResults(info)) self.emit('success', self.formatResults(info))
} }
}.bind(this)); })
}.bind(this)) })
.on('error', function(err) { .on('error', function(err) {
if (errorDetected) { if (errorDetected) {
return return
} }
errorDetected = true errorDetected = true
self.emit('sql', this.sql) self.emit('sql', self.sql)
self.emit('error', err, this.callee) self.emit('error', err, self.callee)
}.bind(this)) })
.on('end', function(info) { .setMaxListeners(100)
// nothing here (query info is returned during the 'result' event)
}.bind(this)).setMaxListeners(100)
return this return this
} }
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!