不要怂,就是干,撸起袖子干!

Commit 09c8c3f3 by Mick Hansen

Merge pull request #1892 from sequelize/connection-management-refactor

Refactor connection handling - Closes #1601
2 parents 4df8132e a6e249a2
...@@ -15,6 +15,7 @@ Notice: All 1.7.x changes are present in 2.0.x aswell ...@@ -15,6 +15,7 @@ Notice: All 1.7.x changes are present in 2.0.x aswell
- [BUG] An error is now thrown if an association would create a naming conflict between the association and the foreign key when doing eager loading. Closes [#1272](https://github.com/sequelize/sequelize/issues/1272) - [BUG] An error is now thrown if an association would create a naming conflict between the association and the foreign key when doing eager loading. Closes [#1272](https://github.com/sequelize/sequelize/issues/1272)
- [BUG] Fix logging options for sequelize.sync - [BUG] Fix logging options for sequelize.sync
- [BUG] find no longer applies limit: 1 if querying on a primary key, should fix a lot of subquery issues. - [BUG] find no longer applies limit: 1 if querying on a primary key, should fix a lot of subquery issues.
- [BUG] Transactions now use the pool so you will never go over your pool defined connection limit
- [INTERNALS] `bulkDeleteQuery` was removed from the MySQL / abstract query generator, since it was never used internally. Please use `deleteQuery` instead. - [INTERNALS] `bulkDeleteQuery` was removed from the MySQL / abstract query generator, since it was never used internally. Please use `deleteQuery` instead.
...@@ -26,6 +27,7 @@ Notice: All 1.7.x changes are present in 2.0.x aswell ...@@ -26,6 +27,7 @@ Notice: All 1.7.x changes are present in 2.0.x aswell
- `sequelize.showAllSchemas` now returns an array of schemas, instead of an array containinig an array of schemas - `sequelize.showAllSchemas` now returns an array of schemas, instead of an array containinig an array of schemas
- `sequelize.transaction()` now returns a promise rather than a instance of Sequelize.Transaction - `sequelize.transaction()` now returns a promise rather than a instance of Sequelize.Transaction
- `bulkCreate`, `bulkUpdate` and `bulkDestroy` (and aliases) now take both a `hooks` and an `individualHooks` option, `hooks` defines whether or not to run the main hooks, and `individualHooks` defines whether to run hooks for each instance affected. - `bulkCreate`, `bulkUpdate` and `bulkDestroy` (and aliases) now take both a `hooks` and an `individualHooks` option, `hooks` defines whether or not to run the main hooks, and `individualHooks` defines whether to run hooks for each instance affected.
- It is no longer possible to disable pooling, disable pooling will just result in a 1/1 pool.
# v2.0.0-dev11 # v2.0.0-dev11
### Caution: This release contains many changes and is highly experimental ### Caution: This release contains many changes and is highly experimental
......
"use strict";
var Pooling = require('generic-pool')
, Promise = require('../../promise')
, _ = require('lodash')
, defaultPoolingConfig = {
max: 5,
min: 0,
idle: 10000,
handleDisconnects: false
}
, ConnectionManager;
ConnectionManager = function(dialect, sequelize) {
var config = sequelize.config
, self = this;
this.sequelize = sequelize;
this.config = config;
this.dialect = dialect;
if (config.pool) {
config.pool = _.extend({}, config.pool); // Make sure we don't modify the existing config object (user might re-use it)
config.pool =_.defaults(config.pool, defaultPoolingConfig, {
validate: this.$validate.bind(this)
}) ;
} else {
// If the user has turned off pooling we provide a 0/1 pool for backwards compat
config.pool = _.defaults({
max: 1,
min: 0,
}, defaultPoolingConfig, {
validate: this.$validate.bind(this)
});
}
// Map old names
if (config.pool.maxIdleTime) config.pool.idle = config.pool.maxIdleTime;
if (config.pool.maxConnections) config.pool.max = config.pool.maxConnections;
if (config.pool.minConnections) config.pool.min = config.pool.minConnections;
if (config.replication) {
var reads = 0
, writes = 0;
if (!Array.isArray(config.replication.read)) {
config.replication.read = [config.replication.read];
}
// Make sure we don't modify the existing config object (user might re-use it)
config.replication.write = _.extend({}, config.replication.write);
config.replication.read = config.replication.read.map(function (read) {
return _.extend({}, read);
});
// Map main connection config
config.replication.write = _.defaults(config.replication.write, {
host: config.host,
port: config.port,
username: config.username,
password: config.password,
database: config.database
});
for (var i = 0; i < config.replication.read.length; i++) {
config.replication.read[i] = _.defaults(config.replication.read[i], {
host: this.config.host,
port: this.config.port,
username: this.config.username,
password: this.config.password,
database: this.config.database
});
}
// I'll make my own pool, with blackjack and hookers! (original credit goes to @janzeh)
this.pool = {
release: function(client) {
if (client.queryType === 'read') {
return self.pool.read.release(client);
} else {
return self.pool.write.release(client);
}
},
acquire: function(callback, priority, queryType) {
if (queryType === 'SELECT') {
self.pool.read.acquire(callback, priority);
} else {
self.pool.write.acquire(callback, priority);
}
},
drain: function() {
self.pool.read.drain();
self.pool.write.drain();
},
read: Pooling.Pool({
name: 'sequelize-connection-read',
create: function(callback) {
if (reads >= config.replication.read.length) {
reads = 0;
}
// Simple round robin config
self.$connect(config.replication.read[reads++]).tap(function (connection) {
connection.queryType = 'read';
}).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
self.$disconnect(connection);
},
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
idleTimeoutMillis: config.pool.idle
}),
write: Pooling.Pool({
name: 'sequelize-connection-write',
create: function(callback) {
self.$connect(config.replication.write).tap(function (connection) {
connection.queryType = 'write';
}).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
self.$disconnect(connection);
},
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
idleTimeoutMillis: config.pool.idle
})
};
} else {
this.pool = Pooling.Pool({
name: 'sequelize-connection',
create: function(callback) {
self.$connect(config).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
self.$disconnect(connection);
},
max: config.pool.max,
min: config.pool.min,
validate: config.pool.validate,
idleTimeoutMillis: config.pool.idle
});
}
this.onProcessExit = function() {
// Cleanup
self.pool.drain();
return;
}.bind(this);
process.on('exit', this.onProcessExit);
};
ConnectionManager.prototype.getConnection = function(options) {
var self = this;
options = options || {};
return new Promise(function (resolve, reject) {
self.pool.acquire(function(err, connection) {
if (err) return reject(err);
resolve(connection);
}, options.priority, options.type);
});
};
ConnectionManager.prototype.releaseConnection = function(connection) {
var self = this;
return new Promise(function (resolve, reject) {
self.pool.release(connection);
resolve();
});
};
ConnectionManager.prototype.$connect = function(config) {
return this.dialect.connectionManager.connect(config);
};
ConnectionManager.prototype.$disconnect = function(connection) {
return this.dialect.connectionManager.disconnect(connection);
};
ConnectionManager.prototype.$validate = function(connection) {
if (!this.dialect.connectionManager.validate) return Promise.resolve();
return this.dialect.connectionManager.validate(connection);
};
module.exports = ConnectionManager;
\ No newline at end of file
'use strict';
module.exports = (function() {
var ConnectorManager = function(sequelize, config) {
throw new Error('Define the constructor!');
};
ConnectorManager.prototype.query = function(sql, callee, options) {
throw new Error('Define the query method!');
};
ConnectorManager.prototype.afterTransactionSetup = function(callback) {
callback();
};
ConnectorManager.prototype.connect = function() {
throw new Error('Define the connect method!');
};
ConnectorManager.prototype.disconnect = function() {
throw new Error('Define the disconnect method!');
};
ConnectorManager.prototype.reconnect = function() {
this.disconnect();
this.connect();
};
ConnectorManager.prototype.cleanup = function() {
if (this.onProcessExit) {
process.removeListener('exit', this.onProcessExit);
}
};
return ConnectorManager;
})();
...@@ -13,4 +13,4 @@ AbstractDialect.prototype.supports = { ...@@ -13,4 +13,4 @@ AbstractDialect.prototype.supports = {
schemas: false schemas: false
}; };
module.exports = AbstractDialect; module.exports = AbstractDialect;
\ No newline at end of file
"use strict";
var AbstractConnectionManager = require('../abstract/connection-manager')
, ConnectionManager
, Utils = require('../../utils')
, Promise = require('../../promise');
ConnectionManager = function(dialect, sequelize) {
AbstractConnectionManager.call(this, dialect, sequelize);
this.sequelize = sequelize;
this.sequelize.config.port = this.sequelize.config.port || 3306;
try {
this.lib = require(sequelize.config.dialectModulePath || 'mariasql');
} catch (err) {
throw new Error('Please install mariasql package manually');
}
};
Utils._.extend(ConnectionManager.prototype, AbstractConnectionManager.prototype);
ConnectionManager.prototype.connect = function(config) {
var self = this;
return new Promise(function (resolve, reject) {
var connectionConfig = {
host: config.host,
port: config.port,
user: config.username,
password: config.password,
db: config.database,
metadata: true
};
if (config.dialectOptions) {
Object.keys(config.dialectOptions).forEach(function(key) {
connectionConfig[key] = config.dialectOptions[key];
});
}
if (connectionConfig.unixSocket) {
delete connectionConfig.host;
delete connectionConfig.port;
}
var connection = new self.lib();
connection.connect(connectionConfig);
connection.on('error', function(err) {
return reject(err);
});
connection.on('connect', function() {
return resolve(connection);
});
}).tap(function (connection) {
connection.query("SET time_zone = '+0:00'");
});
};
ConnectionManager.prototype.disconnect = function(connection) {
return new Promise(function (resolve, reject) {
connection.end();
resolve();
});
};
ConnectionManager.prototype.validate = function(connection) {
return connection && connection.state !== 'disconnected';
};
module.exports = ConnectionManager;
\ No newline at end of file
'use strict'; 'use strict';
var _ = require('lodash') var _ = require('lodash')
, MySQL = require('../mysql'); , MySQL = require('../mysql')
, ConnectionManager = require('./connection-manager')
, Query = require('./query');
var MariaDialect = function(sequelize) { var MariaDialect = function(sequelize) {
this.sequelize = sequelize; this.sequelize = sequelize;
this.connectionManager = new ConnectionManager(this, sequelize);
}; };
MariaDialect.prototype = _.defaults({ MariaDialect.prototype = _.defaults({
'LIMIT ON UPDATE': true 'LIMIT ON UPDATE': true
}, MySQL.prototype); }, MySQL.prototype);
module.exports = MariaDialect; MariaDialect.prototype.Query = Query;
module.exports = MariaDialect;
\ No newline at end of file
"use strict";
var AbstractConnectionManager = require('../abstract/connection-manager')
, ConnectionManager
, Utils = require('../../utils')
, Promise = require('../../promise');
ConnectionManager = function(dialect, sequelize) {
AbstractConnectionManager.call(this, dialect, sequelize);
this.sequelize = sequelize;
this.sequelize.config.port = this.sequelize.config.port || 3306;
try {
this.lib = require(sequelize.config.dialectModulePath || 'mysql');
} catch (err) {
throw new Error('Please install mysql package manually');
}
};
Utils._.extend(ConnectionManager.prototype, AbstractConnectionManager.prototype);
ConnectionManager.prototype.connect = function(config) {
var self = this;
return new Promise(function (resolve, reject) {
var connectionConfig = {
host: config.host,
port: config.port,
user: config.username,
password: config.password,
database: config.database,
timezone: 'Z'
};
if (config.dialectOptions) {
Object.keys(config.dialectOptions).forEach(function(key) {
connectionConfig[key] = config.dialectOptions[key];
});
}
var connection = self.lib.createConnection(connectionConfig);
connection.connect(function(err) {
if (err) {
switch (err.code) {
case 'ECONNREFUSED':
case 'ER_ACCESS_D2ENIED_ERROR':
reject('Failed to authenticate for MySQL. Please double check your settings.');
break;
case 'ENOTFOUND':
case 'EHOSTUNREACH':
case 'EINVAL':
reject('Failed to find MySQL server. Please double check your settings.');
break;
default:
reject(err);
break;
}
return;
}
resolve(connection);
});
}).tap(function (connection) {
connection.query("SET time_zone = '+0:00'");
});
};
ConnectionManager.prototype.disconnect = function(connection) {
return new Promise(function (resolve, reject) {
connection.end(function(err) {
if (err) return reject(err);
resolve();
});
});
};
ConnectionManager.prototype.validate = function(connection) {
return connection && connection.state !== 'disconnected';
};
module.exports = ConnectionManager;
\ No newline at end of file
'use strict'; 'use strict';
var _ = require('lodash') var _ = require('lodash')
, Abstract = require('../abstract'); , Abstract = require('../abstract')
, ConnectionManager = require('./connection-manager')
, Query = require('./query');
var MysqlDialect = function(sequelize) { var MysqlDialect = function(sequelize) {
this.sequelize = sequelize; this.sequelize = sequelize;
this.connectionManager = new ConnectionManager(this, sequelize);
}; };
MysqlDialect.prototype.supports = _.defaults({ MysqlDialect.prototype.supports = _.defaults({
...@@ -14,4 +17,6 @@ MysqlDialect.prototype.supports = _.defaults({ ...@@ -14,4 +17,6 @@ MysqlDialect.prototype.supports = _.defaults({
forShare: 'LOCK IN SHARE MODE' forShare: 'LOCK IN SHARE MODE'
}, Abstract.prototype.supports); }, Abstract.prototype.supports);
module.exports = MysqlDialect; MysqlDialect.prototype.Query = Query;
module.exports = MysqlDialect;
\ No newline at end of file
...@@ -5,8 +5,8 @@ var Utils = require('../../utils') ...@@ -5,8 +5,8 @@ var Utils = require('../../utils')
, uuid = require('node-uuid'); , uuid = require('node-uuid');
module.exports = (function() { module.exports = (function() {
var Query = function(client, sequelize, callee, options) { var Query = function(connection, sequelize, callee, options) {
this.client = client; this.connection = connection;
this.callee = callee; this.callee = callee;
this.sequelize = sequelize; this.sequelize = sequelize;
this.uuid = uuid.v4(); this.uuid = uuid.v4();
...@@ -18,10 +18,6 @@ module.exports = (function() { ...@@ -18,10 +18,6 @@ module.exports = (function() {
var self = this; var self = this;
this.checkLoggingOption(); this.checkLoggingOption();
this.promise = new Utils.Promise(function(resolve, reject) {
self.resolve = resolve;
self.reject = reject;
});
}; };
Utils.inherit(Query, AbstractQuery); Utils.inherit(Query, AbstractQuery);
...@@ -30,22 +26,24 @@ module.exports = (function() { ...@@ -30,22 +26,24 @@ module.exports = (function() {
this.sql = sql; this.sql = sql;
if (this.options.logging !== false) { if (this.options.logging !== false) {
this.sequelize.log('Executing (' + this.options.uuid + '): ' + this.sql); this.sequelize.log('Executing (' + this.connnection.uuid + '): ' + this.sql);
} }
self.client.query(self.sql, function(err, results, fields) { var promise = new Utils.Promise(function(resolve, reject) {
self.promise.emit('sql', self.sql, self.options.uuid); self.connection.query(self.sql, function(err, results, fields) {
promise.emit('sql', self.sql, self.connection.uuid);
if (err) { if (err) {
err.sql = sql; err.sql = sql;
reject(err);
} else {
resolve(self.formatResults(results));
}
}).setMaxListeners(100);
self.reject(err); });
} else {
self.resolve(self.formatResults(results));
}
}).setMaxListeners(100);
return this.promise; return promise;
}; };
return Query; return Query;
......
"use strict";
var AbstractConnectionManager = require('../abstract/connection-manager')
, ConnectionManager
, Utils = require('../../utils')
, Promise = require('../../promise');
ConnectionManager = function(dialect, sequelize) {
AbstractConnectionManager.call(this, dialect, sequelize);
this.sequelize = sequelize;
this.sequelize.config.port = this.sequelize.config.port || 5432;
try {
this.lib = sequelize.config.native ? require(sequelize.config.dialectModulePath || 'pg').native : require(sequelize.config.dialectModulePath || 'pg');
} catch (err) {
throw new Error('Please install postgres package manually');
}
};
Utils._.extend(ConnectionManager.prototype, AbstractConnectionManager.prototype);
ConnectionManager.prototype.connect = function(config) {
var self = this;
return new Promise(function (resolve, reject) {
var connectionString = self.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(config)
, connection = new self.lib.Client(connectionString)
, responded = false;
connection.connect(function(err) {
if (err) {
if (err.code) {
switch (err.code) {
case 'ECONNREFUSED':
reject(new Error('Failed to authenticate for PostgresSQL. Please double check your settings.'));
break;
case 'ENOTFOUND':
case 'EHOSTUNREACH':
case 'EINVAL':
reject(new Error('Failed to find PostgresSQL server. Please double check your settings.'));
break;
default:
reject(err);
break;
}
} else {
reject(new Error(err.message));
}
return;
}
responded = true;
resolve(connection);
});
// If we didn't ever hear from the client.connect() callback the connection timeout, node-postgres does not treat this as an error since no active query was ever emitted
connection.on('end', function () {
if (!responded) {
reject(new Error('Connection timed out'));
}
});
}).tap(function (connection) {
if (self.sequelize.config.keepDefaultTimezone) return;
return new Promise(function (resolve, reject) {
connection.query("SET TIME ZONE 'UTC'").on('error', function (err) {
reject(err);
}).on('end', function () {
resolve();
});
});
});
};
ConnectionManager.prototype.disconnect = function(connection) {
return new Promise(function (resolve, reject) {
connection.end();
resolve();
});
};
module.exports = ConnectionManager;
\ No newline at end of file
'use strict';
var Query = require('./query')
, Utils = require('../../utils');
module.exports = (function() {
var ConnectorManager = function(sequelize, config) {
var pgModule = config.dialectModulePath || 'pg';
this.sequelize = sequelize;
this.client = null;
this.config = config || {};
this.config.port = this.config.port || 5432;
this.pooling = (!!this.config.pool && (this.config.pool.maxConnections > 0));
this.pg = this.config.native ? require(pgModule).native : require(pgModule);
// Better support for BigInts
// https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
this.pg.types.setTypeParser(20, String);
this.disconnectTimeoutId = null;
this.pendingQueries = 0;
this.clientDrained = true;
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50);
this.ConnectionParameters = require(pgModule + '/lib/connection-parameters');
this.onProcessExit = function() {
this.disconnect();
}.bind(this);
process.on('exit', this.onProcessExit);
};
Utils._.extend(ConnectorManager.prototype, require('../abstract/connector-manager').prototype);
ConnectorManager.prototype.endQuery = function() {
var self = this;
self.pendingQueries--;
if (!self.pooling && self.pendingQueries === 0) {
setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self);
}, 100);
}
};
ConnectorManager.prototype.query = function(sql, callee, options) {
var self = this;
self.pendingQueries++;
self.clientDrained = false;
return self.connect().then(function(done) {
var query = new Query(self.client, self.sequelize, callee, options || {});
// We return the query regardless of error or success in the query
return query.run(sql).finally (function() {
self.endQuery.call(self);
done && done();
});
});
};
ConnectorManager.prototype.afterTransactionSetup = function(callback) {
this.setTimezone(this.client, 'UTC', callback);
};
ConnectorManager.prototype.connect = function(callback) {
var self = this;
return new Utils.Promise(function(resolve, reject) {
// in case database is slow to connect, prevent orphaning the client
// TODO: We really need some sort of queue/flush/drain mechanism
if (this.isConnecting && !this.pooling && this.client === null) {
return resolve();
}
this.isConnecting = true;
this.isConnected = false;
var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
, config = new this.ConnectionParameters(uri);
// set pooling parameters if specified
if (this.pooling) {
config.poolSize = this.config.pool.maxConnections || 10;
config.poolIdleTimeout = this.config.pool.maxIdleTime || 30000;
config.reapIntervalMillis = this.config.pool.reapInterval || 1000;
config.uuid = this.config.uuid;
}
var connectCallback = function(err, client, done) {
var timezoneCallback = function() {
self.isConnected = true;
self.client = client;
resolve(done);
};
self.isConnecting = false;
if (!!err) {
// release the pool immediately, very important.
done && done(err);
self.client = null;
if (err.code) {
switch (err.code) {
case 'ECONNREFUSED':
reject(new Error('Failed to authenticate for PostgresSQL. Please double check your settings.'));
break;
case 'ENOTFOUND':
case 'EHOSTUNREACH':
case 'EINVAL':
reject(new Error('Failed to find PostgresSQL server. Please double check your settings.'));
break;
default:
reject(err);
break;
}
} else {
reject(new Error(err.message));
}
} else if (client) {
if (self.config.keepDefaultTimezone) {
timezoneCallback();
} else {
self.setTimezone(client, 'UTC', timezoneCallback);
}
} else if (self.config.native) {
if (self.config.keepDefaultTimezone) {
timezoneCallback();
} else {
self.setTimezone(self.client, 'UTC', timezoneCallback);
}
} else {
done && done();
self.client = null;
resolve();
}
};
if (this.pooling) {
// acquire client from pool
this.pg.connect(config, connectCallback);
} else {
if (!!this.client) {
connectCallback(null, this.client);
} else {
//create one-off client
var responded = false;
this.client = new this.pg.Client(config);
this.client.connect(function(err, client, done) {
responded = true;
connectCallback(err, client || self.client, done);
});
// If we didn't ever hear from the client.connect() callback the connection timeout, node-postgres does not treat this as an error since no active query was ever emitted
this.client.on('end', function() {
if (!responded) {
connectCallback(new Error('Connection timed out'));
}
});
// Closes a client correctly even if we have backed up queries
// https://github.com/brianc/node-postgres/pull/346
this.client.on('drain', function() {
self.clientDrained = true;
});
}
}
}.bind(this));
};
ConnectorManager.prototype.setTimezone = function(client, timezone, callback) {
client.query("SET TIME ZONE '" + (timezone ||  'UTC') + "'").on('error', function (err) {
callback(err);
}).on('end', function () {
callback();
});
};
ConnectorManager.prototype.disconnect = function() {
if (this.client) {
if (this.clientDrained) {
this.client.end();
}
this.client = null;
}
this.isConnecting = false;
this.isConnected = false;
};
return ConnectorManager;
})();
'use strict'; 'use strict';
var _ = require('lodash') var _ = require('lodash')
, Abstract = require('../abstract'); , Abstract = require('../abstract')
, ConnectionManager = require('./connection-manager')
, Query = require('./query');
var PostgresDialect = function(sequelize) { var PostgresDialect = function(sequelize) {
this.sequelize = sequelize; this.sequelize = sequelize;
this.connectionManager = new ConnectionManager(this, sequelize);
}; };
PostgresDialect.prototype.supports = _.defaults({ PostgresDialect.prototype.supports = _.defaults({
...@@ -15,4 +18,6 @@ PostgresDialect.prototype.supports = _.defaults({ ...@@ -15,4 +18,6 @@ PostgresDialect.prototype.supports = _.defaults({
forShare: 'FOR SHARE' forShare: 'FOR SHARE'
}, Abstract.prototype.supports); }, Abstract.prototype.supports);
module.exports = PostgresDialect; PostgresDialect.prototype.Query = Query;
module.exports = PostgresDialect;
\ No newline at end of file
"use strict";
var AbstractConnectionManager = require('../abstract/connection-manager')
, ConnectionManager
, Utils = require('../../utils')
, Promise = require('../../promise');
ConnectionManager = function(dialect, sequelize) {
this.sequelize = sequelize;
this.config = sequelize.config;
this.dialect = dialect;
this.connections = {};
try {
this.lib = require(sequelize.config.dialectModulePath || 'sqlite3').verbose();
} catch (err) {
throw new Error('Please install sqlite3 package manually');
}
};
Utils._.extend(ConnectionManager.prototype, AbstractConnectionManager.prototype);
ConnectionManager.prototype.getConnection = function(options) {
var self = this;
options = options || {};
options.uuid = options.uuid || 'default';
if (self.connections[options.uuid]) return Promise.resolve(self.connections[options.uuid]);
return new Promise(function (resolve, reject) {
self.connections[options.uuid] = new self.lib.Database(self.sequelize.options.storage || ':memory:', function(err) {
if (err) {
if (err.code === 'SQLITE_CANTOPEN') return reject('Failed to find SQL server. Please double check your settings.');
return reject(err);
}
resolve(self.connections[options.uuid]);
});
}).tap(function (connection) {
if (self.sequelize.options.foreignKeys !== false) {
// Make it possible to define and use foreign key constraints unless
// explicitly disallowed. It's still opt-in per relation
connection.run('PRAGMA FOREIGN_KEYS=ON');
}
});
};
ConnectionManager.prototype.releaseConnection = function(connection) {
if (connection.uuid) {
connection.close();
}
};
module.exports = ConnectionManager;
\ No newline at end of file
'use strict';
var sqlite3
, Utils = require('../../utils')
, Query = require('./query');
module.exports = (function() {
var ConnectorManager = function(sequelize, config) {
this.sequelize = sequelize;
this.config = config;
if (config.dialectModulePath) {
sqlite3 = require(config.dialectModulePath).verbose();
} else {
sqlite3 = require('sqlite3').verbose();
}
};
Utils._.extend(ConnectorManager.prototype, require('../abstract/connector-manager').prototype);
ConnectorManager.prototype.connect = function() {
var emitter = new (require('events').EventEmitter)()
, self = this
, db;
this.database = db = new sqlite3.Database(self.sequelize.options.storage || ':memory:', function(err) {
if (err) {
if (err.code === 'SQLITE_CANTOPEN') {
emitter.emit('error', 'Failed to find SQL server. Please double check your settings.');
}
}
if (!err && self.sequelize.options.foreignKeys !== false) {
// Make it possible to define and use foreign key constraints unless
// explicitly disallowed. It's still opt-in per relation
db.run('PRAGMA FOREIGN_KEYS=ON');
}
});
};
ConnectorManager.prototype.query = function(sql, callee, options) {
if (!this.database) {
this.connect();
}
return new Query(this.database, this.sequelize, callee, options).run(sql);
};
return ConnectorManager;
})();
'use strict'; 'use strict';
var _ = require('lodash') var _ = require('lodash')
, Abstract = require('../abstract'); , Abstract = require('../abstract')
, ConnectionManager = require('./connection-manager')
, Query = require('./query');
var SqliteDialect = function(sequelize) { var SqliteDialect = function(sequelize) {
this.sequelize = sequelize; this.sequelize = sequelize;
this.connectionManager = new ConnectionManager(this, sequelize);
}; };
SqliteDialect.prototype.supports = _.defaults({ SqliteDialect.prototype.supports = _.defaults({
...@@ -12,4 +15,7 @@ SqliteDialect.prototype.supports = _.defaults({ ...@@ -12,4 +15,7 @@ SqliteDialect.prototype.supports = _.defaults({
'DEFAULT VALUES': true 'DEFAULT VALUES': true
}, Abstract.prototype.supports); }, Abstract.prototype.supports);
module.exports = SqliteDialect; SqliteDialect.prototype.Query = Query;
SqliteDialect.prototype.name = 'sqlite';
module.exports = SqliteDialect;
\ No newline at end of file
...@@ -8,7 +8,6 @@ var url = require('url') ...@@ -8,7 +8,6 @@ var url = require('url')
, ModelManager = require('./model-manager') , ModelManager = require('./model-manager')
, QueryInterface = require('./query-interface') , QueryInterface = require('./query-interface')
, Transaction = require('./transaction') , Transaction = require('./transaction')
, TransactionManager = require('./transaction-manager')
, QueryTypes = require('./query-types') , QueryTypes = require('./query-types')
, sequelizeErrors = require('./errors') , sequelizeErrors = require('./errors')
, Promise = require('./promise'); , Promise = require('./promise');
...@@ -123,7 +122,6 @@ module.exports = (function() { ...@@ -123,7 +122,6 @@ module.exports = (function() {
sync: {}, sync: {},
logging: console.log, logging: console.log,
omitNull: false, omitNull: false,
queue: true,
native: false, native: false,
replication: false, replication: false,
ssl: undefined, ssl: undefined,
...@@ -162,7 +160,7 @@ module.exports = (function() { ...@@ -162,7 +160,7 @@ module.exports = (function() {
var Dialect = require('./dialects/' + this.getDialect()); var Dialect = require('./dialects/' + this.getDialect());
this.dialect = new Dialect(this); this.dialect = new Dialect(this);
} catch (err) { } catch (err) {
throw new Error('The dialect ' + this.getDialect() + ' is not supported.'); throw new Error('The dialect ' + this.getDialect() + ' is not supported. ('+err+')');
} }
/** /**
...@@ -171,7 +169,7 @@ module.exports = (function() { ...@@ -171,7 +169,7 @@ module.exports = (function() {
*/ */
this.models = {}; this.models = {};
this.modelManager = this.daoFactoryManager = new ModelManager(this); this.modelManager = this.daoFactoryManager = new ModelManager(this);
this.transactionManager = new TransactionManager(this); this.connectionManager = this.dialect.connectionManager;
this.importCache = {}; this.importCache = {};
}; };
...@@ -467,6 +465,8 @@ module.exports = (function() { ...@@ -467,6 +465,8 @@ module.exports = (function() {
* @see {Model#build} for more information about callee. * @see {Model#build} for more information about callee.
*/ */
Sequelize.prototype.query = function(sql, callee, options, replacements) { Sequelize.prototype.query = function(sql, callee, options, replacements) {
var self = this;
if (arguments.length === 4) { if (arguments.length === 4) {
if (Array.isArray(replacements)) { if (Array.isArray(replacements)) {
sql = Utils.format([sql].concat(replacements), this.options.dialect); sql = Utils.format([sql].concat(replacements), this.options.dialect);
...@@ -488,7 +488,15 @@ module.exports = (function() { ...@@ -488,7 +488,15 @@ module.exports = (function() {
type: (sql.toLowerCase().indexOf('select') === 0) ? QueryTypes.SELECT : false type: (sql.toLowerCase().indexOf('select') === 0) ? QueryTypes.SELECT : false
}); });
return this.transactionManager.query(sql, callee, options); return Promise.resolve(
options.transaction ? options.transaction.connection : self.connectionManager.getConnection()
).then(function (connection) {
var query = new self.dialect.Query(connection, self, callee, options);
return query.run(sql).finally(function() {
if (options.transaction) return;
return self.connectionManager.releaseConnection(connection);
});
});
}; };
/** /**
......
'use strict';
var Utils = require('./utils');
var TransactionManager = module.exports = function(sequelize) {
this.sequelize = sequelize;
this.connectorManagers = {};
try {
this.ConnectorManager = require('./dialects/' + sequelize.getDialect() + '/connector-manager');
} catch (err) {
throw new Error('The dialect ' + sequelize.getDialect() + ' is not supported.');
}
};
TransactionManager.prototype.getConnectorManager = function(uuid) {
uuid = uuid || 'default';
if (!this.connectorManagers.hasOwnProperty(uuid)) {
var config = Utils._.extend({ uuid: uuid }, this.sequelize.config);
if (uuid !== 'default') {
config.pool = Utils._.extend(
{},
Utils._.clone(config.pool || {}),
{
minConnections: 1,
maxConnections: 1,
useReplicaton: false
}
);
config.keepDefaultTimezone = true;
}
this.connectorManagers[uuid] = new this.ConnectorManager(this.sequelize, config);
}
return this.connectorManagers[uuid];
};
TransactionManager.prototype.releaseConnectionManager = function(uuid) {
this.connectorManagers[uuid].cleanup();
delete this.connectorManagers[uuid];
};
TransactionManager.prototype.query = function(sql, callee, options) {
options = options || {};
options.uuid = 'default';
if (options.transaction) {
options.uuid = options.transaction.id;
}
return this.getConnectorManager(options.uuid).query(sql, callee, options);
};
...@@ -87,37 +87,36 @@ Transaction.prototype.rollback = function() { ...@@ -87,37 +87,36 @@ Transaction.prototype.rollback = function() {
}; };
Transaction.prototype.prepareEnvironment = function() { Transaction.prototype.prepareEnvironment = function() {
var self = this var self = this;
, connectorManager = self.sequelize.transactionManager.getConnectorManager(this.id);
return this.begin().then(function () { return this.sequelize.connectionManager.getConnection({
uuid: self.id
}).then(function (connection) {
self.connection = connection;
self.connection.uuid = self.id;
}).then(function () {
return self.begin();
}).then(function () {
return self.setIsolationLevel(); return self.setIsolationLevel();
}).then(function () { }).then(function () {
return self.setAutocommit(); return self.setAutocommit();
}).then(function () {
return new Utils.Promise(function (resolve, reject) {
connectorManager.afterTransactionSetup(function (err, result) {
if (err) return reject(err);
return resolve(result);
});
});
}); });
}; };
Transaction.prototype.begin = function(callback) { Transaction.prototype.begin = function() {
return this return this
.sequelize .sequelize
.getQueryInterface() .getQueryInterface()
.startTransaction(this, {}); .startTransaction(this, {});
}; };
Transaction.prototype.setAutocommit = function(callback) { Transaction.prototype.setAutocommit = function() {
return this return this
.sequelize .sequelize
.getQueryInterface() .getQueryInterface()
.setAutocommit(this, this.options.autocommit); .setAutocommit(this, this.options.autocommit);
}; };
Transaction.prototype.setIsolationLevel = function(callback) { Transaction.prototype.setIsolationLevel = function() {
return this return this
.sequelize .sequelize
.getQueryInterface() .getQueryInterface()
...@@ -125,5 +124,5 @@ Transaction.prototype.setIsolationLevel = function(callback) { ...@@ -125,5 +124,5 @@ Transaction.prototype.setIsolationLevel = function(callback) {
}; };
Transaction.prototype.cleanup = function() { Transaction.prototype.cleanup = function() {
this.sequelize.transactionManager.releaseConnectionManager(this.id); return this.sequelize.connectionManager.releaseConnection(this.connection);
}; };
...@@ -74,7 +74,7 @@ describe(Support.getTestDialectTeaser("Configuration"), function() { ...@@ -74,7 +74,7 @@ describe(Support.getTestDialectTeaser("Configuration"), function() {
var sequelize = new Sequelize('dbname', 'root', 'pass', { var sequelize = new Sequelize('dbname', 'root', 'pass', {
dialect: dialect dialect: dialect
}) })
, config = sequelize.connectorManager.config , config = sequelize.config
, port , port
if (Support.dialectIsMySQL()) { if (Support.dialectIsMySQL()) {
......
...@@ -2024,6 +2024,52 @@ describe(Support.getTestDialectTeaser("DAOFactory"), function () { ...@@ -2024,6 +2024,52 @@ describe(Support.getTestDialectTeaser("DAOFactory"), function () {
}) })
}) })
if (dialect !== 'sqlite') {
it('supports multiple async transactions', function(done) {
this.timeout(25000);
Support.prepareTransactionTest(this.sequelize, function(sequelize) {
var User = sequelize.define('User', { username: Sequelize.STRING })
var testAsync = function(i, done) {
sequelize.transaction().then(function(t) {
return User.create({
username: 'foo'
}, {
transaction: t
}).then(function () {
return User.findAll({
where: {
username: "foo"
}
}).then(function (users) {
expect(users).to.have.length(0);
});
}).then(function () {
return User.findAll({
where: {
username: "foo"
},
transaction: t
}).then(function (users) {
expect(users).to.have.length(1);
});
}).then(function () {
return t;
});
}).then(function (t) {
return t.rollback();
}).nodeify(done);
}
User.sync({ force: true }).success(function() {
var tasks = []
for (var i = 0; i < 1000; i++) {
tasks.push(testAsync.bind(this, i))
};
async.parallelLimit(tasks, (sequelize.config.pool && sequelize.config.pool.max || 5) - 1, done); // Needs to be one less than 1 else the non transaction query won't ever get a connection
});
});
});
}
describe('Unique', function() { describe('Unique', function() {
it("should set unique when unique is true", function(done) { it("should set unique when unique is true", function(done) {
......
...@@ -91,6 +91,7 @@ describe(Support.getTestDialectTeaser("Sequelize"), function () { ...@@ -91,6 +91,7 @@ describe(Support.getTestDialectTeaser("Sequelize"), function () {
}) })
it('triggers the actual adapter error', function(done) { it('triggers the actual adapter error', function(done) {
this this
.sequelizeWithInvalidConnection .sequelizeWithInvalidConnection
.authenticate() .authenticate()
...@@ -98,16 +99,16 @@ describe(Support.getTestDialectTeaser("Sequelize"), function () { ...@@ -98,16 +99,16 @@ describe(Support.getTestDialectTeaser("Sequelize"), function () {
if (dialect === 'mariadb') { if (dialect === 'mariadb') {
expect(err.message).to.match(/Access denied for user/) expect(err.message).to.match(/Access denied for user/)
} else if (dialect === 'postgres') { } else if (dialect === 'postgres') {
// When the test is run with only it produces: expect(
// Error: Error: Failed to authenticate for PostgresSQL. Please double check your settings. err.message.match(/Failed to authenticate for PostgresSQL/) ||
// When its run with all the other tests it produces: err.message.match(/invalid port number/)
// Error: invalid port number: "99999" ).to.be.ok
expect(err.message).to.match(/invalid port number/)
} else { } else {
expect(err.message).to.match(/Failed to authenticate/) expect(err.message).to.match(/Failed to authenticate/)
} }
done() done()
}) })
}) })
}) })
......
...@@ -37,7 +37,7 @@ var Support = { ...@@ -37,7 +37,7 @@ var Support = {
if (dialect === 'sqlite') { if (dialect === 'sqlite') {
var options = Sequelize.Utils._.extend({}, sequelize.options, { storage: path.join(__dirname, 'tmp', 'db.sqlite') }) var options = Sequelize.Utils._.extend({}, sequelize.options, { storage: path.join(__dirname, 'tmp', 'db.sqlite') })
, _sequelize = new Sequelize(sequelize.config.datase, null, null, options) , _sequelize = new Sequelize(sequelize.config.database, null, null, options)
_sequelize.sync({ force: true }).success(function() { callback(_sequelize) }) _sequelize.sync({ force: true }).success(function() { callback(_sequelize) })
} else { } else {
......
var chai = require('chai')
, expect = chai.expect
, Support = require(__dirname + '/support')
, TransactionManager = require(__dirname + '/../lib/transaction-manager')
describe(Support.getTestDialectTeaser("TransactionManager"), function () {
beforeEach(function() {
this.transactionManager = new TransactionManager(this.sequelize)
})
describe('getConnectorManager', function() {
describe('if no uuid is passed', function() {
it('uses the default connector', function() {
var connectorManager = this.transactionManager.getConnectorManager()
expect(connectorManager.config.uuid).to.equal('default')
})
it('uses the pooling configuration of sequelize', function() {
var connectorManager = this.transactionManager.getConnectorManager()
, self = this
if (Support.getTestDialect() !== 'sqlite') {
expect(this.sequelize.config.pool.maxConnections).to.equal(5)
}
Object.keys(this.sequelize.config.pool || {}).forEach(function(key) {
expect(connectorManager.config.pool[key]).to.equal(self.sequelize.config.pool[key])
})
})
})
describe('if the passed uuid is not equal to "default"', function() {
it('uses the non-default connector', function() {
var connectorManager = this.transactionManager.getConnectorManager('a-uuid')
expect(connectorManager.config.uuid).to.equal('a-uuid')
})
it('creates a new connector manager', function() {
this.transactionManager.getConnectorManager()
expect(Object.keys(this.transactionManager.connectorManagers).length).to.equal(1)
this.transactionManager.getConnectorManager('a-uuid')
expect(Object.keys(this.transactionManager.connectorManagers).length).to.equal(2)
this.transactionManager.getConnectorManager('a-uuid')
expect(Object.keys(this.transactionManager.connectorManagers).length).to.equal(2)
})
it('treats the connector managers as singleton', function() {
var connectorManager1 = this.transactionManager.getConnectorManager('a-uuid')
, connectorManager2 = this.transactionManager.getConnectorManager('a-uuid')
expect(connectorManager1).to.equal(connectorManager2)
})
it('uses the pooling configuration of sequelize but with disabled replication and forced to one connection', function() {
var connectorManager = this.transactionManager.getConnectorManager('a-uuid')
, self = this
if (Support.getTestDialect() !== 'sqlite') {
expect(this.sequelize.config.pool.maxConnections).to.equal(5)
}
Object.keys(this.sequelize.config.pool || {}).forEach(function(key) {
if (['minConnections', 'maxConnections'].indexOf(key) === -1) {
expect(connectorManager.config.pool[key]).to.equal(self.sequelize.config.pool[key])
}
})
expect(connectorManager.config.pool.minConnections).to.equal(1)
expect(connectorManager.config.pool.maxConnections).to.equal(1)
})
})
})
})
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!