不要怂,就是干,撸起袖子干!

Commit ceb0de26 by Andy Edwards Committed by GitHub

refactor(dialects/abstract): asyncify methods (#12128)

1 parent 92cbae90
Showing with 69 additions and 75 deletions
...@@ -91,15 +91,15 @@ class ConnectionManager { ...@@ -91,15 +91,15 @@ class ConnectionManager {
* @private * @private
* @returns {Promise} * @returns {Promise}
*/ */
_onProcessExit() { async _onProcessExit() {
if (!this.pool) { if (!this.pool) {
return Promise.resolve(); return;
} }
return this.pool.drain().then(() => { await this.pool.drain();
debug('connection drain due to process exit'); debug('connection drain due to process exit');
return this.pool.destroyAllNow();
}); return await this.pool.destroyAllNow();
} }
/** /**
...@@ -107,13 +107,13 @@ class ConnectionManager { ...@@ -107,13 +107,13 @@ class ConnectionManager {
* *
* @returns {Promise} * @returns {Promise}
*/ */
close() { async close() {
// Mark close of pool // Mark close of pool
this.getConnection = function getConnection() { this.getConnection = async function getConnection() {
return Promise.reject(new Error('ConnectionManager.getConnection was called after the connection manager was closed!')); throw new Error('ConnectionManager.getConnection was called after the connection manager was closed!');
}; };
return this._onProcessExit(); return await this._onProcessExit();
} }
/** /**
...@@ -127,11 +127,10 @@ class ConnectionManager { ...@@ -127,11 +127,10 @@ class ConnectionManager {
this.pool = new Pool({ this.pool = new Pool({
name: 'sequelize', name: 'sequelize',
create: () => this._connect(config), create: () => this._connect(config),
destroy: connection => { destroy: async connection => {
return this._disconnect(connection) const result = await this._disconnect(connection);
.then(result => { debug('connection destroy');
debug('connection destroy');return result; return result;
});
}, },
validate: config.pool.validate, validate: config.pool.validate,
max: config.pool.max, max: config.pool.max,
...@@ -180,27 +179,26 @@ class ConnectionManager { ...@@ -180,27 +179,26 @@ class ConnectionManager {
this.pool[connection.queryType].destroy(connection); this.pool[connection.queryType].destroy(connection);
debug('connection destroy'); debug('connection destroy');
}, },
destroyAllNow: () => { destroyAllNow: async () => {
return Promise.all([ await Promise.all([
this.pool.read.destroyAllNow(), this.pool.read.destroyAllNow(),
this.pool.write.destroyAllNow() this.pool.write.destroyAllNow()
]).then(() => { debug('all connections destroyed'); });
},
drain: () => {
return Promise.all([
this.pool.write.drain(),
this.pool.read.drain()
]); ]);
debug('all connections destroyed');
}, },
drain: async () => Promise.all([
this.pool.write.drain(),
this.pool.read.drain()
]),
read: new Pool({ read: new Pool({
name: 'sequelize:read', name: 'sequelize:read',
create: () => { create: async () => {
// round robin config // round robin config
const nextRead = reads++ % config.replication.read.length; const nextRead = reads++ % config.replication.read.length;
return this._connect(config.replication.read[nextRead]).then(connection => { const connection = await this._connect(config.replication.read[nextRead]);
connection.queryType = 'read'; connection.queryType = 'read';
return connection; return connection;
});
}, },
destroy: connection => this._disconnect(connection), destroy: connection => this._disconnect(connection),
validate: config.pool.validate, validate: config.pool.validate,
...@@ -213,11 +211,10 @@ class ConnectionManager { ...@@ -213,11 +211,10 @@ class ConnectionManager {
}), }),
write: new Pool({ write: new Pool({
name: 'sequelize:write', name: 'sequelize:write',
create: () => { create: async () => {
return this._connect(config.replication.write).then(connection => { const connection = await this._connect(config.replication.write);
connection.queryType = 'write'; connection.queryType = 'write';
return connection; return connection;
});
}, },
destroy: connection => this._disconnect(connection), destroy: connection => this._disconnect(connection),
validate: config.pool.validate, validate: config.pool.validate,
...@@ -243,16 +240,14 @@ class ConnectionManager { ...@@ -243,16 +240,14 @@ class ConnectionManager {
* *
* @returns {Promise<Connection>} * @returns {Promise<Connection>}
*/ */
getConnection(options) { async getConnection(options) {
options = options || {}; options = options || {};
let promise;
if (this.sequelize.options.databaseVersion === 0) { if (this.sequelize.options.databaseVersion === 0) {
if (this.versionPromise) { if (!this.versionPromise) {
promise = this.versionPromise; this.versionPromise = (async () => {
} else { try {
promise = this.versionPromise = this._connect(this.config.replication.write || this.config) const connection = await this._connect(this.config.replication.write || this.config);
.then(connection => {
const _options = {}; const _options = {};
_options.transaction = { connection }; // Cheat .query to use our private connection _options.transaction = { connection }; // Cheat .query to use our private connection
...@@ -262,36 +257,36 @@ class ConnectionManager { ...@@ -262,36 +257,36 @@ class ConnectionManager {
//connection might have set databaseVersion value at initialization, //connection might have set databaseVersion value at initialization,
//avoiding a useless round trip //avoiding a useless round trip
if (this.sequelize.options.databaseVersion === 0) { if (this.sequelize.options.databaseVersion === 0) {
return this.sequelize.databaseVersion(_options).then(version => { const version = await this.sequelize.databaseVersion(_options);
const parsedVersion = _.get(semver.coerce(version), 'version') || version; const parsedVersion = _.get(semver.coerce(version), 'version') || version;
this.sequelize.options.databaseVersion = semver.valid(parsedVersion) this.sequelize.options.databaseVersion = semver.valid(parsedVersion)
? parsedVersion ? parsedVersion
: this.defaultVersion; : this.defaultVersion;
this.versionPromise = null;
return this._disconnect(connection);
});
} }
this.versionPromise = null; this.versionPromise = null;
return this._disconnect(connection); return await this._disconnect(connection);
}).catch(err => { } catch (err) {
this.versionPromise = null; this.versionPromise = null;
throw err; throw err;
}); }
})();
} }
} else { await this.versionPromise;
promise = Promise.resolve();
} }
return promise.then(() => { let result;
return this.pool.acquire(options.type, options.useMaster)
.catch(error => { try {
if (error instanceof TimeoutError) throw new errors.ConnectionAcquireTimeoutError(error); result = await this.pool.acquire(options.type, options.useMaster);
throw error; } catch (error) {
}); if (error instanceof TimeoutError) throw new errors.ConnectionAcquireTimeoutError(error);
}).then(result => { throw error;
debug('connection acquired');return result; }
});
debug('connection acquired');
return result;
} }
/** /**
...@@ -301,11 +296,9 @@ class ConnectionManager { ...@@ -301,11 +296,9 @@ class ConnectionManager {
* *
* @returns {Promise} * @returns {Promise}
*/ */
releaseConnection(connection) { async releaseConnection(connection) {
return Promise.resolve().then(() => { this.pool.release(connection);
this.pool.release(connection); debug('connection released');
debug('connection released');
});
} }
/** /**
...@@ -315,10 +308,11 @@ class ConnectionManager { ...@@ -315,10 +308,11 @@ class ConnectionManager {
* @private * @private
* @returns {Promise<Connection>} * @returns {Promise<Connection>}
*/ */
_connect(config) { async _connect(config) {
return this.sequelize.runHooks('beforeConnect', config) await this.sequelize.runHooks('beforeConnect', config);
.then(() => this.dialect.connectionManager.connect(config)) const connection = await this.dialect.connectionManager.connect(config);
.then(connection => this.sequelize.runHooks('afterConnect', connection, config).then(() => connection)); await this.sequelize.runHooks('afterConnect', connection, config);
return connection;
} }
/** /**
...@@ -328,10 +322,10 @@ class ConnectionManager { ...@@ -328,10 +322,10 @@ class ConnectionManager {
* @private * @private
* @returns {Promise} * @returns {Promise}
*/ */
_disconnect(connection) { async _disconnect(connection) {
return this.sequelize.runHooks('beforeDisconnect', connection) await this.sequelize.runHooks('beforeDisconnect', connection);
.then(() => this.dialect.connectionManager.disconnect(connection)) await this.dialect.connectionManager.disconnect(connection);
.then(() => this.sequelize.runHooks('afterDisconnect', connection)); return this.sequelize.runHooks('afterDisconnect', connection);
} }
/** /**
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!