不要怂,就是干,撸起袖子干!

Commit 1432cfd1 by Andy Edwards Committed by GitHub

refactor(dialects/mysql): asyncify methods (#12130)

1 parent 722ed505
...@@ -54,7 +54,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -54,7 +54,7 @@ class ConnectionManager extends AbstractConnectionManager {
* @returns {Promise<Connection>} * @returns {Promise<Connection>}
* @private * @private
*/ */
connect(config) { async connect(config) {
const connectionConfig = Object.assign({ const connectionConfig = Object.assign({
host: config.host, host: config.host,
port: config.port, port: config.port,
...@@ -68,84 +68,77 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -68,84 +68,77 @@ class ConnectionManager extends AbstractConnectionManager {
supportBigNumbers: true supportBigNumbers: true
}, config.dialectOptions); }, config.dialectOptions);
return new Promise((resolve, reject) => { try {
const connection = this.lib.createConnection(connectionConfig); const connection = await new Promise((resolve, reject) => {
const connection = this.lib.createConnection(connectionConfig);
const errorHandler = e => {
// clean up connect & error event if there is error const errorHandler = e => {
connection.removeListener('connect', connectHandler); // clean up connect & error event if there is error
connection.removeListener('error', connectHandler); connection.removeListener('connect', connectHandler);
reject(e); connection.removeListener('error', connectHandler);
}; reject(e);
};
const connectHandler = () => {
// clean up error event if connected const connectHandler = () => {
connection.removeListener('error', errorHandler); // clean up error event if connected
resolve(connection); connection.removeListener('error', errorHandler);
};
// don't use connection.once for error event handling here
// mysql2 emit error two times in case handshake was failed
// first error is protocol_lost and second is timeout
// if we will use `once.error` node process will crash on 2nd error emit
connection.on('error', errorHandler);
connection.once('connect', connectHandler);
})
.then(result => {
debug('connection acquired');return result;
})
.then(connection => {
connection.on('error', error => {
switch (error.code) {
case 'ESOCKET':
case 'ECONNRESET':
case 'EPIPE':
case 'PROTOCOL_CONNECTION_LOST':
this.pool.destroy(connection);
}
});
return new Promise((resolve, reject) => {
if (!this.sequelize.config.keepDefaultTimezone) {
// set timezone for this connection
// but named timezone are not directly supported in mysql, so get its offset first
let tzOffset = this.sequelize.options.timezone;
tzOffset = /\//.test(tzOffset) ? momentTz.tz(tzOffset).format('Z') : tzOffset;
return connection.query(`SET time_zone = '${tzOffset}'`, err => {
if (err) { reject(err); } else { resolve(connection); }
});
}
// return connection without executing SET time_zone query
resolve(connection); resolve(connection);
}); };
})
.catch(err => { // don't use connection.once for error event handling here
switch (err.code) { // mysql2 emit error two times in case handshake was failed
case 'ECONNREFUSED': // first error is protocol_lost and second is timeout
throw new SequelizeErrors.ConnectionRefusedError(err); // if we will use `once.error` node process will crash on 2nd error emit
case 'ER_ACCESS_DENIED_ERROR': connection.on('error', errorHandler);
throw new SequelizeErrors.AccessDeniedError(err); connection.once('connect', connectHandler);
case 'ENOTFOUND': });
throw new SequelizeErrors.HostNotFoundError(err);
case 'EHOSTUNREACH': debug('connection acquired');
throw new SequelizeErrors.HostNotReachableError(err); connection.on('error', error => {
case 'EINVAL': switch (error.code) {
throw new SequelizeErrors.InvalidConnectionError(err); case 'ESOCKET':
default: case 'ECONNRESET':
throw new SequelizeErrors.ConnectionError(err); case 'EPIPE':
case 'PROTOCOL_CONNECTION_LOST':
this.pool.destroy(connection);
} }
}); });
if (!this.sequelize.config.keepDefaultTimezone) {
// set timezone for this connection
// but named timezone are not directly supported in mysql, so get its offset first
let tzOffset = this.sequelize.options.timezone;
tzOffset = /\//.test(tzOffset) ? momentTz.tz(tzOffset).format('Z') : tzOffset;
await promisify(cb => connection.query(`SET time_zone = '${tzOffset}'`, cb))();
}
return connection;
} catch (err) {
switch (err.code) {
case 'ECONNREFUSED':
throw new SequelizeErrors.ConnectionRefusedError(err);
case 'ER_ACCESS_DENIED_ERROR':
throw new SequelizeErrors.AccessDeniedError(err);
case 'ENOTFOUND':
throw new SequelizeErrors.HostNotFoundError(err);
case 'EHOSTUNREACH':
throw new SequelizeErrors.HostNotReachableError(err);
case 'EINVAL':
throw new SequelizeErrors.InvalidConnectionError(err);
default:
throw new SequelizeErrors.ConnectionError(err);
}
}
} }
disconnect(connection) { async disconnect(connection) {
// Don't disconnect connections with CLOSED state // Don't disconnect connections with CLOSED state
if (connection._closing) { if (connection._closing) {
debug('connection tried to disconnect but was already at CLOSED state'); debug('connection tried to disconnect but was already at CLOSED state');
return Promise.resolve(); return;
} }
return promisify(callback => connection.end(callback))(); return await promisify(callback => connection.end(callback))();
} }
validate(connection) { validate(connection) {
......
...@@ -21,31 +21,29 @@ const sequelizeErrors = require('../../errors'); ...@@ -21,31 +21,29 @@ const sequelizeErrors = require('../../errors');
@private @private
*/ */
function removeColumn(qi, tableName, columnName, options) { async function removeColumn(qi, tableName, columnName, options) {
options = options || {}; options = options || {};
return qi.sequelize.query( const [results] = await qi.sequelize.query(
qi.QueryGenerator.getForeignKeyQuery(tableName.tableName ? tableName : { qi.QueryGenerator.getForeignKeyQuery(tableName.tableName ? tableName : {
tableName, tableName,
schema: qi.sequelize.config.database schema: qi.sequelize.config.database
}, columnName), }, columnName),
Object.assign({ raw: true }, options) Object.assign({ raw: true }, options)
) );
.then(([results]) => {
//Exclude primary key constraint //Exclude primary key constraint
if (!results.length || results[0].constraint_name === 'PRIMARY') { if (results.length && results[0].constraint_name !== 'PRIMARY') {
// No foreign key constraints found, so we can remove the column await Promise.all(results.map(constraint => qi.sequelize.query(
return; qi.QueryGenerator.dropForeignKeyQuery(tableName, constraint.constraint_name),
}
return Promise.all(results.map(constraint => qi.sequelize.query(
qi.QueryGenerator.dropForeignKeyQuery(tableName, constraint.constraint_name),
Object.assign({ raw: true }, options)
)));
})
.then(() => qi.sequelize.query(
qi.QueryGenerator.removeColumnQuery(tableName, columnName),
Object.assign({ raw: true }, options) Object.assign({ raw: true }, options)
)); )));
}
return await qi.sequelize.query(
qi.QueryGenerator.removeColumnQuery(tableName, columnName),
Object.assign({ raw: true }, options)
);
} }
/** /**
...@@ -56,35 +54,34 @@ function removeColumn(qi, tableName, columnName, options) { ...@@ -56,35 +54,34 @@ function removeColumn(qi, tableName, columnName, options) {
* *
* @private * @private
*/ */
function removeConstraint(qi, tableName, constraintName, options) { async function removeConstraint(qi, tableName, constraintName, options) {
const sql = qi.QueryGenerator.showConstraintsQuery( const sql = qi.QueryGenerator.showConstraintsQuery(
tableName.tableName ? tableName : { tableName.tableName ? tableName : {
tableName, tableName,
schema: qi.sequelize.config.database schema: qi.sequelize.config.database
}, constraintName); }, constraintName);
return qi.sequelize.query(sql, Object.assign({}, options, const constraints = await qi.sequelize.query(sql, Object.assign({}, options,
{ type: qi.sequelize.QueryTypes.SHOWCONSTRAINTS })) { type: qi.sequelize.QueryTypes.SHOWCONSTRAINTS }));
.then(constraints => {
const constraint = constraints[0]; const constraint = constraints[0];
let query; let query;
if (!constraint || !constraint.constraintType) { if (!constraint || !constraint.constraintType) {
throw new sequelizeErrors.UnknownConstraintError( throw new sequelizeErrors.UnknownConstraintError(
{ {
message: `Constraint ${constraintName} on table ${tableName} does not exist`, message: `Constraint ${constraintName} on table ${tableName} does not exist`,
constraint: constraintName, constraint: constraintName,
table: tableName table: tableName
}); });
} }
if (constraint.constraintType === 'FOREIGN KEY') { if (constraint.constraintType === 'FOREIGN KEY') {
query = qi.QueryGenerator.dropForeignKeyQuery(tableName, constraintName); query = qi.QueryGenerator.dropForeignKeyQuery(tableName, constraintName);
} else { } else {
query = qi.QueryGenerator.removeIndexQuery(constraint.tableName, constraint.constraintName); query = qi.QueryGenerator.removeIndexQuery(constraint.tableName, constraint.constraintName);
} }
return qi.sequelize.query(query, options); return await qi.sequelize.query(query, options);
});
} }
exports.removeConstraint = removeConstraint; exports.removeConstraint = removeConstraint;
......
'use strict'; 'use strict';
const Utils = require('../../utils');
const AbstractQuery = require('../abstract/query'); const AbstractQuery = require('../abstract/query');
const sequelizeErrors = require('../../errors'); const sequelizeErrors = require('../../errors');
const _ = require('lodash'); const _ = require('lodash');
...@@ -27,7 +26,7 @@ class Query extends AbstractQuery { ...@@ -27,7 +26,7 @@ class Query extends AbstractQuery {
return [sql, bindParam.length > 0 ? bindParam : undefined]; return [sql, bindParam.length > 0 ? bindParam : undefined];
} }
run(sql, parameters) { async run(sql, parameters) {
this.sql = sql; this.sql = sql;
const { connection, options } = this; const { connection, options } = this;
...@@ -36,7 +35,7 @@ class Query extends AbstractQuery { ...@@ -36,7 +35,7 @@ class Query extends AbstractQuery {
const complete = this._logQuery(sql, debug, parameters); const complete = this._logQuery(sql, debug, parameters);
return new Utils.Promise((resolve, reject) => { const results = await new Promise((resolve, reject) => {
const handler = (err, results) => { const handler = (err, results) => {
complete(); complete();
...@@ -59,16 +58,13 @@ class Query extends AbstractQuery { ...@@ -59,16 +58,13 @@ class Query extends AbstractQuery {
} else { } else {
connection.query({ sql }, handler).setMaxListeners(100); connection.query({ sql }, handler).setMaxListeners(100);
} }
}) });
// Log warnings if we've got them. // Log warnings if we've got them.
.then(results => { if (showWarnings && results && results.warningStatus > 0) {
if (showWarnings && results && results.warningStatus > 0) { await this.logWarnings(results);
return this.logWarnings(results); }
}
return results;
})
// Return formatted results... // Return formatted results...
.then(results => this.formatResults(results)); return this.formatResults(results);
} }
/** /**
...@@ -165,27 +161,26 @@ class Query extends AbstractQuery { ...@@ -165,27 +161,26 @@ class Query extends AbstractQuery {
return result; return result;
} }
logWarnings(results) { async logWarnings(results) {
return this.run('SHOW WARNINGS').then(warningResults => { const warningResults = await this.run('SHOW WARNINGS');
const warningMessage = `MySQL Warnings (${this.connection.uuid || 'default'}): `; const warningMessage = `MySQL Warnings (${this.connection.uuid || 'default'}): `;
const messages = []; const messages = [];
for (const _warningRow of warningResults) { for (const _warningRow of warningResults) {
if (_warningRow === undefined || typeof _warningRow[Symbol.iterator] !== 'function') continue; if (_warningRow === undefined || typeof _warningRow[Symbol.iterator] !== 'function') continue;
for (const _warningResult of _warningRow) { for (const _warningResult of _warningRow) {
if (Object.prototype.hasOwnProperty.call(_warningResult, 'Message')) { if (Object.prototype.hasOwnProperty.call(_warningResult, 'Message')) {
messages.push(_warningResult.Message); messages.push(_warningResult.Message);
} else { } else {
for (const _objectKey of _warningResult.keys()) { for (const _objectKey of _warningResult.keys()) {
messages.push([_objectKey, _warningResult[_objectKey]].join(': ')); messages.push([_objectKey, _warningResult[_objectKey]].join(': '));
}
} }
} }
} }
}
this.sequelize.log(warningMessage + messages.join('; '), this.options); this.sequelize.log(warningMessage + messages.join('; '), this.options);
return results; return results;
});
} }
formatError(err) { formatError(err) {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!