【Spring】事务的执行原理(三)

事务的回滚

  1. 如果获取事务属性不为空,并且抛出的异常是RuntimeException或者Error类型,调用事务管理器中的rollback方法进行回滚

  2. 如果事务属性为空或者抛出的异常不是RuntimeException,也不是Error,将继续提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

/**
* 处理抛出异常下的事务
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 判空
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 如果事务属性不为空并且异常是是RuntimeException或者Error
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 获取事务管理器,调用rollback方法进行回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// 如果事务属性为空或者异常不是RuntimeException或者Error,继续提交事务
try {
// 提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
}

// DefaultTransactionAttribute中实现了rollbackOn方法
public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {
/**
* 判断是否是RuntimeException或者Error
*/
@Override
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
}

rollback方法在AbstractPlatformTransactionManager中实现,主要分为以下三种情况:

  1. 判断事务是否设置了保存点,如果设置了将事务回滚到保存点
  2. 如果是一个独立的新事务,直接回滚即可
  3. 如果既没有设置保存点,也不是一个新事务,说明可能处于嵌套事务中,此时只设置回滚状态rollbackOnly为true,当它的外围事务进行提交时,如果发现回滚状态为true,则不提交

以上步骤执行完毕,调用cleanupAfterCompletion方法进行资源的清理已经挂起事务的恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/*
* 回滚
*/
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 转为DefaultTransactionStatus
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 处理回滚
processRollback(defStatus, false);
}

/**
* 处理回滚
*/
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;

try {
// 回滚之前的触发器
triggerBeforeCompletion(status);
// 是否有保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 回滚至保存点
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) { // 如果是一个独立的新事务
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 直接回滚
doRollback(status);
}
else {
// 如果处于一个嵌套的事务汇总
if (status.hasTransaction()) {
// 如果本地的回滚状态置为true 或者 事务失败进行全局回滚
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 设置事务rollbackOnly状态为true
doSetRollbackOnly(stat);
}
else {
// 打印日志,意思是由事务的组织者决定是否回滚
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
// 打印DEBUG日志,应该回滚但是没有获取到事务
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 回滚之后的触发器
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 清除相关资源并恢复挂起的事务
cleanupAfterCompletion(status);
}
}

/**
* 当一个事务失败以后,是否全局的标进行事务回滚
*/
public final boolean isGlobalRollbackOnParticipationFailure() {
return this.globalRollbackOnParticipationFailure;
}
}

回滚处理

rollbackToHeldSavepoint回滚至保存点

rollbackToHeldSavepoint方法在AbstractTransactionStatus中实现,它调用了getSavepointManager方法获取保存点管理器,调用SavepointManager的rollbackToSavepoint方法进行回滚的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractTransactionStatus implements TransactionStatus {

public void rollbackToHeldSavepoint() throws TransactionException {
Object savepoint = getSavepoint();
if (savepoint == null) {
throw new TransactionUsageException(
"Cannot roll back to savepoint - no savepoint associated with current transaction");
}
// getSavepointManager方法在DefaultTransactionStatus中实现
getSavepointManager().rollbackToSavepoint(savepoint);
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
}
}

SavepointManager是一个接口,它的继承关系如下:

DefaultTransactionStatus中获取SavepointManager的方法:

  1. 获取transaction对象,前面的知识可知这里是一个DataSourceTransactionObject
  2. 由继承关系可知DataSourceTransactionObject也是SavepointManager子类,所以将DataSourceTransactionObject转为SavepointManager返回
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class DefaultTransactionStatus extends AbstractTransactionStatus {
    @Override
    protected SavepointManager getSavepointManager() {
    // 前面的知识可知这里是一个DataSourceTransactionObject
    Object transaction = this.transaction;
    if (!(transaction instanceof SavepointManager)) {
    throw new NestedTransactionNotSupportedException(
    "Transaction object [" + this.transaction + "] does not support savepoints");
    }
    // 将DataSourceTransactionObject转为SavepointManager
    return (SavepointManager) transaction;
    }
    }

DataSourceTransactionObject是DataSourceTransactionManager的内部类,它继承了JdbcTransactionObjectSupport,rollbackToSavepoint方法在JdbcTransactionObjectSupport中实现:

  1. 获取ConnectionHolder,ConnectionHolder持有数据库连接
  2. 调用底层的rollback方法将事务回滚至保存点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
// 内部类DataSourceTransactionObject
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

}
}

// JdbcTransactionObjectSupport
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
/**
* 回滚至保存点
*/
@Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
// 获取ConnectionHolder
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
// 调用底层的rollback方法将事务回滚至保存点
conHolder.getConnection().rollback((Savepoint) savepoint);
conHolder.resetRollbackOnly();
}
catch (Throwable ex) {
throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
}
}

/**
* 释放保存点
*/
@Override
public void releaseSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
// 调用底层的方法释放保存点
conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
}
catch (Throwable ex) {
logger.debug("Could not explicitly release JDBC savepoint", ex);
}
}
}

doRollback事务回滚

回滚事务时先获取数据库连接,然后调用底层的rollback进行回滚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {

@Override
protected void doRollback(DefaultTransactionStatus status) {
// 获取数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
// 获取数据库连接
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
// 调用底层的回滚方法
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
}

doSetRollbackOnly设置回滚状态

doSetRollbackOnly方法在DataSourceTransactionManager中实现:

  1. 将事务转为DataSourceTransactionObject对象,前面讲过DataSourceTransactionObject持有了数据库连接对象ConnectionHolder

  2. 将ConnectionHolder的rollbackOnly属性置为true,先标记事务的回滚状态,交由外围事务进行判断统一进行回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {

@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
// 获取数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
// 设置回滚状态
txObject.setRollbackOnly();
}

/**
* 内部类DataSourceTransactionObject
*/
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

// 省略其他方法

// 设置回滚状态
public void setRollbackOnly() {
// 将ConnectionHolder的rollbackOnly属性置为true,在ConnectionHolder的父类ResourceHolderSupport中实现
getConnectionHolder().setRollbackOnly();
}
}
}

// ConnectionHolder的父类ResourceHolderSupport
public abstract class ResourceHolderSupport implements ResourceHolder {

private boolean rollbackOnly = false;

/**
* 标记事务回滚状态为true
*/
public void setRollbackOnly() {
this.rollbackOnly = true;
}
}

资源清理

在事务回滚之后,需要清理相关的资源以及恢复被挂起的事务:

  1. 如果事务的newSynchronization状态为true,清除当前线程绑定的事务相关信息
    • 在TransactionSynchronizationManager的clear方法中实现,清理了当前线程绑定的事务名称、事务隔离级别等信息
  2. 如果是一个新事务,清除当前线程与数据库连接的绑定关系,在DataSourceTransactionManager的doCleanupAfterCompletion方法中实现
  3. 如果挂起的事务不为空,恢复挂起的事务
    • 获取数据源,恢复数据源与挂起事务的绑定关系
    • 恢复挂起事务与当前线程的同步信息
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

      /**
      * 回滚之后的清除操作
      * @see #doCleanupAfterCompletion
      */
      private void cleanupAfterCompletion(DefaultTransactionStatus status) {
      status.setCompleted();
      if (status.isNewSynchronization()) {
      // 清除当前线程绑定的信息
      TransactionSynchronizationManager.clear();
      }
      // 如果是一个新事务
      if (status.isNewTransaction()) {
      // 清除当前线程与数据库连接的绑定关系
      doCleanupAfterCompletion(status.getTransaction());
      }
      // 如果挂起的事务不为空
      if (status.getSuspendedResources() != null) {
      if (status.isDebug()) {
      logger.debug("Resuming suspended transaction after completion of inner transaction");
      }
      Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
      // 恢复挂起的事务
      resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
      }
      }

      /**
      * 恢复挂起的事务
      * @see #doResume
      * @see #suspend
      */
      protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
      throws TransactionException {

      if (resourcesHolder != null) {
      // 获取挂起的事务
      Object suspendedResources = resourcesHolder.suspendedResources;
      if (suspendedResources != null) {
      // 获取数据源,并与挂起的事务进行绑定
      doResume(transaction, suspendedResources);
      }
      // 挂起事务的同步信息
      List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
      if (suspendedSynchronizations != null) {
      // 恢复事务与线程的同步信息
      TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
      TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
      TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
      doResumeSynchronization(suspendedSynchronizations);
      }
      }
      }
      }

      // DataSourceTransactionManager
      public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
      implements ResourceTransactionManager, InitializingBean {
      // 主要是清除当前线程与数据库连接的绑定关系
      protected void doCleanupAfterCompletion(Object transaction) {
      DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

      // Remove the connection holder from the thread, if exposed.
      if (txObject.isNewConnectionHolder()) {
      TransactionSynchronizationManager.unbindResource(obtainDataSource());
      }

      // 获取数据库连接
      Connection con = txObject.getConnectionHolder().getConnection();
      try {
      if (txObject.isMustRestoreAutoCommit()) {
      // 自动提交置为true
      con.setAutoCommit(true);
      }
      // 重置数据库连接的相关设置
      DataSourceUtils.resetConnectionAfterTransaction(
      con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
      }
      catch (Throwable ex) {
      logger.debug("Could not reset JDBC Connection after transaction", ex);
      }

      if (txObject.isNewConnectionHolder()) {
      if (logger.isDebugEnabled()) {
      logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
      }
      // 释放连接
      DataSourceUtils.releaseConnection(con, this.dataSource);
      }
      // 清除当前线程与数据库连接的绑定关系
      txObject.getConnectionHolder().clear();
      }

      @Override
      protected void doResume(@Nullable Object transaction, Object suspendedResources) {
      // 获取数据源,并与挂起的事务进行绑定
      TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
      }
      }

      // TransactionSynchronizationManager
      public abstract class TransactionSynchronizationManager {

      // 保存了线程绑定的数据库资源信息,Map中Key为数据源构建的KEY,value为对应的ConnectionHolder
      private static final ThreadLocal<Map<Object, Object>> resources =
      new NamedThreadLocal<>("Transactional resources");
      // 保存了线程绑定的事务同步信息TransactionSynchronization
      private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
      new NamedThreadLocal<>("Transaction synchronizations");

      // 保存了线程绑定的事务名称
      private static final ThreadLocal<String> currentTransactionName =
      new NamedThreadLocal<>("Current transaction name");
      // 保存了线程绑定的事务只读状态
      private static final ThreadLocal<Boolean> currentTransactionReadOnly =
      new NamedThreadLocal<>("Current transaction read-only status");

      // 保存了线程绑定的事务隔离级别
      private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
      new NamedThreadLocal<>("Current transaction isolation level");
      // 保存了线程绑定的事务活跃状态
      private static final ThreadLocal<Boolean> actualTransactionActive =
      new NamedThreadLocal<>("Actual transaction active");

      /**
      * 清理事务与当前线程的各种同步状态
      */
      public static void clear() {
      // 清除当前线程绑定的事务同步信息TransactionSynchronization
      synchronizations.remove();
      // 清除当前线程绑定的事务名称
      currentTransactionName.remove();
      // 清除线程绑定的事务只读状态
      currentTransactionReadOnly.remove();
      // 清除线程绑定的事务隔离级别
      currentTransactionIsolationLevel.remove();
      // 清除线程绑定的事务活跃状态
      actualTransactionActive.remove();
      }
      }

总结

参考

【猫吻鱼】Spring源码分析:全集整理

Spring版本:5.2.5.RELEASE