不要怂,就是干,撸起袖子干!

Commit fd22bf1c by Jan Aagaard Meier

Last fixes to mariadb, and fix import with relative paths on windows

1 parent 9a0c7d4b
var mariasql var mariadb
, Pooling = require('generic-pool') , Pooling = require('generic-pool')
, Query = require("./query") , Query = require("./query")
, Utils = require("../../utils") , Utils = require("../../utils")
, without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) } , without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
try { mariasql = require("mariasql") } catch (err) {
console.log("You need to install mariasql package manually"); }
module.exports = (function() { module.exports = (function() {
var ConnectorManager = function(sequelize, config) { var ConnectorManager = function(sequelize, config) {
try {
if (config.dialectModulePath) {
mariadb = require(config.dialectModulePath)
} else {
mariadb = require('mariasql')
}
} catch (err) {
console.log('You need to install mariasql package manually')
}
this.sequelize = sequelize this.sequelize = sequelize
this.client = null this.client = null
this.config = config || {} this.config = config || {}
this.config.port = this.config.port || 3306
this.disconnectTimeoutId = null this.disconnectTimeoutId = null
this.queue = [] this.queue = []
this.activeQueue = [] this.activeQueue = []
...@@ -19,7 +27,9 @@ module.exports = (function() { ...@@ -19,7 +27,9 @@ module.exports = (function() {
this.poolCfg = Utils._.defaults(this.config.pool, { this.poolCfg = Utils._.defaults(this.config.pool, {
maxConnections: 10, maxConnections: 10,
minConnections: 0, minConnections: 0,
maxIdleTime: 1000 maxIdleTime: 1000,
handleDisconnects: false,
validate: validateConnection
}); });
this.pendingQueries = 0; this.pendingQueries = 0;
this.useReplicaton = !!config.replication; this.useReplicaton = !!config.replication;
...@@ -38,8 +48,7 @@ module.exports = (function() { ...@@ -38,8 +48,7 @@ module.exports = (function() {
port: this.config.port, port: this.config.port,
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
db: this.config.database, database: this.config.database
ssl: this.config.ssl
}); });
} }
config.replication.write = Utils._.defaults(config.replication.write, { config.replication.write = Utils._.defaults(config.replication.write, {
...@@ -47,8 +56,7 @@ module.exports = (function() { ...@@ -47,8 +56,7 @@ module.exports = (function() {
port: this.config.port, port: this.config.port,
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
db: this.config.database, database: this.config.database
ssl: this.config.ssl
}); });
// I'll make my own pool, with blackjack and hookers! // I'll make my own pool, with blackjack and hookers!
...@@ -74,17 +82,20 @@ module.exports = (function() { ...@@ -74,17 +82,20 @@ module.exports = (function() {
read: Pooling.Pool({ read: Pooling.Pool({
name: 'sequelize-read', name: 'sequelize-read',
create: function (done) { create: function (done) {
if (reads >= self.config.replication.read.length) reads = 0; if (reads >= self.config.replication.read.length) {
reads = 0
}
var config = self.config.replication.read[reads++]; var config = self.config.replication.read[reads++];
connect.call(self, function (err, connection) { connect.call(self, function (err, connection) {
connection.queryType = 'read' connection.queryType = 'read'
done(err, connection) done(null, connection)
}, config); }, config);
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
}, },
validate: self.poolCfg.validate,
max: self.poolCfg.maxConnections, max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections, min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime idleTimeoutMillis: self.poolCfg.maxIdleTime
...@@ -94,12 +105,13 @@ module.exports = (function() { ...@@ -94,12 +105,13 @@ module.exports = (function() {
create: function (done) { create: function (done) {
connect.call(self, function (err, connection) { connect.call(self, function (err, connection) {
connection.queryType = 'write' connection.queryType = 'write'
done(err, connection) done(null, connection)
}, self.config.replication.write); }, self.config.replication.write);
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
}, },
validate: self.poolCfg.validate,
max: self.poolCfg.maxConnections, max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections, min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime idleTimeoutMillis: self.poolCfg.maxIdleTime
...@@ -117,6 +129,7 @@ module.exports = (function() { ...@@ -117,6 +129,7 @@ module.exports = (function() {
}, },
max: self.poolCfg.maxConnections, max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections, min: self.poolCfg.minConnections,
validate: self.poolCfg.validate,
idleTimeoutMillis: self.poolCfg.maxIdleTime idleTimeoutMillis: self.poolCfg.maxIdleTime
}) })
} }
...@@ -145,6 +158,7 @@ module.exports = (function() { ...@@ -145,6 +158,7 @@ module.exports = (function() {
if (this.useQueue) { if (this.useQueue) {
var queueItem = { var queueItem = {
query: new Query(this.client, this.sequelize, callee, options || {}), query: new Query(this.client, this.sequelize, callee, options || {}),
client: this.client,
sql: sql sql: sql
}; };
...@@ -157,11 +171,12 @@ module.exports = (function() { ...@@ -157,11 +171,12 @@ module.exports = (function() {
query.done(function() { query.done(function() {
self.pendingQueries--; self.pendingQueries--;
if (self.pool) self.pool.release(query.client); if (self.pool) {
else { self.pool.release(query.client);
} else {
if (self.pendingQueries === 0) { if (self.pendingQueries === 0) {
setTimeout(function() { setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self); self.pendingQueries === 0 && self.disconnect.call(self)
}, 100); }, 100);
} }
} }
...@@ -169,16 +184,16 @@ module.exports = (function() { ...@@ -169,16 +184,16 @@ module.exports = (function() {
if (!this.pool) { if (!this.pool) {
query.run(sql); query.run(sql);
} } else {
else {
this.pool.acquire(function(err, client) { this.pool.acquire(function(err, client) {
if (err) return query.emit('error', err); if (err) {
return query.emit('error', err)
}
query.client = client; query.client = client
query.run(sql); query.run(sql)
return; return;
}, undefined, options.type); }, undefined, options.type)
} }
return query; return query;
...@@ -198,8 +213,10 @@ module.exports = (function() { ...@@ -198,8 +213,10 @@ module.exports = (function() {
}; };
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
if (this.client) disconnect.call(this, this.client); if (this.client) {
return; disconnect.call(this, this.client)
}
return
}; };
...@@ -207,49 +224,94 @@ module.exports = (function() { ...@@ -207,49 +224,94 @@ module.exports = (function() {
var disconnect = function(client) { var disconnect = function(client) {
var self = this; var self = this;
if (!this.useQueue) {
this.client = null;
}
if(client.connected) { client.end(function() {
client.end() if (!self.useQueue) {
return client.destroy();
} }
var intervalObj = null
var cleanup = function () {
var retryCt = 0
// make sure to let client finish before calling destroy
if (self && self.hasQueuedItems) {
return
}
// needed to prevent mysql connection leak
client.destroy()
if (self && self.client) {
self.client = null self.client = null
self.isConnecting = false }
clearInterval(intervalObj)
}
intervalObj = setInterval(cleanup, 10)
cleanup()
return return
})
} }
var connect = function(done, config) { var connect = function(done, config) {
config = config || this.config config = config || this.config
var connection = new mariasql() var emitter = new (require('events').EventEmitter)()
, self = this , self = this
, client
this.isConnecting = true this.isConnecting = true
connection.connect({ var connectionConfig = {
host: config.host, host: config.host,
port: config.port, port: config.port,
user: config.username, user: config.username,
password: config.password, password: config.password,
db: config.database, db: config.database,
ssl: config.ssl || undefined,
metadata: true metadata: true
// timezone: 'Z' // unsupported by mariasql };
})
connection.on('connect', function() { if (config.dialectOptions) {
connection.query("SET time_zone = '+0:00'"); Object.keys(config.dialectOptions).forEach(function (key) {
connection.setMaxListeners(self.maxConcurrentQueries) connectionConfig[key] = config.dialectOptions[key];
this.isConnecting = false });
}
done(null, connection) client = new mariadb()
}).on('error', function(err) { client.connect(connectionConfig)
this.isConnecting = false client
.on('error', function (err) {
self.isConnecting = false
done(err) done(err)
// disconnect.call(self, connection)
}).on('close', function() {
disconnect.call(self, connection)
}) })
.on('connect', function () {
client.query("SET time_zone = '+0:00'").on('result', function (res) {
res.on('end', function () {
client.setMaxListeners(self.maxConcurrentQueries)
self.isConnecting = false
if (config.pool.handleDisconnects) {
handleDisconnect(self.pool, client)
}
done(null, client)
})
})
})
.on('close', function() {
disconnect.call(self, client)
})
}
var handleDisconnect = function(pool, client) {
client.on('error', function(err) {
if (err.code !== 'PROTOCOL_CONNECTION_LOST') {
throw err
}
pool.destroy(client)
})
}
var validateConnection = function(client) {
return client && client.state !== 'disconnected'
} }
var enqueue = function(queueItem, options) { var enqueue = function(queueItem, options) {
...@@ -287,10 +349,6 @@ module.exports = (function() { ...@@ -287,10 +349,6 @@ module.exports = (function() {
} }
var transferQueuedItems = function(count) { var transferQueuedItems = function(count) {
// prevent possible overrun condition
if( count > this.queue.length )
count = this.queue.length
for(var i = 0; i < count; i++) { for(var i = 0; i < count; i++) {
var queueItem = this.queue.shift(); var queueItem = this.queue.shift();
if (queueItem) { if (queueItem) {
...@@ -300,8 +358,6 @@ module.exports = (function() { ...@@ -300,8 +358,6 @@ module.exports = (function() {
} }
var afterQuery = function(queueItem) { var afterQuery = function(queueItem) {
var self = this
dequeue.call(this, queueItem) dequeue.call(this, queueItem)
transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length) transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
disconnectIfNoConnections.call(this) disconnectIfNoConnections.call(this)
...@@ -319,7 +375,7 @@ module.exports = (function() { ...@@ -319,7 +375,7 @@ module.exports = (function() {
} }
ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() { ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queries && (this.client._queries.length > 0)) return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0))
}) })
// legacy // legacy
...@@ -328,7 +384,7 @@ module.exports = (function() { ...@@ -328,7 +384,7 @@ module.exports = (function() {
}) })
ConnectorManager.prototype.__defineGetter__('isConnected', function() { ConnectorManager.prototype.__defineGetter__('isConnected', function() {
return this.client != null && this.client.connected == true return this.client != null
}) })
var disconnectIfNoConnections = function() { var disconnectIfNoConnections = function() {
......
...@@ -245,11 +245,10 @@ module.exports = (function() { ...@@ -245,11 +245,10 @@ module.exports = (function() {
Sequelize.prototype.import = function(path) { Sequelize.prototype.import = function(path) {
// is it a relative path? // is it a relative path?
if (url.parse(path).pathname.indexOf('/') !== 0) { if (Path.normalize(path).indexOf(path.sep) !== 0) {
// make path relative to the caller // make path relative to the caller
var callerFilename = Utils.stack()[1].getFileName() var callerFilename = Utils.stack()[1].getFileName()
, callerMatch = callerFilename.match(/(.+\/).+?$/) , callerPath = Path.dirname(callerFilename)
, callerPath = callerMatch[1]
path = Path.resolve(callerPath, path) path = Path.resolve(callerPath, path)
} }
......
...@@ -41,14 +41,14 @@ module.exports = { ...@@ -41,14 +41,14 @@ module.exports = {
}, },
mariadb: { mariadb: {
database: process.env.SEQ_PG_DB || process.env.SEQ_DB || 'sequelize_test', database: process.env.SEQ_MYSQL_DB || process.env.SEQ_DB || 'sequelize_test',
username: process.env.SEQ_PG_USER || process.env.SEQ_USER || "root", username: process.env.SEQ_MYSQL_USER || process.env.SEQ_USER || "root",
password: process.env.SEQ_PG_PW || process.env.SEQ_PW || null, password: process.env.SEQ_MYSQL_PW || process.env.SEQ_PW || null,
host: process.env.SEQ_PG_HOST || process.env.SEQ_HOST || '127.0.0.1', host: process.env.SEQ_MYSQL_HOST || process.env.SEQ_HOST || '127.0.0.1',
port: process.env.SEQ_PG_PORT || process.env.SEQ_PORT || 3306, port: process.env.SEQ_MYSQL_PORT || process.env.SEQ_PORT || 3306,
pool: { pool: {
maxConnections: process.env.SEQ_PG_POOL_MAX || process.env.SEQ_POOL_MAX || 5, maxConnections: process.env.SEQ_MYSQL_POOL_MAX || process.env.SEQ_POOL_MAX || 1,
maxIdleTime: process.env.SEQ_PG_POOL_IDLE || process.env.SEQ_POOL_IDLE || 3000 maxIdleTime: process.env.SEQ_MYSQL_POOL_IDLE || process.env.SEQ_POOL_IDLE || 3000
} }
} }
} }
...@@ -63,14 +63,27 @@ var Support = { ...@@ -63,14 +63,27 @@ var Support = {
}, },
clearDatabase: function(sequelize, callback) { clearDatabase: function(sequelize, callback) {
var disablequery = sequelize.getQueryInterface().QueryGenerator.disableForeignKeyConstraintsQuery()
, dropAll = function (cb) {
return function () {
sequelize sequelize
.getQueryInterface() .getQueryInterface()
.dropAllTables() .dropAllTables()
.success(function() { .success(function() {
sequelize.daoFactoryManager.daos = [] sequelize.daoFactoryManager.daos = []
callback && callback() cb && cb()
}) })
.error(function(err) { console.log(err) }) .error(function(err) { console.log("Error clearing database " + err) })
}
}
if (disablequery) {
sequelize.query(disablequery).success(dropAll(function () {
sequelize.query(sequelize.getQueryInterface().QueryGenerator.enableForeignKeyConstraintsQuery()).success(callback)
}))
} else {
dropAll(callback)
}
}, },
getSupportedDialects: function() { getSupportedDialects: function() {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!