不要怂,就是干,撸起袖子干!

Commit 6388507e by Pedro Augusto de Paula Barbosa Committed by GitHub

fix(mysql): release connection on deadlocks (#13102)

* test(mysql, mariadb): improve transaction tests

- Greatly improve test for `SELECT ... LOCK IN SHARE MODE`
- Greatly improve test for deadlock handling

* fix(mysql): release connection on deadlocks

This is a follow-up for a problem not covered by #12841.

* refactor(mariadb): `query.js` similar to mysql's

* Update comments with a reference to this PR
1 parent ced4dc78
...@@ -7,6 +7,7 @@ const DataTypes = require('../../data-types'); ...@@ -7,6 +7,7 @@ const DataTypes = require('../../data-types');
const { logger } = require('../../utils/logger'); const { logger } = require('../../utils/logger');
const ER_DUP_ENTRY = 1062; const ER_DUP_ENTRY = 1062;
const ER_DEADLOCK = 1213;
const ER_ROW_IS_REFERENCED = 1451; const ER_ROW_IS_REFERENCED = 1451;
const ER_NO_REFERENCED_ROW = 1452; const ER_NO_REFERENCED_ROW = 1452;
...@@ -46,40 +47,25 @@ class Query extends AbstractQuery { ...@@ -46,40 +47,25 @@ class Query extends AbstractQuery {
try { try {
results = await connection.query(this.sql, parameters); results = await connection.query(this.sql, parameters);
complete(); } catch (error) {
if (options.transaction && error.errno === ER_DEADLOCK) {
// Log warnings if we've got them. // MariaDB automatically rolls-back transactions in the event of a deadlock.
if (showWarnings && results && results.warningStatus > 0) { // However, we still initiate a manual rollback to ensure the connection gets released - see #13102.
await this.logWarnings(results);
}
} catch (err) {
// MariaDB automatically rolls-back transactions in the event of a
// deadlock.
//
// Even though we shouldn't need to do this, we initiate a manual
// rollback. Without the rollback, the next transaction using the
// connection seems to retain properties of the previous transaction
// (e.g. isolation level) and not work as expected.
//
// For example (in our tests), a follow-up READ_COMMITTED transaction
// doesn't work as expected unless we explicitly rollback the
// transaction: it would fail to read a value inserted outside of that
// transaction.
if (options.transaction && err.errno === 1213) {
try { try {
await options.transaction.rollback(); await options.transaction.rollback();
} catch (err) { } catch (error_) {
// Ignore errors - since MariaDB automatically rolled back, we're // Ignore errors - since MariaDB automatically rolled back, we're
// not that worried about this redundant rollback failing. // not that worried about this redundant rollback failing.
} }
options.transaction.finished = 'rollback'; options.transaction.finished = 'rollback';
} }
error.sql = sql;
error.parameters = parameters;
throw this.formatError(error);
} finally {
complete(); complete();
err.sql = sql;
err.parameters = parameters;
throw this.formatError(err);
} }
if (showWarnings && results && results.warningStatus > 0) { if (showWarnings && results && results.warningStatus > 0) {
......
...@@ -6,6 +6,7 @@ const _ = require('lodash'); ...@@ -6,6 +6,7 @@ const _ = require('lodash');
const { logger } = require('../../utils/logger'); const { logger } = require('../../utils/logger');
const ER_DUP_ENTRY = 1062; const ER_DUP_ENTRY = 1062;
const ER_DEADLOCK = 1213;
const ER_ROW_IS_REFERENCED = 1451; const ER_ROW_IS_REFERENCED = 1451;
const ER_NO_REFERENCED_ROW = 1452; const ER_NO_REFERENCED_ROW = 1452;
...@@ -57,18 +58,26 @@ class Query extends AbstractQuery { ...@@ -57,18 +58,26 @@ class Query extends AbstractQuery {
.setMaxListeners(100); .setMaxListeners(100);
}); });
} }
} catch (err) { } catch (error) {
// MySQL automatically rolls-back transactions in the event of a deadlock if (options.transaction && error.errno === ER_DEADLOCK) {
if (options.transaction && err.errno === 1213) { // MySQL automatically rolls-back transactions in the event of a deadlock.
options.transaction.finished = 'rollback'; // However, we still initiate a manual rollback to ensure the connection gets released - see #13102.
try {
await options.transaction.rollback();
} catch (error_) {
// Ignore errors - since MySQL automatically rolled back, we're
// not that worried about this redundant rollback failing.
} }
err.sql = sql; options.transaction.finished = 'rollback';
err.parameters = parameters;
throw this.formatError(err);
} }
error.sql = sql;
error.parameters = parameters;
throw this.formatError(error);
} finally {
complete(); complete();
}
if (showWarnings && results && results.warningStatus > 0) { if (showWarnings && results && results.warningStatus > 0) {
await this.logWarnings(results); await this.logWarnings(results);
......
...@@ -80,6 +80,7 @@ ...@@ -80,6 +80,7 @@
"nyc": "^15.0.0", "nyc": "^15.0.0",
"p-map": "^4.0.0", "p-map": "^4.0.0",
"p-props": "^4.0.0", "p-props": "^4.0.0",
"p-settle": "^4.1.1",
"p-timeout": "^4.0.0", "p-timeout": "^4.0.0",
"pg": "^8.2.1", "pg": "^8.2.1",
"pg-hstore": "^2.x", "pg-hstore": "^2.x",
......
...@@ -18,8 +18,8 @@ describe(Support.getTestDialectTeaser('Replication'), () => { ...@@ -18,8 +18,8 @@ describe(Support.getTestDialectTeaser('Replication'), () => {
this.sequelize = Support.getSequelizeInstance(null, null, null, { this.sequelize = Support.getSequelizeInstance(null, null, null, {
replication: { replication: {
write: Support.getConnectionOptions(), write: Support.getConnectionOptionsWithoutPool(),
read: [Support.getConnectionOptions()] read: [Support.getConnectionOptionsWithoutPool()]
} }
}); });
......
'use strict'; 'use strict';
const chai = require('chai'), const chai = require('chai');
expect = chai.expect, const expect = chai.expect;
Support = require('./support'), const Support = require('./support');
dialect = Support.getTestDialect(), const dialect = Support.getTestDialect();
Sequelize = require('../../index'), const { Sequelize, QueryTypes, DataTypes, Transaction } = require('../../index');
QueryTypes = require('../../lib/query-types'), const sinon = require('sinon');
Transaction = require('../../lib/transaction'), const current = Support.sequelize;
sinon = require('sinon'), const delay = require('delay');
current = Support.sequelize, const pSettle = require('p-settle');
delay = require('delay');
if (current.dialect.supports.transactions) { if (current.dialect.supports.transactions) {
...@@ -493,7 +492,7 @@ if (current.dialect.supports.transactions) { ...@@ -493,7 +492,7 @@ if (current.dialect.supports.transactions) {
expect(count).to.equal(2, 'transactions were fully rolled-back, and no new rows were added'); expect(count).to.equal(2, 'transactions were fully rolled-back, and no new rows were added');
}); });
it('should release the connection for a deadlocked transaction', async function() { it('should release the connection for a deadlocked transaction (1/2)', async function() {
const Task = await getAndInitializeTaskModel(this.sequelize); const Task = await getAndInitializeTaskModel(this.sequelize);
// 1 of 2 queries should deadlock and be rolled back by InnoDB // 1 of 2 queries should deadlock and be rolled back by InnoDB
...@@ -523,6 +522,127 @@ if (current.dialect.supports.transactions) { ...@@ -523,6 +522,127 @@ if (current.dialect.supports.transactions) {
} }
); );
}); });
it('should release the connection for a deadlocked transaction (2/2)', async function() {
const verifyDeadlock = async () => {
const User = this.sequelize.define('user', {
username: DataTypes.STRING,
awesome: DataTypes.BOOLEAN
}, { timestamps: false });
await this.sequelize.sync({ force: true });
const { id } = await User.create({ username: 'jan' });
// First, we start a transaction T1 and perform a SELECT with it using the `LOCK.SHARE` mode (setting a shared mode lock on the row).
// This will cause other sessions to be able to read the row but not modify it.
// So, if another transaction tries to update those same rows, it will wait until T1 commits (or rolls back).
// https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-reads.html
const t1 = await this.sequelize.transaction();
const t1Jan = await User.findByPk(id, { lock: t1.LOCK.SHARE, transaction: t1 });
// Then we start another transaction T2 and see that it can indeed read the same row.
const t2 = await this.sequelize.transaction({ isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED });
const t2Jan = await User.findByPk(id, { transaction: t2 });
// Then, we want to see that an attempt to update that row from T2 will be queued until T1 commits.
// However, before commiting T1 we will also perform an update via T1 on the same rows.
// This should cause T2 to notice that it can't function anymore, so it detects a deadlock and automatically rolls itself back (and throws an error).
// Meanwhile, T1 should still be ok.
const executionOrder = [];
const [t2AttemptData, t1AttemptData] = await pSettle([
(async () => {
try {
executionOrder.push('Begin attempt to update via T2');
await t2Jan.update({ awesome: false }, { transaction: t2 });
executionOrder.push('Done updating via T2'); // Shouldn't happen
} catch (error) {
executionOrder.push('Failed to update via T2');
throw error;
}
await delay(30);
try {
// We shouldn't reach this point, but if we do, let's at least commit the transaction
// to avoid forever occupying one connection of the pool with a pending transaction.
executionOrder.push('Attempting to commit T2');
await t2.commit();
executionOrder.push('Done committing T2');
} catch {
executionOrder.push('Failed to commit T2');
}
})(),
(async () => {
await delay(100);
try {
executionOrder.push('Begin attempt to update via T1');
await t1Jan.update({ awesome: true }, { transaction: t1 });
executionOrder.push('Done updating via T1');
} catch (error) {
executionOrder.push('Failed to update via T1'); // Shouldn't happen
throw error;
}
await delay(150);
try {
executionOrder.push('Attempting to commit T1');
await t1.commit();
executionOrder.push('Done committing T1');
} catch {
executionOrder.push('Failed to commit T1'); // Shouldn't happen
}
})()
]);
expect(t1AttemptData.isFulfilled).to.be.true;
expect(t2AttemptData.isRejected).to.be.true;
expect(t2AttemptData.reason.message).to.include('Deadlock found when trying to get lock; try restarting transaction');
expect(t1.finished).to.equal('commit');
expect(t2.finished).to.equal('rollback');
const expectedExecutionOrder = [
'Begin attempt to update via T2',
'Begin attempt to update via T1', // 100ms after
'Done updating via T1', // right after
'Failed to update via T2', // right after
'Attempting to commit T1', // 150ms after
'Done committing T1' // right after
];
// The order things happen in the database must be the one shown above. However, sometimes it can happen that
// the calls in the JavaScript event loop that are communicating with the database do not match exactly this order.
// In particular, it is possible that the JS event loop logs `'Failed to update via T2'` before logging `'Done updating via T1'`,
// even though the database updated T1 first (and then rushed to declare a deadlock for T2).
const anotherAcceptableExecutionOrderFromJSPerspective = [
'Begin attempt to update via T2',
'Begin attempt to update via T1', // 100ms after
'Failed to update via T2', // right after
'Done updating via T1', // right after
'Attempting to commit T1', // 150ms after
'Done committing T1' // right after
];
const executionOrderOk = Support.isDeepEqualToOneOf(
executionOrder,
[
expectedExecutionOrder,
anotherAcceptableExecutionOrderFromJSPerspective
]
);
if (!executionOrderOk) {
throw new Error(`Unexpected execution order: ${executionOrder.join(' > ')}`);
}
};
for (let i = 0; i < 3 * Support.getPoolMax(); i++) {
await verifyDeadlock();
await delay(10);
}
});
}); });
} }
...@@ -916,55 +1036,123 @@ if (current.dialect.supports.transactions) { ...@@ -916,55 +1036,123 @@ if (current.dialect.supports.transactions) {
}); });
} }
it('supports for share', async function() { it('supports for share (i.e. `SELECT ... LOCK IN SHARE MODE`)', async function() {
const verifySelectLockInShareMode = async () => {
const User = this.sequelize.define('user', { const User = this.sequelize.define('user', {
username: Support.Sequelize.STRING, username: DataTypes.STRING,
awesome: Support.Sequelize.BOOLEAN awesome: DataTypes.BOOLEAN
}, { timestamps: false }); }, { timestamps: false });
const t1CommitSpy = sinon.spy();
const t2FindSpy = sinon.spy();
const t2UpdateSpy = sinon.spy();
await this.sequelize.sync({ force: true }); await this.sequelize.sync({ force: true });
const user = await User.create({ username: 'jan' }); const { id } = await User.create({ username: 'jan' });
// First, we start a transaction T1 and perform a SELECT with it using the `LOCK.SHARE` mode (setting a shared mode lock on the row).
// This will cause other sessions to be able to read the row but not modify it.
// So, if another transaction tries to update those same rows, it will wait until T1 commits (or rolls back).
// https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-reads.html
const t1 = await this.sequelize.transaction(); const t1 = await this.sequelize.transaction();
const t1Jan = await User.findByPk(user.id, { await User.findByPk(id, { lock: t1.LOCK.SHARE, transaction: t1 });
lock: t1.LOCK.SHARE,
transaction: t1
});
const t2 = await this.sequelize.transaction({ // Then we start another transaction T2 and see that it can indeed read the same row.
isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED const t2 = await this.sequelize.transaction({ isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED });
}); const t2Jan = await User.findByPk(id, { transaction: t2 });
await Promise.all([ // Then, we want to see that an attempt to update that row from T2 will be queued until T1 commits.
const executionOrder = [];
const [t2AttemptData, t1AttemptData] = await pSettle([
(async () => { (async () => {
const t2Jan = await User.findByPk(user.id, { try {
transaction: t2 executionOrder.push('Begin attempt to update via T2');
});
t2FindSpy();
await t2Jan.update({ awesome: false }, { transaction: t2 }); await t2Jan.update({ awesome: false }, { transaction: t2 });
t2UpdateSpy(); executionOrder.push('Done updating via T2');
} catch (error) {
executionOrder.push('Failed to update via T2'); // Shouldn't happen
throw error;
}
await delay(30);
try {
executionOrder.push('Attempting to commit T2');
await t2.commit(); await t2.commit();
executionOrder.push('Done committing T2');
} catch {
executionOrder.push('Failed to commit T2'); // Shouldn't happen
}
})(), })(),
(async () => { (async () => {
await t1Jan.update({ awesome: true }, { transaction: t1 }); await delay(100);
await delay(2000);
t1CommitSpy(); try {
executionOrder.push('Begin attempt to read via T1');
await User.findAll({ transaction: t1 });
executionOrder.push('Done reading via T1');
} catch (error) {
executionOrder.push('Failed to read via T1'); // Shouldn't happen
throw error;
}
await delay(150);
try {
executionOrder.push('Attempting to commit T1');
await t1.commit(); await t1.commit();
executionOrder.push('Done committing T1');
} catch {
executionOrder.push('Failed to commit T1'); // Shouldn't happen
}
})() })()
]); ]);
// (t2) find call should have returned before (t1) commit expect(t1AttemptData.isFulfilled).to.be.true;
expect(t2FindSpy).to.have.been.calledBefore(t1CommitSpy); expect(t2AttemptData.isFulfilled).to.be.true;
expect(t1.finished).to.equal('commit');
expect(t2.finished).to.equal('commit');
const expectedExecutionOrder = [
'Begin attempt to update via T2',
'Begin attempt to read via T1', // 100ms after
'Done reading via T1', // right after
'Attempting to commit T1', // 150ms after
'Done committing T1', // right after
'Done updating via T2', // right after
'Attempting to commit T2', // 30ms after
'Done committing T2' // right after
];
// But (t2) update call should not happen before (t1) commit // The order things happen in the database must be the one shown above. However, sometimes it can happen that
expect(t2UpdateSpy).to.have.been.calledAfter(t1CommitSpy); // the calls in the JavaScript event loop that are communicating with the database do not match exactly this order.
// In particular, it is possible that the JS event loop logs `'Done updating via T2'` before logging `'Done committing T1'`,
// even though the database committed T1 first (and then rushed to complete the pending update query from T2).
const anotherAcceptableExecutionOrderFromJSPerspective = [
'Begin attempt to update via T2',
'Begin attempt to read via T1', // 100ms after
'Done reading via T1', // right after
'Attempting to commit T1', // 150ms after
'Done updating via T2', // right after
'Done committing T1', // right after
'Attempting to commit T2', // 30ms after
'Done committing T2' // right after
];
const executionOrderOk = Support.isDeepEqualToOneOf(
executionOrder,
[
expectedExecutionOrder,
anotherAcceptableExecutionOrderFromJSPerspective
]
);
if (!executionOrderOk) {
throw new Error(`Unexpected execution order: ${executionOrder.join(' > ')}`);
}
};
for (let i = 0; i < 3 * Support.getPoolMax(); i++) {
await verifySelectLockInShareMode();
await delay(10);
}
}); });
}); });
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
const fs = require('fs'); const fs = require('fs');
const path = require('path'); const path = require('path');
const { isDeepStrictEqual } = require('util');
const _ = require('lodash'); const _ = require('lodash');
const Sequelize = require('../index'); const Sequelize = require('../index');
const Config = require('./config/config'); const Config = require('./config/config');
...@@ -119,11 +120,10 @@ const Support = { ...@@ -119,11 +120,10 @@ const Support = {
return this.getSequelizeInstance(config.database, config.username, config.password, sequelizeOptions); return this.getSequelizeInstance(config.database, config.username, config.password, sequelizeOptions);
}, },
getConnectionOptions() { getConnectionOptionsWithoutPool() {
const config = Config[this.getTestDialect()]; // Do not break existing config object - shallow clone before `delete config.pool`
const config = { ...Config[this.getTestDialect()] };
delete config.pool; delete config.pool;
return config; return config;
}, },
...@@ -207,6 +207,10 @@ const Support = { ...@@ -207,6 +207,10 @@ const Support = {
return `[${dialect.toUpperCase()}] ${moduleName}`; return `[${dialect.toUpperCase()}] ${moduleName}`;
}, },
getPoolMax() {
return Config[this.getTestDialect()].pool.max;
},
expectsql(query, assertions) { expectsql(query, assertions) {
const expectations = assertions.query || assertions; const expectations = assertions.query || assertions;
let expectation = expectations[Support.sequelize.dialect.name]; let expectation = expectations[Support.sequelize.dialect.name];
...@@ -234,6 +238,10 @@ const Support = { ...@@ -234,6 +238,10 @@ const Support = {
const bind = assertions.bind[Support.sequelize.dialect.name] || assertions.bind['default'] || assertions.bind; const bind = assertions.bind[Support.sequelize.dialect.name] || assertions.bind['default'] || assertions.bind;
expect(query.bind).to.deep.equal(bind); expect(query.bind).to.deep.equal(bind);
} }
},
isDeepEqualToOneOf(actual, expectedOptions) {
return expectedOptions.some(expected => isDeepStrictEqual(actual, expected));
} }
}; };
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!