不要怂,就是干,撸起袖子干!

Commit 722ed505 by Andy Edwards Committed by GitHub

refactor(dialects/postgres): asyncify methods (#12129)

1 parent ceb0de26
...@@ -84,7 +84,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -84,7 +84,7 @@ class ConnectionManager extends AbstractConnectionManager {
return this.lib.types.getTypeParser(oid, ...args); return this.lib.types.getTypeParser(oid, ...args);
} }
connect(config) { async connect(config) {
config.user = config.username; config.user = config.username;
const connectionConfig = _.pick(config, [ const connectionConfig = _.pick(config, [
'user', 'password', 'host', 'database', 'port' 'user', 'password', 'host', 'database', 'port'
...@@ -121,7 +121,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -121,7 +121,7 @@ class ConnectionManager extends AbstractConnectionManager {
])); ]));
} }
return new Promise((resolve, reject) => { const connection = await new Promise((resolve, reject) => {
let responded = false; let responded = false;
const connection = new this.lib.Client(connectionConfig); const connection = new this.lib.Client(connectionConfig);
...@@ -194,7 +194,8 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -194,7 +194,8 @@ class ConnectionManager extends AbstractConnectionManager {
resolve(connection); resolve(connection);
} }
}); });
}).then(connection => { });
let query = ''; let query = '';
if (this.sequelize.options.standardConformingStrings !== false && connection['standard_conforming_strings'] !== 'on') { if (this.sequelize.options.standardConformingStrings !== false && connection['standard_conforming_strings'] !== 'on') {
...@@ -218,52 +219,46 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -218,52 +219,46 @@ class ConnectionManager extends AbstractConnectionManager {
} }
if (query) { if (query) {
return Promise.resolve(connection.query(query)).then(() => connection); await connection.query(query);
} }
return connection;
}).then(connection => {
if (Object.keys(this.nameOidMap).length === 0 && if (Object.keys(this.nameOidMap).length === 0 &&
this.enumOids.oids.length === 0 && this.enumOids.oids.length === 0 &&
this.enumOids.arrayOids.length === 0) { this.enumOids.arrayOids.length === 0) {
return Promise.resolve(this._refreshDynamicOIDs(connection)).then(() => connection); await this._refreshDynamicOIDs(connection);
} }
return connection;
}).then(connection => {
// Don't let a Postgres restart (or error) to take down the whole app // Don't let a Postgres restart (or error) to take down the whole app
connection.on('error', error => { connection.on('error', error => {
connection._invalid = true; connection._invalid = true;
debug(`connection error ${error.code || error.message}`); debug(`connection error ${error.code || error.message}`);
this.pool.destroy(connection); this.pool.destroy(connection);
}); });
return connection; return connection;
});
} }
disconnect(connection) { async disconnect(connection) {
if (connection._ending) { if (connection._ending) {
debug('connection tried to disconnect but was already at ENDING state'); debug('connection tried to disconnect but was already at ENDING state');
return Promise.resolve(); return;
} }
return promisify(callback => connection.end(callback))(); return await promisify(callback => connection.end(callback))();
} }
validate(connection) { validate(connection) {
return !connection._invalid && !connection._ending; return !connection._invalid && !connection._ending;
} }
_refreshDynamicOIDs(connection) { async _refreshDynamicOIDs(connection) {
const databaseVersion = this.sequelize.options.databaseVersion; const databaseVersion = this.sequelize.options.databaseVersion;
const supportedVersion = '8.3.0'; const supportedVersion = '8.3.0';
// Check for supported version // Check for supported version
if ( (databaseVersion && semver.gte(databaseVersion, supportedVersion)) === false) { if ( (databaseVersion && semver.gte(databaseVersion, supportedVersion)) === false) {
return Promise.resolve(); return;
} }
// Refresh dynamic OIDs for some types const results = await (connection || this.sequelize).query(
// These include Geometry / Geography / HStore / Enum / Citext / Range
return (connection || this.sequelize).query(
'WITH ranges AS (' + 'WITH ranges AS (' +
' SELECT pg_range.rngtypid, pg_type.typname AS rngtypname,' + ' SELECT pg_range.rngtypid, pg_type.typname AS rngtypname,' +
' pg_type.typarray AS rngtyparray, pg_range.rngsubtype' + ' pg_type.typarray AS rngtyparray, pg_range.rngsubtype' +
...@@ -273,7 +268,8 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -273,7 +268,8 @@ class ConnectionManager extends AbstractConnectionManager {
' ranges.rngtypname, ranges.rngtypid, ranges.rngtyparray' + ' ranges.rngtypname, ranges.rngtypid, ranges.rngtyparray' +
' FROM pg_type LEFT OUTER JOIN ranges ON pg_type.oid = ranges.rngsubtype' + ' FROM pg_type LEFT OUTER JOIN ranges ON pg_type.oid = ranges.rngsubtype' +
' WHERE (pg_type.typtype IN(\'b\', \'e\'));' ' WHERE (pg_type.typtype IN(\'b\', \'e\'));'
).then(results => { );
let result = Array.isArray(results) ? results.pop() : results; let result = Array.isArray(results) ? results.pop() : results;
// When searchPath is prepended then two statements are executed and the result is // When searchPath is prepended then two statements are executed and the result is
...@@ -312,7 +308,6 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -312,7 +308,6 @@ class ConnectionManager extends AbstractConnectionManager {
this.enumOids = newEnumOids; this.enumOids = newEnumOids;
this.refreshTypeParser(dataTypes.postgres); this.refreshTypeParser(dataTypes.postgres);
});
} }
_clearDynamicOIDs() { _clearDynamicOIDs() {
......
...@@ -26,7 +26,7 @@ const _ = require('lodash'); ...@@ -26,7 +26,7 @@ const _ = require('lodash');
* @returns {Promise} * @returns {Promise}
* @private * @private
*/ */
function ensureEnums(qi, tableName, attributes, options, model) { async function ensureEnums(qi, tableName, attributes, options, model) {
const keys = Object.keys(attributes); const keys = Object.keys(attributes);
const keyLen = keys.length; const keyLen = keys.length;
...@@ -50,7 +50,7 @@ function ensureEnums(qi, tableName, attributes, options, model) { ...@@ -50,7 +50,7 @@ function ensureEnums(qi, tableName, attributes, options, model) {
} }
} }
return Promise.all(promises).then(results => { const results = await Promise.all(promises);
promises = []; promises = [];
let enumIdx = 0; let enumIdx = 0;
...@@ -142,16 +142,14 @@ function ensureEnums(qi, tableName, attributes, options, model) { ...@@ -142,16 +142,14 @@ function ensureEnums(qi, tableName, attributes, options, model) {
} }
} }
return promises const result = await promises
.reduce((promise, asyncFunction) => promise.then(asyncFunction), Promise.resolve()) .reduce(async (promise, asyncFunction) => await asyncFunction(await promise), Promise.resolve());
.then(result => {
// If ENUM processed, then refresh OIDs // If ENUM processed, then refresh OIDs
if (promises.length) { if (promises.length) {
return Promise.resolve(qi.sequelize.dialect.connectionManager._refreshDynamicOIDs()).then(() => result); await qi.sequelize.dialect.connectionManager._refreshDynamicOIDs();
} }
return result; return result;
});
});
} }
......
...@@ -47,7 +47,7 @@ class Query extends AbstractQuery { ...@@ -47,7 +47,7 @@ class Query extends AbstractQuery {
return [sql, bindParam]; return [sql, bindParam];
} }
run(sql, parameters) { async run(sql, parameters) {
const { connection } = this; const { connection } = this;
if (!_.isEmpty(this.options.searchPath)) { if (!_.isEmpty(this.options.searchPath)) {
...@@ -73,7 +73,11 @@ class Query extends AbstractQuery { ...@@ -73,7 +73,11 @@ class Query extends AbstractQuery {
const complete = this._logQuery(sql, debug, parameters); const complete = this._logQuery(sql, debug, parameters);
return query.catch(err => { let queryResult;
try {
queryResult = await query;
} catch (err) {
// set the client so that it will be reaped if the connection resets while executing // set the client so that it will be reaped if the connection resets while executing
if (err.code === 'ECONNRESET') { if (err.code === 'ECONNRESET') {
connection._invalid = true; connection._invalid = true;
...@@ -82,8 +86,8 @@ class Query extends AbstractQuery { ...@@ -82,8 +86,8 @@ class Query extends AbstractQuery {
err.sql = sql; err.sql = sql;
err.parameters = parameters; err.parameters = parameters;
throw this.formatError(err); throw this.formatError(err);
}) }
.then(queryResult => {
complete(); complete();
let rows = Array.isArray(queryResult) let rows = Array.isArray(queryResult)
...@@ -282,7 +286,6 @@ class Query extends AbstractQuery { ...@@ -282,7 +286,6 @@ class Query extends AbstractQuery {
return [rows, queryResult]; return [rows, queryResult];
} }
return rows; return rows;
});
} }
formatError(err) { formatError(err) {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!