不要怂,就是干,撸起袖子干!

Commit bd59b872 by Paul Oldridge Committed by Sushant

fix(mysql/maridb): set isolation level for transaction not entire session (#11476)

1 parent f295a740
...@@ -30,6 +30,7 @@ AbstractDialect.prototype.supports = { ...@@ -30,6 +30,7 @@ AbstractDialect.prototype.supports = {
bulkDefault: false, bulkDefault: false,
schemas: false, schemas: false,
transactions: true, transactions: true,
settingIsolationLevelDuringTransaction: true,
transactionOptions: { transactionOptions: {
type: false type: false
}, },
......
...@@ -16,7 +16,7 @@ const TransactionQueries = { ...@@ -16,7 +16,7 @@ const TransactionQueries = {
return; return;
} }
return `SET SESSION TRANSACTION ISOLATION LEVEL ${value};`; return `SET TRANSACTION ISOLATION LEVEL ${value};`;
}, },
generateTransactionId() { generateTransactionId() {
......
...@@ -25,6 +25,7 @@ MariadbDialect.prototype.supports = _.merge( ...@@ -25,6 +25,7 @@ MariadbDialect.prototype.supports = _.merge(
'LIMIT ON UPDATE': true, 'LIMIT ON UPDATE': true,
lock: true, lock: true,
forShare: 'LOCK IN SHARE MODE', forShare: 'LOCK IN SHARE MODE',
settingIsolationLevelDuringTransaction: false,
schemas: true, schemas: true,
inserts: { inserts: {
ignoreDuplicates: ' IGNORE', ignoreDuplicates: ' IGNORE',
......
...@@ -24,6 +24,7 @@ MysqlDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototype. ...@@ -24,6 +24,7 @@ MysqlDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototype.
'LIMIT ON UPDATE': true, 'LIMIT ON UPDATE': true,
lock: true, lock: true,
forShare: 'LOCK IN SHARE MODE', forShare: 'LOCK IN SHARE MODE',
settingIsolationLevelDuringTransaction: false,
inserts: { inserts: {
ignoreDuplicates: ' IGNORE', ignoreDuplicates: ' IGNORE',
updateOnDuplicate: ' ON DUPLICATE KEY UPDATE' updateOnDuplicate: ' ON DUPLICATE KEY UPDATE'
......
...@@ -129,7 +129,6 @@ class Transaction { ...@@ -129,7 +129,6 @@ class Transaction {
.then(() => { .then(() => {
return this.begin() return this.begin()
.then(() => this.setDeferrable()) .then(() => this.setDeferrable())
.then(() => this.setIsolationLevel())
.catch(setupErr => this.rollback().finally(() => { .catch(setupErr => this.rollback().finally(() => {
throw setupErr; throw setupErr;
})); }));
...@@ -142,13 +141,6 @@ class Transaction { ...@@ -142,13 +141,6 @@ class Transaction {
}); });
} }
begin() {
return this
.sequelize
.getQueryInterface()
.startTransaction(this, this.options);
}
setDeferrable() { setDeferrable() {
if (this.options.deferrable) { if (this.options.deferrable) {
return this return this
...@@ -158,11 +150,18 @@ class Transaction { ...@@ -158,11 +150,18 @@ class Transaction {
} }
} }
setIsolationLevel() { begin() {
return this const queryInterface = this.sequelize.getQueryInterface();
.sequelize
.getQueryInterface() if ( this.sequelize.dialect.supports.settingIsolationLevelDuringTransaction ) {
.setIsolationLevel(this, this.options.isolationLevel, this.options); return queryInterface.startTransaction(this, this.options).then(() => {
return queryInterface.setIsolationLevel(this, this.options.isolationLevel, this.options);
});
}
return queryInterface.setIsolationLevel(this, this.options.isolationLevel, this.options).then(() => {
return queryInterface.startTransaction(this, this.options);
});
} }
cleanup() { cleanup() {
......
...@@ -527,6 +527,77 @@ if (current.dialect.supports.transactions) { ...@@ -527,6 +527,77 @@ if (current.dialect.supports.transactions) {
} }
describe('isolation levels', () => {
it('should read the most recent committed rows when using the READ COMMITTED isolation level', function() {
const User = this.sequelize.define('user', {
username: Support.Sequelize.STRING
});
return expect(
this.sequelize.sync({ force: true }).then(() => {
return this.sequelize.transaction({ isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED }, transaction => {
return User.findAll({ transaction })
.then(users => expect( users ).to.have.lengthOf(0))
.then(() => User.create({ username: 'jan' })) // Create a User outside of the transaction
.then(() => User.findAll({ transaction }))
.then(users => expect( users ).to.have.lengthOf(1)); // We SHOULD see the created user inside the transaction
});
})
).to.eventually.be.fulfilled;
});
// mssql is excluded because it implements REPREATABLE READ using locks rather than a snapshot, and will see the new row
if (!['sqlite', 'mssql'].includes(dialect)) {
it('should not read newly committed rows when using the REPEATABLE READ isolation level', function() {
const User = this.sequelize.define('user', {
username: Support.Sequelize.STRING
});
return expect(
this.sequelize.sync({ force: true }).then(() => {
return this.sequelize.transaction({ isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ }, transaction => {
return User.findAll({ transaction })
.then(users => expect( users ).to.have.lengthOf(0))
.then(() => User.create({ username: 'jan' })) // Create a User outside of the transaction
.then(() => User.findAll({ transaction }))
.then(users => expect( users ).to.have.lengthOf(0)); // We SHOULD NOT see the created user inside the transaction
});
})
).to.eventually.be.fulfilled;
});
}
// PostgreSQL is excluded because it detects Serialization Failure on commit instead of acquiring locks on the read rows
if (!['sqlite', 'postgres', 'postgres-native'].includes(dialect)) {
it('should block updates after reading a row using SERIALIZABLE', function() {
const User = this.sequelize.define('user', {
username: Support.Sequelize.STRING
}),
transactionSpy = sinon.spy();
return this.sequelize.sync({ force: true }).then(() => {
return User.create({ username: 'jan' });
}).then(() => {
return this.sequelize.transaction({ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }).then(transaction => {
return User.findAll( { transaction } )
.then(() => Promise.join(
User.update({ username: 'joe' }, {
where: {
username: 'jan'
}
}).then(() => expect(transactionSpy).to.have.been.called ), // Update should not succeed before transaction has committed
Promise.delay(2000)
.then(() => transaction.commit())
.then(transactionSpy)
));
});
});
});
}
});
if (current.dialect.supports.lock) { if (current.dialect.supports.lock) {
describe('row locking', () => { describe('row locking', () => {
it('supports for update', function() { it('supports for update', function() {
......
...@@ -50,4 +50,28 @@ describe('Transaction', () => { ...@@ -50,4 +50,28 @@ describe('Transaction', () => {
return Sequelize.Promise.resolve(); return Sequelize.Promise.resolve();
}); });
}); });
it('should set isolation level correctly', function() {
const expectations = {
all: [
'SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;',
'START TRANSACTION;'
],
postgres: [
'START TRANSACTION;',
'SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;'
],
sqlite: [
'BEGIN DEFERRED TRANSACTION;',
'PRAGMA read_uncommitted = ON;'
],
mssql: [
'BEGIN TRANSACTION;'
]
};
return current.transaction({ isolationLevel: Sequelize.Transaction.ISOLATION_LEVELS.READ_UNCOMMITTED }, () => {
expect(this.stub.args.map(arg => arg[0])).to.deep.equal(expectations[dialect] || expectations.all);
return Sequelize.Promise.resolve();
});
});
}); });
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!