SHAN


  • Home

  • Archives

【Spring】AOP实现原理(一):Aop基础知识(基于注解)

Posted on 2022-04-10

AOP相关概念

在学习AOP实现原理之前,先了解下AOP相关基础知识。

AOP面向切面编程,它可以通过预编译方式或者基于动态代理对我们编写的代码进行拦截(也叫增强处理),在方法执行前后可以做一些操作,一般我们会看到以下几个概念:

连接点(JointPoint): AOP进行切入的位置称为连接点,一般指程序中的某个方法,对该方法进行拦截

通知(Advice): 在某个连接点执行的操作称为通知,也就是被拦截方法执行前后需要执行的操作称为通知,一共有五种

  • 前置通知:作用于被拦截方法执行之前
  • 后置通知:作用于被拦截方法执行之后进行的操作,无论被拦截方法是否抛出异常都会执行
  • 环绕通知:作用于被拦截方法执行之前和执行之后
  • 返回通知:作用于被拦截方法正常执行完毕返回时,如果抛出异常将不会执行
  • 异常通知:作用于被拦截方法抛出异常时

切点(Pointcut): 切点作用在于让程序知道需要在哪个连接点(方法)上执行通知,所以它也可以是一个表达式,匹配所有需要拦截的方法。

切面(Aspect): 切点和通知共同组成了切面,其中切点定义了需要在哪些连接点上执行通知,通知里面定义了具体需要进行的操作。

织入(Weaving):将切面连接到应用程序类型或者对象上,创建一个被通知的对象(advised object)的过程称为织入,换句话说织入就是将切面应用到目标对象的过程,它可以在编译期时(使用AspectJ)、加载时或者在运行时实现,Spring AOP是在运行时基于动态代理实现的。

Advisor:它是对切面的封装,使用了@AspectJ注解的类会被封装成Advisor。

Spring AOP和AspectJ区别

Spring AOP

Spring AOP是基于动态代理实现拦截功能的,默认使用JDK动态代理实现,当然这需要目标对象实现接口,如果目标对象没有实现接口,则使用CGLIB生成。

AspectJ

AspectJ提供了三种方式实现AOP:

  • 编译时织入:在编译期间将代码进行织入到目标类的class文件中。

  • 编译后织入:在编译后将代码织入到目标类的class文件中。

  • 加载时织入:在JVM加载class文件的时候进行织入。

Spring AOP的应用

了解了AOP相关知识后我们来实现一个需求:

  1. 自定义一个日志注解MyLogger
  2. 对使用了MyLogger注解的方法进行拦截,在方法的执行前后分别进行一些操作(环绕通知):
    • 方法执行前打印方法传入的参数
    • 方法执行后打印方法的返回值

自定义注解

1
2
3
4
5
6
7
8
import java.lang.annotation.*;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyLogger {

}

定义切面Aspect

这里使用注解@Aspect来标记这是一个切面,切面是切点和通知的集合,分别使用注解@Pointcut和@Around实现。

1
2
3
4
5
6
7
@Slf4j
@Aspect // 使用注解定义切面
@Component
@EnableAspectJAutoProxy // 开启AOP
public class MyLogAspect {

}
切点Pointcut

使用表达式@annotation(com.demo.mybatis.annotation.MyLogger)匹配所有使用了@MyLogger注解的方法。

1
2
3
4
5
6
7
/**
* 定义切点,匹配所有使用了@MyLogger注解的方法
*/
@Pointcut("@annotation(com.example.annotation.MyLogger)") // 这里传入MyLogger的全路径
public void logPoiontcut() {

}
通知Advice

定义一个logAroudAdvice方法,使用@Around注解标记这是一个环绕通知,logPoiontcut()引用了切点,表示通知要作用于哪些连接点上,该方法需要传入一个ProceedingJoinPoint类型参数(连接点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 通知Advice,这里使用了环绕通知
* @param joinPoint 连接点
* @return
* @throws Throwable
*/
@Around("logPoiontcut()") // 引用切点
public Object logAroudAdvice(ProceedingJoinPoint joinPoint) throws Throwable {
// 方法执行前的日志打印
printBeforeLog(joinPoint);
// 执行方法
Object returnValue = joinPoint.proceed();
// 方法执行后的日志打印
printAfterLog(returnValue);
return returnValue;
}

完整的切面如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

@Slf4j
@Aspect // 使用注解定义切面
@Component
@EnableAspectJAutoProxy
public class MyLogAspect {

/**
* 定义切点,匹配所有使用了@MyLogger注解的方法
*/
@Pointcut("@annotation(com.example.annotation.MyLogger)")
public void logPoiontcut() {
}

/**
* 通知Advice,这里使用了环绕通知
* @param joinPoint 连接点
* @return
* @throws Throwable
*/
@Around("logPoiontcut()") // 引用切点
public Object logAroudAdvice(ProceedingJoinPoint joinPoint) throws Throwable {
// 方法执行前的日志打印
printBeforeLog(joinPoint);
// 执行方法
Object returnValue = joinPoint.proceed();
// 方法执行后的日志打印
printAfterLog(returnValue);
return returnValue;
}

/**
* 方法执行前的日志打印
* @param joinPoint
*/
public void printBeforeLog(ProceedingJoinPoint joinPoint) {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
// 获取方法
Method method = methodSignature.getMethod();
log.info("开始执行方法:{}", method.getName());
// 获取参数
Object[] args = joinPoint.getArgs();
if (args == null || args.length == 0) {
return;
}
// 获取参数名称
String[] parameterNames = methodSignature.getParameterNames();
StringBuilder parameterBuilder = new StringBuilder();
for (int i = 0; i < args.length; i++) {
parameterBuilder.append(parameterNames[i]).append(":").append(args[i]);
if (i < parameterNames.length - 1) {
parameterBuilder.append(",");
}
}
log.info("方法参数【{}】", parameterBuilder.toString());
}

/**
* 方法执行后的日志打印
* @param returnValue
*/
public void printAfterLog(Object returnValue) {
log.info("方法返回值【{}】", returnValue == null ? null : returnValue.toString());
}
}

测试

定义一个用于计算的Service,实现一个两数相加的方法addTwoNum,并使用@MyLogger注解,对方法进行拦截,在方法执行前后打印相关日志

1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@Service
public class ComputeService {

// 使用自定义日志注解对方法进行拦截
@MyLogger
public Integer addTwoNum(Integer value1, Integer value2) {
log.info("执行addTwoNum方法");
return value1 + value2;
}
}

编写单元测试:

1
2
3
4
5
6
7
@Autowired
private ComputeService computeService;

@Test
public void testAddTwoNum {
computeService.addTwoNum(1, 2);
}

由于ComputeService没有实现接口,可以看到Spring默认使用了CGLIB生成对象:

日志输出如下,可以看到方法执行前后打印了相关日志:

1
2
3
4
开始执行方法:addTwoNum
方法参数【value1:1,value2:2】
执行addTwoNum方法
方法返回值【3】

参考

Spring官方文档

【 FatalFlower】AspectJ 简介

【Spring】事务的执行原理(三)

Posted on 2022-04-07

事务的回滚

  1. 如果获取事务属性不为空,并且抛出的异常是RuntimeException或者Error类型,调用事务管理器中的rollback方法进行回滚

  2. 如果事务属性为空或者抛出的异常不是RuntimeException,也不是Error,将继续提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

/**
* 处理抛出异常下的事务
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 判空
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 如果事务属性不为空并且异常是是RuntimeException或者Error
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 获取事务管理器,调用rollback方法进行回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// 如果事务属性为空或者异常不是RuntimeException或者Error,继续提交事务
try {
// 提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
}

// DefaultTransactionAttribute中实现了rollbackOn方法
public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {
/**
* 判断是否是RuntimeException或者Error
*/
@Override
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
}

rollback方法在AbstractPlatformTransactionManager中实现,主要分为以下三种情况:

  1. 判断事务是否设置了保存点,如果设置了将事务回滚到保存点
  2. 如果是一个独立的新事务,直接回滚即可
  3. 如果既没有设置保存点,也不是一个新事务,说明可能处于嵌套事务中,此时只设置回滚状态rollbackOnly为true,当它的外围事务进行提交时,如果发现回滚状态为true,则不提交

以上步骤执行完毕,调用cleanupAfterCompletion方法进行资源的清理已经挂起事务的恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/*
* 回滚
*/
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 转为DefaultTransactionStatus
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 处理回滚
processRollback(defStatus, false);
}

/**
* 处理回滚
*/
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;

try {
// 回滚之前的触发器
triggerBeforeCompletion(status);
// 是否有保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 回滚至保存点
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) { // 如果是一个独立的新事务
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 直接回滚
doRollback(status);
}
else {
// 如果处于一个嵌套的事务汇总
if (status.hasTransaction()) {
// 如果本地的回滚状态置为true 或者 事务失败进行全局回滚
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 设置事务rollbackOnly状态为true
doSetRollbackOnly(stat);
}
else {
// 打印日志,意思是由事务的组织者决定是否回滚
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
// 打印DEBUG日志,应该回滚但是没有获取到事务
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 回滚之后的触发器
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 清除相关资源并恢复挂起的事务
cleanupAfterCompletion(status);
}
}

/**
* 当一个事务失败以后,是否全局的标进行事务回滚
*/
public final boolean isGlobalRollbackOnParticipationFailure() {
return this.globalRollbackOnParticipationFailure;
}
}

回滚处理

rollbackToHeldSavepoint回滚至保存点

rollbackToHeldSavepoint方法在AbstractTransactionStatus中实现,它调用了getSavepointManager方法获取保存点管理器,调用SavepointManager的rollbackToSavepoint方法进行回滚的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractTransactionStatus implements TransactionStatus {

public void rollbackToHeldSavepoint() throws TransactionException {
Object savepoint = getSavepoint();
if (savepoint == null) {
throw new TransactionUsageException(
"Cannot roll back to savepoint - no savepoint associated with current transaction");
}
// getSavepointManager方法在DefaultTransactionStatus中实现
getSavepointManager().rollbackToSavepoint(savepoint);
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
}
}

SavepointManager是一个接口,它的继承关系如下:

DefaultTransactionStatus中获取SavepointManager的方法:

  1. 获取transaction对象,前面的知识可知这里是一个DataSourceTransactionObject
  2. 由继承关系可知DataSourceTransactionObject也是SavepointManager子类,所以将DataSourceTransactionObject转为SavepointManager返回
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class DefaultTransactionStatus extends AbstractTransactionStatus {
    @Override
    protected SavepointManager getSavepointManager() {
    // 前面的知识可知这里是一个DataSourceTransactionObject
    Object transaction = this.transaction;
    if (!(transaction instanceof SavepointManager)) {
    throw new NestedTransactionNotSupportedException(
    "Transaction object [" + this.transaction + "] does not support savepoints");
    }
    // 将DataSourceTransactionObject转为SavepointManager
    return (SavepointManager) transaction;
    }
    }

DataSourceTransactionObject是DataSourceTransactionManager的内部类,它继承了JdbcTransactionObjectSupport,rollbackToSavepoint方法在JdbcTransactionObjectSupport中实现:

  1. 获取ConnectionHolder,ConnectionHolder持有数据库连接
  2. 调用底层的rollback方法将事务回滚至保存点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
// 内部类DataSourceTransactionObject
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

}
}

// JdbcTransactionObjectSupport
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
/**
* 回滚至保存点
*/
@Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
// 获取ConnectionHolder
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
// 调用底层的rollback方法将事务回滚至保存点
conHolder.getConnection().rollback((Savepoint) savepoint);
conHolder.resetRollbackOnly();
}
catch (Throwable ex) {
throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
}
}

/**
* 释放保存点
*/
@Override
public void releaseSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
// 调用底层的方法释放保存点
conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
}
catch (Throwable ex) {
logger.debug("Could not explicitly release JDBC savepoint", ex);
}
}
}

doRollback事务回滚

回滚事务时先获取数据库连接,然后调用底层的rollback进行回滚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {

@Override
protected void doRollback(DefaultTransactionStatus status) {
// 获取数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
// 获取数据库连接
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
// 调用底层的回滚方法
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
}

doSetRollbackOnly设置回滚状态

doSetRollbackOnly方法在DataSourceTransactionManager中实现:

  1. 将事务转为DataSourceTransactionObject对象,前面讲过DataSourceTransactionObject持有了数据库连接对象ConnectionHolder

  2. 将ConnectionHolder的rollbackOnly属性置为true,先标记事务的回滚状态,交由外围事务进行判断统一进行回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {

@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
// 获取数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
// 设置回滚状态
txObject.setRollbackOnly();
}

/**
* 内部类DataSourceTransactionObject
*/
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

// 省略其他方法

// 设置回滚状态
public void setRollbackOnly() {
// 将ConnectionHolder的rollbackOnly属性置为true,在ConnectionHolder的父类ResourceHolderSupport中实现
getConnectionHolder().setRollbackOnly();
}
}
}

// ConnectionHolder的父类ResourceHolderSupport
public abstract class ResourceHolderSupport implements ResourceHolder {

private boolean rollbackOnly = false;

/**
* 标记事务回滚状态为true
*/
public void setRollbackOnly() {
this.rollbackOnly = true;
}
}

资源清理

在事务回滚之后,需要清理相关的资源以及恢复被挂起的事务:

  1. 如果事务的newSynchronization状态为true,清除当前线程绑定的事务相关信息
    • 在TransactionSynchronizationManager的clear方法中实现,清理了当前线程绑定的事务名称、事务隔离级别等信息
  2. 如果是一个新事务,清除当前线程与数据库连接的绑定关系,在DataSourceTransactionManager的doCleanupAfterCompletion方法中实现
  3. 如果挂起的事务不为空,恢复挂起的事务
    • 获取数据源,恢复数据源与挂起事务的绑定关系
    • 恢复挂起事务与当前线程的同步信息
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

      /**
      * 回滚之后的清除操作
      * @see #doCleanupAfterCompletion
      */
      private void cleanupAfterCompletion(DefaultTransactionStatus status) {
      status.setCompleted();
      if (status.isNewSynchronization()) {
      // 清除当前线程绑定的信息
      TransactionSynchronizationManager.clear();
      }
      // 如果是一个新事务
      if (status.isNewTransaction()) {
      // 清除当前线程与数据库连接的绑定关系
      doCleanupAfterCompletion(status.getTransaction());
      }
      // 如果挂起的事务不为空
      if (status.getSuspendedResources() != null) {
      if (status.isDebug()) {
      logger.debug("Resuming suspended transaction after completion of inner transaction");
      }
      Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
      // 恢复挂起的事务
      resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
      }
      }

      /**
      * 恢复挂起的事务
      * @see #doResume
      * @see #suspend
      */
      protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
      throws TransactionException {

      if (resourcesHolder != null) {
      // 获取挂起的事务
      Object suspendedResources = resourcesHolder.suspendedResources;
      if (suspendedResources != null) {
      // 获取数据源,并与挂起的事务进行绑定
      doResume(transaction, suspendedResources);
      }
      // 挂起事务的同步信息
      List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
      if (suspendedSynchronizations != null) {
      // 恢复事务与线程的同步信息
      TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
      TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
      TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
      doResumeSynchronization(suspendedSynchronizations);
      }
      }
      }
      }

      // DataSourceTransactionManager
      public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
      implements ResourceTransactionManager, InitializingBean {
      // 主要是清除当前线程与数据库连接的绑定关系
      protected void doCleanupAfterCompletion(Object transaction) {
      DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

      // Remove the connection holder from the thread, if exposed.
      if (txObject.isNewConnectionHolder()) {
      TransactionSynchronizationManager.unbindResource(obtainDataSource());
      }

      // 获取数据库连接
      Connection con = txObject.getConnectionHolder().getConnection();
      try {
      if (txObject.isMustRestoreAutoCommit()) {
      // 自动提交置为true
      con.setAutoCommit(true);
      }
      // 重置数据库连接的相关设置
      DataSourceUtils.resetConnectionAfterTransaction(
      con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
      }
      catch (Throwable ex) {
      logger.debug("Could not reset JDBC Connection after transaction", ex);
      }

      if (txObject.isNewConnectionHolder()) {
      if (logger.isDebugEnabled()) {
      logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
      }
      // 释放连接
      DataSourceUtils.releaseConnection(con, this.dataSource);
      }
      // 清除当前线程与数据库连接的绑定关系
      txObject.getConnectionHolder().clear();
      }

      @Override
      protected void doResume(@Nullable Object transaction, Object suspendedResources) {
      // 获取数据源,并与挂起的事务进行绑定
      TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
      }
      }

      // TransactionSynchronizationManager
      public abstract class TransactionSynchronizationManager {

      // 保存了线程绑定的数据库资源信息,Map中Key为数据源构建的KEY,value为对应的ConnectionHolder
      private static final ThreadLocal<Map<Object, Object>> resources =
      new NamedThreadLocal<>("Transactional resources");
      // 保存了线程绑定的事务同步信息TransactionSynchronization
      private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
      new NamedThreadLocal<>("Transaction synchronizations");

      // 保存了线程绑定的事务名称
      private static final ThreadLocal<String> currentTransactionName =
      new NamedThreadLocal<>("Current transaction name");
      // 保存了线程绑定的事务只读状态
      private static final ThreadLocal<Boolean> currentTransactionReadOnly =
      new NamedThreadLocal<>("Current transaction read-only status");

      // 保存了线程绑定的事务隔离级别
      private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
      new NamedThreadLocal<>("Current transaction isolation level");
      // 保存了线程绑定的事务活跃状态
      private static final ThreadLocal<Boolean> actualTransactionActive =
      new NamedThreadLocal<>("Actual transaction active");

      /**
      * 清理事务与当前线程的各种同步状态
      */
      public static void clear() {
      // 清除当前线程绑定的事务同步信息TransactionSynchronization
      synchronizations.remove();
      // 清除当前线程绑定的事务名称
      currentTransactionName.remove();
      // 清除线程绑定的事务只读状态
      currentTransactionReadOnly.remove();
      // 清除线程绑定的事务隔离级别
      currentTransactionIsolationLevel.remove();
      // 清除线程绑定的事务活跃状态
      actualTransactionActive.remove();
      }
      }

总结

参考

【猫吻鱼】Spring源码分析:全集整理

Spring版本:5.2.5.RELEASE

【Spring】事务的执行原理(二)

Posted on 2022-04-03

前置知识

事务的执行步骤如下:

  1. 获取事务管理器
  2. 创建事务
  3. 执行目标方法
  4. 捕捉异常,如果出现异常进行回滚
  5. 提交事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取TransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 1. 获取事务管理器
final TransactionManager tm = determineTransactionManager(txAttr);
// 省略部分代码
// ...
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 2. 创建事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// 3. 执行方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 4. 回滚事务
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除事务
cleanupTransactionInfo(txInfo);
}
// 省略代码
// ...

// 5. 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// 省略代码
// ...
}
}
}

接下来,详细看一下事务的创建过程。

TransactionManager

事务管理器是Srping对事务进行管理的核心,PlatformTransactionManager里面定义了获取事务、提交事务、回滚事务的接口,不同的数据源可以有自己的实现,比如常见的JDBC数据源事务管理器DataSourceTransactionManager以及分布式事务管理器JtaTransactionManager:

PlatformTransactionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface PlatformTransactionManager extends TransactionManager {
/**
* 获取事务状态
*/
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;

/**
* 提交事务
*/
void commit(TransactionStatus status) throws TransactionException;

/**
* 回滚
*/
void rollback(TransactionStatus status) throws TransactionException;
}

TransactionStatus

PlatformTransactionManager的getTransaction返回的是TransactionStatus,TransactionStatus是一个接口,主要的实现在DefaultTransactionStatus中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

/**
* TransactionStatus可以获取事务的状态,也可以在发生异常时用于回滚
*/
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {

/**
* 返回事务是否设置了保存点
*/
boolean hasSavepoint();

/**
* flush所有的session
*/
@Override
void flush();

}

/**
* DefaultTransactionStatus
*/
public class DefaultTransactionStatus extends AbstractTransactionStatus {
// 事务
@Nullable
private final Object transaction;

// 是否新事务
private final boolean newTransaction;

private final boolean newSynchronization;

// 是否只读
private final boolean readOnly;

private final boolean debug;

// 挂起的事务
@Nullable
private final Object suspendedResources;

/**
* 构造函数
*/
public DefaultTransactionStatus(
@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
this.transaction = transaction;
this.newTransaction = newTransaction;
this.newSynchronization = newSynchronization;
this.readOnly = readOnly;
this.debug = debug;
this.suspendedResources = suspendedResources;
}
}

TransactionInfo

事务的创建方法createTransactionIfNecessary返回的是TransactionInfo对象,后续回滚事务、提交事务等操作,传入的都是TransactionInfo这个对象,它是TransactionAspectSupport的内部类,对事务管理器transactionManager、事务属性transactionAttribute、事务的状态transactionStatus等事务的相关信息进行了封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* TransactionAspectSupport的内部类TransactionInfo
*/
protected static final class TransactionInfo {
// 事务管理器
@Nullable
private final PlatformTransactionManager transactionManager;

// 事务属性
@Nullable
private final TransactionAttribute transactionAttribute;

private final String joinpointIdentification;

// 事务状态
@Nullable
private TransactionStatus transactionStatus;

// 旧的事务信息
@Nullable
private TransactionInfo oldTransactionInfo;

private void bindToThread() {
// 从Holder中获取当前线程绑定的事务信息
this.oldTransactionInfo = transactionInfoHolder.get();
// 更新当前线程对应的事务信息
transactionInfoHolder.set(this);
}
}

事务的创建

事务的创建分为两大部分:

  1. 调用事务管理器的getTransaction获取事务状态,返回的是TransactionStatus类型的对象
  2. 预处理事务,进行事务相关信息的封装以及事务和线程的绑定,返回的是TransactionInfo对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
/**
* 创建事务
*/
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// 如果事务属性不为空,但是名称为空
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// 事务状态
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 获取事务状态
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 预处理
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
}

获取事务状态

事务状态获取是调用PlatformTransactionManager事务管理器的getTransaction获取的,具体实现在AbstractPlatformTransactionManager中:

  1. 调用doGetTransaction方法获取事务,它是一个抽象方法,需要子类实现,数据源的不同具体的实现类也不同,接下来以常见的DataSourceTransactionManager为例,查看获取事务的具体实现逻辑
  2. 当前线程经存在事务,判断方式是通过当前线程是否持有数据库连接并且事务处于活跃状态
    • 如果存在事务,需要根据事务传播行为进行不同的处理
  3. 当前线程不存在事务
    • 如果事务的传播行为是PROPAGATION_MANDATORY,当前线程没有事务会抛出异常
    • 如果传播行为是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW或者PROPAGATION_NESTED,先挂起一个空事务,然后新建事务
    • 其他情况,调用prepareTransactionStatus创建TransactionStatus并返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

/**
* 获取事务
*/
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 获取事务定义
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取事务
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 当前的线程是否已经存在事务
if (isExistingTransaction(transaction)) {
// 如果已经存在事务,根据事务传播行为的设置进行不同的处理
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 当前线程不存在事务
// 检查事务是否超时
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}

// 如果事务的传播行为是PROPAGATION_MANDATORY,表示当前线程没有事务会抛出异常,而走到这里说明当前没有事务,所以抛出异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}// 如果传播行为是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW或者PROPAGATION_NESTED
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 因为当前线程不存在事务,所以先挂起一个空事务
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 开启新事务
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 其他情况,创建TransactionStatus
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
}
doGetTransaction获取事务

DataSourceTransactionManager实现了doGetTransaction获取事务的方法:

  1. 创建DataSourceTransactionObject对象,它是DataSourceTransactionManager内部类,持有一个ConnectionHolder对象,里面记录了数据库的连接
  2. 调用obtainDataSource获取数据源
  3. 从ThreadLocal中获取当前线程对应的Map资源数据集合,根据第2步中获取到的数据源从Map中获取对应的ConnectionHolder,也就是说当前线程绑定了某个数据源的连接,从ThreadLocal获取到数据库连接之后,将数据库连接设置到DataSourceTransactionObject中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {

/**
* 获取事务,返回的是DataSourceTransactionObject
*/
@Override
protected Object doGetTransaction() {
// 创建DataSourceTransactionObject
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 设置是否允许保存点
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 根据数据源获取当前线程绑定的ConnectionHolder对象,ConnectionHolder中存有数据库连接
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 设置ConnectionHolder对象
txObject.setConnectionHolder(conHolder, false);
return txObject;
}

/**
* 获取数据源
*/
protected DataSource obtainDataSource() {
// 获取数据源
DataSource dataSource = getDataSource();
Assert.state(dataSource != null, "No DataSource set");
return dataSource;
}

/**
* 内部类,DataSourceTransactionObject,记录了数据源信息
*/
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

private boolean newConnectionHolder;

private boolean mustRestoreAutoCommit;

// 设置连接信息,connectionHolder在父类JdbcTransactionObjectSupport中
public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
super.setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
}
}
}

// JdbcTransactionObjectSupport
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
// ConnectionHolder,记录了数据库连接
@Nullable
private ConnectionHolder connectionHolder;
}

// ConnectionHolder
public class ConnectionHolder extends ResourceHolderSupport {
// 数据库连接
@Nullable
private Connection currentConnection;
}

TransactionSynchronizationManager

TransactionSynchronizationManager中保存了线程绑定的各种信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public abstract class TransactionSynchronizationManager {
// 保存了线程绑定的数据库资源信息,Map中Key为数据源构建的KEY,value为对应的ConnectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
// 保存了线程绑定的事务同步信息TransactionSynchronization
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");

// 保存了线程绑定的事务名称
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
// 保存了线程绑定的事务只读状态
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");

// 保存了线程绑定的事务隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
// 保存了线程绑定的事务活跃状态
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");

/**
* 根据key(数据源dataSource)获取当前线程绑定的资源,也就是ConnectionHoler
*/
@Nullable
public static Object getResource(Object key) {
// 构建KEY
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// 获取资源
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}

/**
* 根据KEY获取资源
*/
@Nullable
private static Object doGetResource(Object actualKey) {
// 获取当前线程对应的数据,其中KEY为根据数据源构建出的KEY,value为ConnectionHoler
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
// 根据KEY获取数据源
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
}
isExistingTransaction是否存在事务

isExistingTransaction同样在DataSourceTransactionManager中实现,具体是是通过DataSourceTransactionObject是否持有数据库连接并且事务处于活跃状态来判断是否存在事务的:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
/**
* 是否存在事务
*/
@Override
protected boolean isExistingTransaction(Object transaction) {
// 转换为DataSourceTransactionObject
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 通过判断DataSourceTransactionObject是否持有ConnectionHolder并且事务处于活跃状态
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
}
handleExistingTransaction当程已存在事务情况下的处理

在当前线程已存在事务的情况下:

  1. 如果事务传播行为设是PROPAGATION_NEVER,表示以非事务的方式执行,如果当前存在事务,将抛出异常
  2. 如果事务的传播行为是PROPAGATION_NOT_SUPPORTED,表示以非事务的方式执行,如果当前存在事务,则挂起当前的事务,不使用事务
  3. 如果事务的传播行为是PROPAGATION_REQUIRES_NEW,需要挂起当前事务,创建一个自己的事务
  4. 如果事务的传播行为是PROPAGATION_NESTED,判断是否使用保存点
    • 如果是使用嵌套事务
    • 如果不是,开启一个新事务
  5. 非以上四种情况,使用当前的事务,调用prepareTransactionStatus方法创建TransactionStatus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

/**
* 获取事务
*/
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
/**
* 处理存在的事务,主要是对事务传播行为的判断和处理
*/
private TransactionStatus handleExistingTransaction (
TransactionDefinition definition, Object transaction,boolean debugEnabled)
throws TransactionException {
// 如果事务传播行为设置的是PROPAGATION_NEVER,表示以非事务的方式执行,所以当前存在事务将抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 如果事务的传播行为是PROPAGATION_NOT_SUPPORTED,表示以非事务的方式执行,如果当前存在事务,则挂起当前事务,不使用事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 挂起当前事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 创建TransactionStatus,第二个参数传入的是事务,可以看到这里设置的为NULL,不使用事务
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 如果事务的传播行为是PROPAGATION_REQUIRES_NEW,需要挂起当前事务创建新事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 新建事务
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 如果是PROPAGATION_NESTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 是否允许嵌套事务
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 是否使用保存点
if (useSavepointForNestedTransaction()) {
// 创建DefaultTransactionStatus,开启嵌套事务
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 创建保存点
status.createAndHoldSavepoint();
// 返回事务状态
return status;
} else {
// 新建事务
return startTransaction(definition, transaction, debugEnabled, null);
}
}

// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 校验存在事务的合法性
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建TransactionStatus
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
}
}
prepareTransactionStatus创建TransactionStatus

prepareTransactionStatus用于创建TransactionStatus,具体创建的是DefaultTransactionStatus类型的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

/**
* 创建TransactionStatus
*
* @param definition 事务定义
* @param transaction 事务
* @param newTransaction 是否新事务
* @param newSynchronization 是否需要同步线程绑定的信息
* @param debug
* @param suspendedResources 挂起的事务
* @return
*/
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 创建DefaultTransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
// 初始化事务的同步信息
prepareSynchronization(status, definition);
return status;
}

/**
* 初始化事务的同步信息
*/
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
// 设置当前线程绑定的线程活跃状态
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 设置绑定的事务隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 设置绑定的事务名称
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
}
suspend挂起事务

挂起事务其实是将当前事务的相关设置清除,并解绑当前线程对应的数据库连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/**
* 挂起事务
*/
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// 判断活跃状态
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 挂起事务
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
// 清除当前事务的各种信息
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
}

// DataSourceTransactionManager挂起事务
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
@Override
protected Object doSuspend(Object transaction) {
// 转为DataSourceTransactionObject
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 将绑定的数据库连接置为null
txObject.setConnectionHolder(null);
// 解绑当前线程的对应的数据源连接
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
}

// 解绑资源
public abstract class TransactionSynchronizationManager {

// 保存了线程绑定的数据库资源信息,Map中Key为数据源构建的KEY,value为对应的ConnectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");

/**
* 解绑当前线程对应的资源
*/
public static Object unbindResource(Object key) throws IllegalStateException {
// 根据对象获取KEY
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// 解绑当前线程绑定的资源
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}

/**
* 解绑资源
*/
@Nullable
private static Object doUnbindResource(Object actualKey) {
// 获取当前线程对应的数据
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
// 从Map中移除资源
Object value = map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
if (value != null && logger.isTraceEnabled()) {
logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
}
startTransaction新建事务

startTransaction用于开启一个新事务,在AbstractPlatformTransactionManager中实现:

  1. 创建TransactionStatus,可以看到在构造函数中传入了事务定义、当前事务、是否新建事务(置为了true)、是否同步、是否debug、挂起的事务这些参数
  2. 调用doBegin方法设置事务的一些信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

/**
* 开启一个新事务
*/
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建新事务DefaultTransactionStatus,第三个参数表示是否新建事务,这里置为了true, suspendedResources记录了挂起的事务
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 设置事务的相关信息
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}

doBegin方法在DataSourceTransactionManager中实现:

  1. 判断当前事务是否持有数据库连接获取事务与数据库的同步状态为true,从数据源中新建一个数据库连接,并与当前事务绑定
  2. 如果开启了自动提交,将自动提交置为false
  3. 如果是新创建的连接,将数据库连接ConnectionHolder绑定到当前线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// DataSourceTransactionManager实现了doBegin
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 转为DataSourceTransactionObject
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
// 如果未持有连接信息或者事务与数据库的同步状态为true
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 从数据源获取一个新的连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 将连接信息设置到当前的事务中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 将事务与数据库连接的同步状态置为true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 获取数据库连接
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// 是否是自动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 自动提交置为false
con.setAutoCommit(false);
}
// 处理连接
prepareTransactionalConnection(con, definition);
// 事务置为活跃状态
txObject.getConnectionHolder().setTransactionActive(true);
// 超时时间获取
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 是否是新建的连接
if (txObject.isNewConnectionHolder()) {
// 将数据库连接ConnectionHolder绑定到当前线程
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
}

事务预处理

  1. 创建TransactionInfo事务信息,它是TransactionAspectSupport的内部类,将事务管理器transactionManager、事务属性transactionAttribute、事务的状态transactionStatus等事务相关的信息进行了封装,并实现了将事务绑定到当前线程的方法
  2. 调用TransactionInfo的bindToThread方法将事务与当前线程绑定,是通过ThreadLocal实现的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
/**
* 准备事务
*/
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// 创建事务信息
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 设置事务状态
txInfo.newTransactionStatus(status);
}
else {
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}

// 将事务绑定到当前线程
txInfo.bindToThread();
return txInfo;
}

/**
* 使用ThreadLocal记录线程绑定的事务
*/
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");

/**
* 内部类TransactionInfo
*/
protected static final class TransactionInfo {

@Nullable
private final PlatformTransactionManager transactionManager;

@Nullable
private final TransactionAttribute transactionAttribute;

private final String joinpointIdentification;

@Nullable
private TransactionStatus transactionStatus;

@Nullable
private TransactionInfo oldTransactionInfo;

private void bindToThread() {
// 从Holder中获取当前线程绑定的事务信息
this.oldTransactionInfo = transactionInfoHolder.get();
// 更新当前线程对应的事务信息
transactionInfoHolder.set(this);
}
}
}

总结

参考

【猫吻鱼】Spring源码分析:全集整理

Spring版本:5.2.5.RELEASE

【Spring】事务的执行原理(一)

Posted on 2022-04-02

在使用事务的时候需要添加@EnableTransactionManagement注解来开启事务,那么就从@EnableTransactionManagement入手查看一下事务的执行原理。

@EnableTransactionManagement

  1. Spring事务底层是通过AOP来完成的,而Spring AOP基于动态代理实现,可以看到mode方法默认返回了PROXY代理模式,我们只需关注代理模式下的执行流程即可
  2. 使用@Import导入了TransactionManagementConfigurationSelector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

// 是否代理目标类
boolean proxyTargetClass() default false;

// 默认使用代理模式
AdviceMode mode() default AdviceMode.PROXY;

int order() default Ordered.LOWEST_PRECEDENCE;
}

TransactionManagementConfigurationSelector

在selectImports方法中可以看到对模式进行了判断:

  1. 如果是基于代理模式,返回AutoProxyRegistrar和ProxyTransactionManagementConfiguration类
  2. 如果是基于ASPECTJ,调用determineTransactionAspectClass方法

Spring默认使用的是代理模式,所以接下来看下AutoProxyRegistrar和ProxyTransactionManagementConfiguration里面都有什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
// 如果基于代理模式
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
// 如果基于ASPECTJ
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}

private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
}

AutoProxyRegistrar

AutoProxyRegistrar实现了ImportBeanDefinitionRegistrar接口,ImportBeanDefinitionRegistrar可以向容器中注册Bean,跟着registerBeanDefinitions方法看下它会向容器中注册什么样的bean:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
for (String annType : annTypes) {
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (candidate == null) {
continue;
}
Object mode = candidate.get("mode");
Object proxyTargetClass = candidate.get("proxyTargetClass");
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
candidateFound = true;
if (mode == AdviceMode.PROXY) {
// 调用AopConfigUtils的registerAutoProxyCreatorIfNecessary向容器中注册bean
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
// 省略...
}
}

AopConfigUtils

在AopConfigUtils中一共有三种自动代理创建器:

  1. InfrastructureAdvisorAutoProxyCreator
  2. AspectJAwareAdvisorAutoProxyCreator
  3. AnnotationAwareAspectJAutoProxyCreator

在registerAutoProxyCreatorIfNecessary方法中,可以看到事务使用的是InfrastructureAdvisorAutoProxyCreator类型的创建器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public abstract class AopConfigUtils {

/**
* 自动代理创建器BeanName
*/
public static final String AUTO_PROXY_CREATOR_BEAN_NAME =
"org.springframework.aop.config.internalAutoProxyCreator";

/**
* 所有的自动代理创建器集合
*/
private static final List<Class<?>> APC_PRIORITY_LIST = new ArrayList<>(3);

static {
// 初始化
APC_PRIORITY_LIST.add(InfrastructureAdvisorAutoProxyCreator.class);
APC_PRIORITY_LIST.add(AspectJAwareAdvisorAutoProxyCreator.class); // AspectJ
APC_PRIORITY_LIST.add(AnnotationAwareAspectJAutoProxyCreator.class); // 注解
}

// AutoProxyRegistrar中调用的registerAutoProxyCreatorIfNecessary方法
@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry) {
// 会调用下面那个registerAutoProxyCreatorIfNecessary方法
return registerAutoProxyCreatorIfNecessary(registry, null);
}

@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(
BeanDefinitionRegistry registry, @Nullable Object source) {
// 调用registerOrEscalateApcAsRequired进行注册,这里传入的是InfrastructureAdvisorAutoProxyCreator类型的
return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

@Nullable
private static BeanDefinition registerOrEscalateApcAsRequired(
Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) {

Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
// 判断容器中是否已经包含代理创建器
if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
// 从容器中获取
BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
// 判断容器中已经存在的创建器的优先级
int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
// 需要的创建器的优先级
int requiredPriority = findPriorityForClass(cls);
// 如果容器中已经存在的创建器的优先级小于需要创建的
if (currentPriority < requiredPriority) {
// 使用优先级高的
apcDefinition.setBeanClassName(cls.getName());
}
}
return null;
}
// 创建RootBeanDefinition
RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
// 设置source
beanDefinition.setSource(source);
beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
// 注册代理创建器
registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
return beanDefinition;
}
}

总结

AutoProxyRegistrar实现ImportBeanDefinitionRegistrar是为了向容器中注册代理创建器,事务默认使用的是InfrastructureAdvisorAutoProxyCreator类型的。

ProxyTransactionManagementConfiguration

1. AOP概念

Advice通知:定义在切点上需要执行什么样的操作

PointCut切点:定义在哪些方法上使用通知

Advisor:Advice和Pointcut加起来组成了Advisor

2. 事务中的Advisor

我们已经知道事务是基于AOP实现的,在transactionAdvisor方法中可以看到创建了Advisor,然后设置了事务属性TransactionAttributeSource和事务拦截器TransactionInterceptor:

  • TransactionAttributeSource,从名字上可以看出是和事务的属性设置相关的
  • TransactionInterceptor事务拦截器相当于Advice通知
  • BeanFactoryTransactionAttributeSourceAdvisor是Advisor

Advisor由Advice和PointCut组成,现在Advice已经有了,接下来看下Pointcut在哪里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
// 创建Advisor
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
// 设置TransactionAttributeSource,类型为AnnotationTransactionAttributeSource
advisor.setTransactionAttributeSource(transactionAttributeSource);
// 设置事务拦截器,相当于Advice
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}

@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
// 创建AnnotationTransactionAttributeSource
return new AnnotationTransactionAttributeSource();
}

@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
// 创建事务拦截器
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}

BeanFactoryTransactionAttributeSourceAdvisor

BeanFactoryTransactionAttributeSourceAdvisor继承关系如下:

除了继承父类的属性和方法,它自己还有两个成员变量:

  1. transactionAttributeSource,实际传入的是AnnotationTransactionAttributeSource类型的对象
  2. TransactionAttributeSourcePointcut类型的切点pointcut
    • 切点在实例化时实现了getTransactionAttributeSource方法,返回了transactionAttributeSource,后面的方法中需要调用此方法获取transactionAttributeSource

由上面的分析可知,在创建BeanFactoryTransactionAttributeSourceAdvisor的时候,设置了TransactionInterceptor和TransactionAttributeSource,TransactionInterceptor相当于Advice,而这里我们看到了它还有一个TransactionAttributeSourcePointcut切点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

@Nullable
private TransactionAttributeSource transactionAttributeSource; // 实际传入的是AnnotationTransactionAttributeSource类型的

// 创建切点
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {

// 实现了getTransactionAttributeSource方法,返回的是AnnotationTransactionAttributeSource
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
return transactionAttributeSource;
}
};

public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
// 设置TransactionAttributeSource
this.transactionAttributeSource = transactionAttributeSource;
}

@Override
public Pointcut getPointcut() {
return this.pointcut;
}
}

TransactionAttributeSourcePointcut

TransactionAttributeSourcePointcut是一个切点,它的继承关系如下:

Pointcut和MethodMatcher

Pointcut接口中定义了两个方法:

  1. 获取ClassFilter,ClassFilter是一个接口,里面定义了matches方法,检查切点是否与类匹配
  2. 获取MethodMatcher,它也是一个接口,并且定义了matches方法,检查切点是否与方法匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public interface Pointcut {
/**
* 返回类过滤器ClassFilter
*/
ClassFilter getClassFilter();

/**
* 返回MethodMatcher
*/
MethodMatcher getMethodMatcher();
}

public interface MethodMatcher {
/**
* 检查方法是否匹配pointcut
*/
boolean matches(Method method, Class<?> targetClass);

/**
* 检查方法是否匹配pointcut
*/
boolean matches(Method method, Class<?> targetClass, Object... args);
}

@FunctionalInterface
public interface ClassFilter {
/**
* 检查类是否与pointcut匹配
*/
boolean matches(Class<?> clazz);
}

TransactionAttributeSourcePointcut是Pointcut和MethodMatcher的子类:

  1. 在构造函数中设置了ClassFilter,类型为TransactionAttributeSourceClassFilter,它是TransactionAttributeSourcePointcut的一个内部类,实现了ClassFilter接口中定义的matches方法,检查pointcut与类是否匹配:
    • 如果是TransactionalProxy、PlatformTransactionManager或者PersistenceExceptionTranslator的子类,则不匹配
    • 获取TransactionAttributeSource,调用它的isCandidateClass方法判断是否匹配
  2. 实现了MethodMatcher接口中定义的matches方法,检查pointcut是否匹配当前的方法
    • 获取TransactionAttributeSource判断是否为空,如果不为空则调用getTransactionAttribute获取事务属性,TransactionAttributeSource为空,或者从TransactionAttributeSource获取到的事务属性不为空都会返回true

所以一个方法执行时开启事务,需要满足两个条件,当前的方法和类都需要与事务的pointcut匹配,对应的方法分别是MethodMatcher的matches和ClassFilter的matches方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {

protected TransactionAttributeSourcePointcut() {
// 设置ClassFilter
setClassFilter(new TransactionAttributeSourceClassFilter());
}

// 方法是否与切点匹配
@Override
public boolean matches(Method method, Class<?> targetClass) {
// 获取TransactionAttributeSource,由上面的步骤可知返回的是AnnotationTransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
// 如果TransactionAttributeSource为空,或者从TransactionAttributeSource获取到的事务属性不为空都会返回true
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

// ClassFilter过滤器
private class TransactionAttributeSourceClassFilter implements ClassFilter {

// 切点是否与类匹配
@Override
public boolean matches(Class<?> clazz) {
// 如果TransactionalProxy、PlatformTransactionManager或者PersistenceExceptionTranslator的子类,则不匹配
if (TransactionalProxy.class.isAssignableFrom(clazz) ||
PlatformTransactionManager.class.isAssignableFrom(clazz) ||
PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
return false;
}
// 获取TransactionAttributeSource,由上文可知是AnnotationTransactionAttributeSource类型的
TransactionAttributeSource tas = getTransactionAttributeSource();
// 调用isCandidateClass方法判断是否是匹配
return (tas == null || tas.isCandidateClass(clazz));
}
}
}

AnnotationTransactionAttributeSource

条件一:检查类是否匹配事务切点

上面分析可知,检查类是否与切点匹配时获取了TransactionAttributeSource,调用它的isCandidateClass方法进行判断,TransactionAttributeSource的具体实现是AnnotationTransactionAttributeSource:

  1. 在构造方法中,添加了注解解析器:
    • Spring事务注解解析器的实现类为SpringTransactionAnnotationParser,也是默认的注解解析器。
    • 如果开启了JTA或者EJB,将会分别添加对应的解析器。
  2. 实现了isCandidateClass方法,实际又是调用注解解析器的isCandidateClass判断是否是候选类的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@SuppressWarnings("serial")
public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource
implements Serializable {

private static final boolean jta12Present;

private static final boolean ejb3Present;

static {
ClassLoader classLoader = AnnotationTransactionAttributeSource.class.getClassLoader();
jta12Present = ClassUtils.isPresent("javax.transaction.Transactional", classLoader);
ejb3Present = ClassUtils.isPresent("javax.ejb.TransactionAttribute", classLoader);
}

private final boolean publicMethodsOnly;
// 注解解析器集合
private final Set<TransactionAnnotationParser> annotationParsers;

public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
this.publicMethodsOnly = publicMethodsOnly;
// 添加注解解析器
if (jta12Present || ejb3Present) {
this.annotationParsers = new LinkedHashSet<>(4);
// 添加Spring事务注解解析器
this.annotationParsers.add(new SpringTransactionAnnotationParser());
if (jta12Present) {
// JTA事务注解解析器
this.annotationParsers.add(new JtaTransactionAnnotationParser());
}
if (ejb3Present) {
// EJB3事务注解解析器
this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
}
}
else {
// 添加Spring事务注解解析器
this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
}
}

// 判断是否是候选类
@Override
public boolean isCandidateClass(Class<?> targetClass) {
for (TransactionAnnotationParser parser : this.annotationParsers) {
// 调用解析器的isCandidateClass方法判断是否是候选类
if (parser.isCandidateClass(targetClass)) {
return true;
}
}
return false;
}
}

SpringTransactionAnnotationParser

  1. SpringTransactionAnnotationParser实现了isCandidateClass方法,它又调用了AnnotationUtils的isCandidateClass判断目标类是否是Transactional注解的候选类,AnnotationUtils中isCandidateClass的具体判断逻辑如下:

    • 如果注解类路径以java.开头,返回true,这里Transactional注解不是java.开头,它是Spring的注解类,所以这个条件不会成立

    • 如果目标类的类路径以java.开头,或者是Ordered类型,isCandidateClass返回false,说明目标类不是某个注解的候选类

    • 除去以上两种情况之外,isCandidateClass都返回true

      总结: 如果目标类的类路径不以java.开头(也就是说它不是java的相关类),也不是Ordered类型,说明目标类是Transactional注解的候选类。

  2. SpringTransactionAnnotationParser实现了parseTransactionAnnotation方法,里面包含对事物属性的解析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {

@Override
public boolean isCandidateClass(Class<?> targetClass) {
// 是否是Transactional注解的候选类
return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
}

// 解析注解属性
@Override
@Nullable
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
// 解析注解属性
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}

/**
* 解析事务注解
*/
public TransactionAttribute parseTransactionAnnotation(Transactional ann) {
return parseTransactionAnnotation(AnnotationUtils.getAnnotationAttributes(ann, false, false));
}

/**
* 解析事务注解
*/
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
// 事务的传播行为
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));

List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);

return rbta;
}

}

// AnnotationUtils
public abstract class AnnotationUtils {

/**
* 检查目标类clazz是否是注解的候选类
*/
public static boolean isCandidateClass(Class<?> clazz, Collection<Class<? extends Annotation>> annotationTypes) {
for (Class<? extends Annotation> annotationType : annotationTypes) {
// 传入目标类和注解,SpringTransactionAnnotationParser传入的注解是Transactional
if (isCandidateClass(clazz, annotationType)) {
return true;
}
}
return false;
}

/**
* 检查目标类clazz是否是注解的候选类
*/
public static boolean isCandidateClass(Class<?> clazz, Class<? extends Annotation> annotationType) {
// 传入目标类和注解类路径
return isCandidateClass(clazz, annotationType.getName());
}

/**
* 检查目标类clazz是否是注解的候选类
*/
public static boolean isCandidateClass(Class<?> clazz, String annotationName) {
// 注解类路径是否是java.开头
if (annotationName.startsWith("java.")) {
return true;
}
// 调用AnnotationsScanner的hasPlainJavaAnnotationsOnly方法判断
// hasPlainJavaAnnotationsOnly方法的判断逻辑是:如果目标类的类路径以java.开头或者是Ordered类会返回true
// 所以如果目标类的类路径以java.开头或者是Ordered类,isCandidateClass会返回false,说明目标类不是注解的候选类
if (AnnotationsScanner.hasPlainJavaAnnotationsOnly(clazz)) {
return false;
}
// 如果注解类路径不是java.开头,并且目标类的类路径不以java.开头,也不是Ordered类型,返回true,说明目标类是某个注解的候选类
return true;
}
}

// AnnotationsScanner
abstract class AnnotationsScanner {
static boolean hasPlainJavaAnnotationsOnly(Class<?> type) {
// 如果目标类的类路径以java.开头或者是Ordered类
return (type.getName().startsWith("java.") || type == Ordered.class);
}
}

条件二:检查方法是否匹配事务切点

AbstractFallbackTransactionAttributeSource

如果从TransactionAttributeSource获取到的事务属性不为空将会满足切点的匹配条件,获取事务属性的方法实现在AbstractFallbackTransactionAttributeSource类中:

  1. 如果当前方法是Object中的方法,返回空
  2. 根据当前的方法和类的信息构建缓存key,从缓存中获取
    • 如果获取不为空,判断是否为空的事务属性NULL_TRANSACTION_ATTRIBUTE,如果是则返回null,否则返回从缓存中获取到的事务属性
    • 如果获取为空,调用解析事务属性的方法进行解析,然后放入缓存中并返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public abstract class AbstractFallbackTransactionAttributeSource implements TransactionAttributeSource {

// 空的TransactionAttribute
@SuppressWarnings("serial")
private static final TransactionAttribute NULL_TRANSACTION_ATTRIBUTE = new DefaultTransactionAttribute() {
@Override
public String toString() {
return "null";
}
};
// 缓存
private final Map<Object, TransactionAttribute> attributeCache = new ConcurrentHashMap<>(1024);

// 获取事务属性
@Override
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 如果当前方法所在的类是Object
if (method.getDeclaringClass() == Object.class) {
return null;
}
// 构建cacheKey
Object cacheKey = getCacheKey(method, targetClass);
// 首先根据cacheKey从缓存中获取
TransactionAttribute cached = this.attributeCache.get(cacheKey);
if (cached != null) {
// 判断是否为空的TRANSACTION_ATTRIBUTE
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
// 返回空
return null;
}
else {
// 返回事务属性TransactionAttribute
return cached;
}
}
else {
// 解析TransactionAttribute
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// 如果为空,也加入缓存,但是value是NULL_TRANSACTION_ATTRIBUTE
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
}
else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
// 如果是DefaultTransactionAttribute类型的
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
if (logger.isTraceEnabled()) {
logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
}
// 加入缓存
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}

// 获取缓存KEY
protected Object getCacheKey(Method method, @Nullable Class<?> targetClass) {
return new MethodClassKey(method, targetClass);
}

/**
* 解析TransactionAttribute
*/
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 判断方法是否是public的
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}

// 获取目标方法
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

// 在方法上查找事务属性的设置,findTransactionAttribute方法在AnnotationTransactionAttributeSource中实现
TransactionAttribute txAttr = findTransactionAttribute方法(specificMethod);
if (txAttr != null) {
return txAttr;
}

// 在目标类上面查找事务属性的设置
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}

if (specificMethod != method) {
// 使用方法上配置的事务属性
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// 使用类上面配置的事务属性
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
}

总结

事务是基于AOP实现的,事务的Advisor是BeanFactoryTransactionAttributeSourceAdvisor,Advisor判断方法是否匹配时,是通过Pointcut的matches方法判断的,事务的Pointcut是TransactionAttributeSourcePointcut,里面实现了方法是否与事务切点匹配的判断:

  1. 对类的匹配是通过判断目标类是否是Transactional注解的候选类实现的,我们创建的类一般不会以java.开头,所以说可以与Transactional注解匹配成功。

  2. 对方法的匹配是通过解析方法上面配置的事务属性判断的,如果解析到了事务属性,则满足匹配条件。

TransactionInterceptor

TransactionInterceptor是事务Advisor的Advice,执行目标方法时,方法会被拦截,进入到TransactionInterceptor中,在TransactionInterceptor的invoke方法中实际是调用invokeWithinTransaction执行的:

1
2
3
4
5
6
7
8
9
10
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// 通过事务执行目标方法,实现在TransactionAspectSupport方法中
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}

TransactionAspectSupport

TransactionAspectSupport中实现了invokeWithinTransaction方法:

  1. 获取事务属性TransactionAttribute和TransactionManager事务管理器
  2. 对响应式事务、声明式事务和编程式事务分别进行判断,以声明式事务为例步骤如下:
    • 创建事务
    • 执行方法
    • 捕捉异常,如果抛出异常进行回滚
    • 清除事务信息
    • 提交事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// 获取TransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器TransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
// 响应式事务处理
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 声明式事务的处理
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// 这是一个环绕通知,将会在拦截链中执行下一个拦截
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 捕捉异常,进行回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除事务
cleanupTransactionInfo(txInfo);
}

if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 编程式事务的处理
else {
final ThrowableHolder throwableHolder = new ThrowableHolder();

// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
// 获取事务信息
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
// 执行方法
Object retVal = invocation.proceedWithInvocation();
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
// 清除事务信息
cleanupTransactionInfo(txInfo);
}
});

// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
}

总结

参考

【猫吻鱼】Spring源码分析:全集整理

Spring版本:5.2.5.RELEASE

【JAVA】IO访问方式

Posted on 2021-11-07

物理内存

计算机物理内存条的容量,比如我们买电脑会关注内存大小有多少G,这个容量就是计算机的物理内存。

虚拟内存

操作系统为每个进程分配了独立的虚拟地址空间,也就是虚拟内存,虚拟地址空间又分为用户空间和内核空间,操作系统的位数不同,虚拟地址空间的大小也不同,32位操作系统虚拟地址内核空间为1G,用户空间大小为3G,64位操作系统用户空间和内核空间大小各为128T:

既然每个进程都拥有一块独立的虚拟地址空间,那么所有进程的虚拟地址空间大小加起来必定大于物理内存的大小,所以虚拟地址空间只是一个虚拟的概念,只有需要分配内存的时候才会为虚拟内存分配物理内存,并通过内存映射来管理虚拟地址和物理内存地址之间的映射关系。

用户空间 / 内核空间

用户空间:是运行用户程序代码的地方,为了保证系统内核的安全,它不能直接访问内存等硬件设备,必须通过系统调用进入到内核空间来访问那些受限的资源。

内核空间:是运行内核代码的地方,可以执行任意的指令访问系统资源,既可以访问内核空间也可以访问用户空间。

用户态:进程运行在用户空间时处于用户态。

内核态:程运行在内核空间时处于内核态。

文件I/O

比如我们启动了一个java程序,此时运行在用户空间(用户态),接着准备做一个读取磁盘文件的操作,由于用户空间是无法直接从磁盘读取文件的,所以需要调用内核提供的接口来完成文件的读取,调用内核的接口的过程中由用户空间进入到了内核空间(内核态),通过DMA从磁盘读取文件到内核的缓冲区,之后再将数据从内核的缓冲区拷贝到用户空间完成文件的读取操作。

  1. 应用程序发起系统调用
  2. 从用户空间切换到内核空间,内核通过DMA从磁盘拷贝数据到内核缓冲区
  3. 将内核缓冲区的数据拷贝到用户空间的缓冲区

可以发现,整个读取过程发生了两次数据拷贝,一次是DMA将磁盘上的文件数据拷贝到内核缓冲区,一次是将内核缓冲区的数据拷贝到用户缓冲区。写操作与读取操作类似,只不过是将用户缓冲区的数据拷贝到内核缓冲区,再将内核缓冲区的数据拷贝到文件。

网络I/O

网络I/O与文件I/O的底层原理一致,只不过文件I/O是从磁盘读取文件,网络I/O是从网卡中读取数据,客户端与服务端建立连接,当有数据到达时,从网卡中读取数据到内核缓冲区,再将内核缓冲区的数据复制到用户空间的缓冲区。

缓存I/O

也称标准I/O,上面提到的文件I/O和网络I/O读取数据的方式都是使用的缓存I/O,需要将数据先拷贝到内核缓冲区,再将内核缓冲区的数据拷贝到用户缓冲区,数据经过两次拷贝,内核缓冲区和用户缓冲区分别指向不同的物理内存,其中内核缓存区是在Page Cache层:

标准I/O读取文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class InputStreamTest {
public static void main(String[] args) {
File file = new File("/Users/sml/test.txt");
try (InputStream is = new FileInputStream(file)){
// 建立用户空间缓冲区
byte[] bytes = new byte[1024];
int len;
while ((len = is.read(bytes)) != -1) {
System.out.println(new String(bytes));
}

} catch (IOException e) {
e.printStackTrace();
}
}
}

为什么需要缓存IO?

因为磁盘I/O是比较耗时的操作,如果每次都从磁盘上读取文件,性能将会大大下降,为了提升读取性能,增加了一层Page Cache,用于缓存读取的文件数据,Page Cache占用的是内存,从内存读取的速度远远大于从磁盘读取,内核缓冲区就是在Page Cache中开辟的一块内存,用户空间进行系统调用读取文件内容时,首先会判断Page Cache中是否缓存了文件的内容,如果缓存了直接读取即可,否则再从磁盘读取,所以缓存I/O可以减少磁盘I/O的次数提升性能。

文件的写操作同样如此,进行写操作时,将数据先写到Page Cache的缓冲区中,后续由操作系统将数据刷回到磁盘中。

缓存I/O的优缺点

优点:减少磁盘I/O次数,提升读写性能。

缺点:数据需要在内核空间和用户空间来回拷贝。

直接I/O

缓存I/O经过了Page Cache,读取过程中需要将数据从Page Cache的缓冲区中拷贝到用户空间的缓存区,那么有没有一种方式可以省去这个拷贝的过程?

答案是有的,那就是直接I/O,应用程序直接访问磁盘数据,绕过了Page Cache,省去了从内核缓冲区拷贝到用户缓冲区的过程。

目前JAVA并没有原生的直接/O操作方式,不过公众号博主Kirito提供了在JAVA中进行直接I/O操作的方法,具体参见【Kirito的技术分享】Java 文件 IO 操作之 DirectIO。

内存映射

内存映射就是将虚拟空间地址映射到物理空间地址,每个进程维护了一张页表,记录虚拟地址和物理地址之间的映射关系,当进程访问的虚拟地址在页表中无法查到映射关系时,系统产生缺页异常,进入内核空间为虚拟地址分配物理内存,并更新页表,记录映射关系。

文件映射

内存映射除了映射虚拟空间地址和物理空间地址,还包括将磁盘的文件内容映射到虚拟地址空间,称为文件映射,此时可以通过访问内存来访问文件里面的数据 。

mmap系统调用可以将文件映射到虚拟内存空间。文件映射的流程如下:

  1. 进行mmap系统调用,将文件和虚拟地址空间建立映射,注意此时还没有分配物理内存空间,只是在逻辑上建立了虚拟地址和文件之间的映射关系,物理内存只有真正使用的时候才会分配。
  2. 应用程序访问用户空间虚拟内存中的某个地址,发现无法在页表中查到数据,产生缺页异常,此时进入内核空间
  3. 因为不能直接使用物理地址,所以需要使用内核的虚拟地址临时建立与物理内存的映射关系,将文件内容读取到物理内存中,待数据读取完毕之后取消临时映射即可。
  4. 缺页异常处理完毕,物理内存中已经加载了文件的数据,此时用户空间就可以通过虚拟地址直接访问物理内存中映射的文件数据。

从文件映射的流程中可以看出它与缓存I/O相比,少了从内核缓冲区将数据拷贝到用户缓冲区的步骤,减少了一次拷贝。

Java NIO中提供了MappedByteBuffer来处理文件映射,下面是一个读取文件的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MappedByteBufferTest {

public static void main(String[] args) {

try (RandomAccessFile file = new RandomAccessFile(new File("/Users/sml/test.txt"), "r")) {
// 获取FileChannel
FileChannel fileChannel = file.getChannel();
long size = fileChannel.size();
// 调用map方法进行文件映射,返回MappedByteBuffer
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, size);
byte[] bytes = new byte[(int)size];
for (int i = 0; i < size; i++) {
// 读取数据
bytes[i] = mappedByteBuffer.get();
}
} catch (Exception e) {
e.printStackTrace();
}

}
}

零拷贝

首先看一下使用传统缓存I/O从磁盘文件读取数据并发送到网络上的过程:

  1. 用户发起系统调用,DMA从磁盘上读取数据到内核缓冲区

  2. CPU将内核缓冲区的数据拷贝到用户缓冲区

  3. CPU将用户缓冲区的数据拷贝到socket缓冲区
  4. DMA将socket缓冲区的数据拷贝到网卡

使用缓存I/O数据经过了四次拷贝,需要多次在内核空间和用户空间来回切换,影响系统性能。从数据拷贝的过程可以看到有些步骤其实是多余的,比如第二步,如果可以直接将内核缓存区的数据拷贝到socket缓冲区,或者直接将内核缓冲区的数据拷贝到网卡,岂不是减少了数据拷贝的次数?零拷贝就是这样一种致力于减少数据拷贝的技术。

Linux中的sendfile()函数可以实现将数据从一个文件描述符传输到另外一个文件描述符。Java NIO中的FileChannel也可以实现将数据从FileChannel直接传输到另一个Channel。

FileChannel

Java NIO中的FileChannel的transferTo方法将数据从FileChannel传输到另一个Channel:

1
2
3
4
5
6
 RandomAccessFile file = new RandomAccessFile(new File("/Users/shanmenglu/test.txt"), "r");
// 获取FileChannel
FileChannel fileChannel = file.getChannel();
long size = fileChannel.size();
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
fileChannel.transferTo(0,size,socketChannel);

使用FileChanel后的数据拷贝:

Linux2.4版本之后通过对Socket Buffer添加一些Descriptor信息可以进一步减少数据的拷贝:

可以看到零拷贝并不是指的数据一次拷贝都没有发生,而是指没有通过CPU进行数据拷贝。Java中的堆外内存DirectByteBuffer和上面提到的mmap内存映射也是零拷贝中的一种,Netty中也提供了零拷贝相关的技术。

堆外内存

传统的缓存I/O,内核缓冲区和用户缓冲区分别占用不同的物理内存,其中用户缓冲区占用的是JVM堆内的内存:

DirectByteBuffer

Java NIO中提供了DirectByteBuffer,可以使用堆外内存,通过ByteBuffer的allocateDirect方法可以分配堆外内存,并返回内存的地址,之后就可以直接操作这块内存,不用将数据在内核缓冲区与用户缓冲区之间拷贝:

1
2
3
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}

堆外内存和直接I/O的区别

直接I/O偏重的是绕过操作系统的Page Cache层,堆外内存偏重的是使用的JVM内存之外的内存,他们的侧重点不同,不过都可以减少数据的拷贝。

关于堆外内存是否在Page Cache中,这个待研究。

参考

【极客时间-倪朋飞】Linux性能优化实战

【极客时间-刘超】趣谈Linux操作系统

【拉勾教育-若地】Netty 核心原理剖析与 RPC 实践

【 Kirito的技术分享】文件IO操作的最佳实践

【小码农叔叔】java使用nio读写文件

【占小狼】深入浅出MappedByteBuffer

【零壹技术栈】深入剖析Linux IO原理和几种零拷贝机制的实现

【tomas家的小拨浪鼓】堆外内存 之 DirectByteBuffer 详解

网络IO和磁盘IO详解

【Redis】quicklist

Posted on 2021-09-20

Redis List

在Redis3.2版之前,Redis使用压缩列表和双向链表作为List的底层实现。当元素个数比较少并且元素长度比较小时,Redis使用压缩列表实现,否则Redis使用双向链表实现。

ziplist存在问题

  1. 不能保存过多的元素,否则查找复杂度高,性能降低。

  2. 由于每个节点保存了前一个节点的长度,不同长度使用的字节数不一样,所以在更新节点的时候有可能引起长度的变化导致连锁更新问题。

为了解决上面两个问题,在Redis3.2版之后,引入了quicklist。

quicklist

quicklist可以理解为是ziplist和链表的结合体,一个quicklist是一个双向链表,链表中的每一个节点是一个ziplist。

quicklist结构定义

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct quicklist {
// 头指针
quicklistNode *head;
// 尾指针
quicklistNode *tail;
unsigned long count; /* 列表中的元素总个数,也就是所有ziplist中包含的元素数量之和 */
unsigned long len; /* 链表中节点的个数 */
int fill : QL_FILL_BITS; /* 表示ziplist的大小 */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
  • head:指向头结点的指针

  • tail:指向尾节点的指针

  • count:列表中的元素总个数,等于所有节点的ziplist中包含的元素数量之和

  • len:quicklist中quicklistNode节点的个数

  • fill:用来限制quicklistNode中ziplist的大小,为正数时代表ziplist中最多能包含的元素个数,为负数时有以下几种情况:

    | 数值 | 含义 |
    | —- | ——————————- |
    | -1 | 表示ziplist的字节数不能超过4KB |
    | -2 | 表示ziplist的字节数不能超过8KB |
    | -3 | 表示ziplist的字节数不能超过16KB |
    | -4 | 表示ziplist的字节数不能超过32KB |
    | -5 | 表示ziplist的字节数不能超过64KB |

​除此之外,也可以通过list-max-ziplist-size参数配置最大的字节数。

quicklistNode结构定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct quicklistNode {
// 前一个节点
struct quicklistNode *prev;
// 下一个节点
struct quicklistNode *next;
// 指向ziplist压缩列表的指针
unsigned char *zl;
unsigned int sz; /* ziplist压缩列表的字节数 */
unsigned int count : 16; /* ziplist压缩列表的元素个数 */
unsigned int encoding : 2; /* 编码格式:RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* 是否被压缩 */
unsigned int attempted_compress : 1; /* 是否可以被压缩 */
unsigned int extra : 10; /* 预留bit位*/
} quicklistNode;

quicklist创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;
// 分配空间
quicklist = zmalloc(sizeof(*quicklist));
// 初始化头尾节点
quicklist->head = quicklist->tail = NULL;
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
// 默认为-2,表示ziplist的字节数最大不能超过8KB
quicklist->fill = -2;
quicklist->bookmark_count = 0;
return quicklist;
}

添加元素

添加元素的时候可以在链表的头部或者尾部进行添加,以头部添加为例:

  1. 首先调用_quicklistNodeAllowInsert方法判断是否允许添加元素到ziplist,如果允许,调用ziplistPush方法进行添加
  2. 如果_quicklistNodeAllowInsert不允许添加元素,则需要新创建一个quicklistNode,然后将元素添加到新创建的quicklistNode的压缩列表中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 从头部添加元素
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
// 判断是否允许添加
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
// 将元素添加到ziplit
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
// 新创建quicklistNode节点
quicklistNode *node = quicklistCreateNode();
// 添加元素
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
// 更新数量
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}

_quicklistNodeAllowInsert

_quicklistNodeAllowInsert方法用于判断是否允许在某个quicklistNode指向的压缩列表中添加元素。

在quicklist的结构体定义中,fill指定了ziplist中能包含的最大元素个数或者ziplist最大的字节数,_quicklistNodeAllowInsert方法就是判断ziplist中的元素个数或者ziplist的字节数是否超过了限制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// node:当前的quicklistNode节点
// fill:ziplist中能包含的最大元素个数或者ziplist最大的字节数
// sz:要添加元素的大小
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;

int ziplist_overhead;
/* 判断要添加元素的大小是否小于254 */
if (sz < 254)
ziplist_overhead = 1;
else
ziplist_overhead = 5;

/* 判断要添加元素的大小是否小于64 */
if (sz < 64)
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;

/* 计算添加元素后的当前的quicklistNode的大小 + 新加入元素的大小 + 插入元素后ziplit的prevlen占用大小 */
unsigned int new_sz = node->sz + sz + ziplist_overhead;
// 判断添加元素后的ziplist的字节数是否超过了fill中设置的大小
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
else if (!sizeMeetsSafetyLimit(new_sz))
return 0;
else if ((int)node->count < fill) // 判断ziplist的元素个数是否超过了fill设置的大小
return 1;
else
return 0;
}

总结

  1. 在Redis3.2版之前,Redis使用压缩列表和双向链表作为List的底层实现。当元素个数比较少并且元素长度比较小时,Redis使用压缩列表实现,否则Redis使用双向链表实现。

  2. 为了解决压缩列表在节点多的时候查找效率低的问题以及连锁更新问题,在Redis3.2版之后引入了quicklist,quicklist是一个双向链表,链表中的每一个节点是一个ziplist。

  3. quicklist中限定了ziplist的大小,如果超过了限制的大小,新加入元素的时候会生成一个新的quicklistNode节点。

  4. quicklist通过限定ziplist的大小来保证一个ziplist中的元素个数不会太多,如果需要连锁更新,也只在某个quicklistNode节点指向的ziplist中更新,不会引发整个链表的更新,以此来解决压缩列表存在的问题。

参考

陈雷《Redis5设计与源码分析》

极客时间 - Redis源码剖析与实战(蒋德钧)

Redis版本:redis-6.2.5

【Redis】skiplist跳跃表

Posted on 2021-09-16

有序集合Sorted Set

zadd

zadd用于向集合中添加元素,添加三门编程语言,分值分别为1、2、3:

1
2
3
4
5
6
127.0.0.1:6379> zadd language 1 java
(integer) 1
127.0.0.1:6379> zadd language 2 c++
(integer) 1
127.0.0.1:6379> zadd language 3 python
(integer) 1

zrange

zrange根据分值区间返回符合条件的数据:

1
2
3
4
5
127.0.0.1:6379> zrange language 1 3 withscores
1) "c++"
2) "2"
3) "python"
4) "3"

zscore

zscore根据key和元素值返回元素的分值

1
2
127.0.0.1:6379> zscore language python
"3"

Sorted Set是Redis中的一种数据结构,它可以用来存储带有分值的元素,并且根据分值进行排序,是一个有序的集合。

Sorted Set的结构定义如下,它包含了一个哈希表dict和一个跳跃表zskiplist,其中哈希表可以在O(1)的时间复杂度内进行元素查找,而跳跃表可以支持高效的范围查询:

1
2
3
4
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;

跳跃表

如果一个有序集合中包含的元素数量比较多或者有序集合中的元素是比较长的字符串时,Redis就会使用跳跃表作为有序集合的底层实现。

跳跃表是一种多层的有序链表,在一个普通的有序链表中如果想要查找某个元素,必须遍历链表,时间复杂度为O(n),那么如何提高查找效率呢,可以使用跳跃表,从列表中抽出一些元素进行分层,比如每隔一个节点就抽出一层:

此时如果需要查找元素为9的节点:

  1. 从第三层开始查找,元素的值为8,因为9大于8并且8之后没有其他的节点所以接下来进入第二层
  2. 进入第二层,8的下一个节点为15,9小于15,所以进入第一层
  3. 进入第一层,获取8的下一个节点,等于要查找的值9,查找结束

结构定义

跳跃表的结构定义

  • header:指向跳跃表中节点的头指针,跳跃表中的节点定义为zskiplistNode,跳跃表实际上也是一个链表,所以会有一个头结点
  • tail:指向跳跃表中节点的尾指针
  • length:跳跃表中节点的数量
  • level:跳跃表的层级
1
2
3
4
5
6
7
8
9
// 跳跃表
typedef struct zskiplist {
// 指向跳跃表的头尾指针
struct zskiplistNode *header, *tail;
// 长度
unsigned long length;
// 层级
int level;
} zskiplist;

节点的结构定义

  • ele:一个sds类型的变量,存储实际的数据
  • score:存储数据的分值,跳跃表就是按照这个分值进行排序的
  • backward:一个指向前一个节点的指针,为了便于从后往前查找
  • zskiplistLevel:一个层级数组,因为跳跃表可以有多层,每一层中都有一个指向当前层级中的下一个节点的指针forward和span跨度,跨度代表了当前层级里面,当前节点与下一个节点直接跨越了几个节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//跳跃表中的节点结构定义
typedef struct zskiplistNode {
// 存储的元素
sds ele;
// 分值
double score;
// 后向指针,指向当前节点的前一个节点
struct zskiplistNode *backward;
// 层级数组
struct zskiplistLevel {
// 指向当前层级中的下一个节点
struct zskiplistNode *forward;
// 跨度
unsigned long span;
} level[];
} zskiplistNode;

跳跃表的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* 创建跳跃表节点*/
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
// 分配内存
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}

/* 创建跳跃表 */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 跳跃表分配内存
zsl = zmalloc(sizeof(*zsl));
// 层级初始化为1
zsl->level = 1;
// 长度为0
zsl->length = 0;
// 创建头结点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 初始化每一层的头结点
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
// 头结点的下一个节点指向NULL
zsl->header->backward = NULL;
// 尾节点
zsl->tail = NULL;
return zsl;
}

跳跃表层数的设置

跳跃表根据什么规则来进行层数划分呢,有以下几种方案:

方案一

每隔一个节点,就取出一个节点作为新的一层的节点,这样每一层上节点的数量大约是下一层节点数的一半,此时类似于二分查找,查找复杂度为O(logn)

优点:查找的时间复杂度降低了

缺点:由于需要维护每个层级的节点数,在节点进行插入或者删除的时候,要调整层级节点,带来额外的开销

方案二

新增加节点的时候,调用随机生成层数方法,随机生成一个当前跳跃表所需要的层数,如果生成的层数等于当前层数,新节点只需要加入跳跃表中即可,不需要额外的维护每一个层级的节点数,Redis中就是使用的随机生成层数的方式维护跳跃表的层级。

随机生成层数方法:

1
2
3
4
5
6
7
8
9
10
11
12
#define ZSKIPLIST_MAXLEVEL 32  // 最大层级不超过32
#define ZSKIPLIST_P 0.25

// 随机生成层数
int zslRandomLevel(void) {
int level = 1;
// 如果生成的随机数的值小于ZSKIPLIST_P,层数就+1
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
// 是否超过了最大层数,超过就使用最大层数
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

0xFFFF = 65535

random()&0xFFFF运算之后会生成一个0和65535之间的数,ZSKIPLIST_P 0xFFFF = 0.25 65535,所以random()&0xFFFF 小于 0.25 * 65535的概率为25%,也就是层数会增加1的概率不超过25%。

跳跃表增加节点

  1. 因为跳跃表有多层,所以需要遍历每一层,寻找每层要插入的位置,update[i]就记录了每一层要插入的位置
  2. 随机生成跳跃表的层数,如果层数有变化,则需要调整跳跃表的层高
  3. 创建节点,并将节点插入到跳跃表中
  4. 设置backward,新插入节点的前一个节点是update[0],如果update[0]为头结点,当前节点的前一个节点设为null,否则backward设置为update[0]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
//获取头结点
x = zsl->header;
/* 寻找每层要插入的位置,从高层开始向下遍历 */
for (i = zsl->level-1; i >= 0; i--) {
// rank[i]记录了当前层从header节点到update[i]节点所经历的步长
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 如果当前层级下一个节点不为空 并且 下一个节点的score小于要插入节点的分值 或者 下一个节点的score等于要插入节点的score并且对比两个节点存储的元素值之后小于0(字符串比较)
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
// 更新rank[i]的值
rank[i] += x->level[i].span;
// 获取下一个节点
x = x->level[i].forward;
}
// 记录每层需要插入的位置
update[i] = x;
}
// 随机生成跳跃表的层数
level = zslRandomLevel();
// 如果大于当前的层数
if (level > zsl->level) {
// 调整层数
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
// 更新层数
zsl->level = level;
}
// 创建节点
x = zslCreateNode(level,score,ele);
// 循环每一层,添加节点
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* 更新跨度 */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 设置当前节点的前一个节点,如果update[0]为头结点,当前节点的前一个节点设为null,否则backward设置为update[0]
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
// 增加长度
zsl->length++;
return x;
}

总结

  1. Sorted Set支持在添加元素的时候为元素添加一个分值,并根据分值排序,是一个有序的集合。
  2. Sorted Set在数据比较少的时候采用ziplist存储,超过阈值后使用哈希表和跳跃表来提高查找效率,其中哈希表用于单值查询,跳跃表用于范围查询。
  3. 跳跃表是一个多层的有序链表,它采用了空间换时间的方式将查找的时间复杂度降到了O(logN)。

参考

黄健宏《Redis设计与实现》

陈雷《Redis5设计与源码分析》

极客时间 - Redis源码剖析与实战(蒋德钧)

【unix21】redis源码分析–zslRandomLevel位运算解析

【有梦想的肥宅】Redis5设计与源码分析读后感(三)跳跃表

Redis版本:redis-6.2.5

【Redis】ziplist压缩列表

Posted on 2021-09-12

压缩列表

压缩列表是列表和哈希表的底层实现之一:

  • 如果一个列表只有少量数据,并且数据类型是整数或者比较短的字符串,redis底层就会使用压缩列表实现。

  • 如果一个哈希表只有少量键值对,并且每个键值对的键和值数据类型是整数或者比较短的字符串,redis底层就会使用压缩列表实现。

Redis压缩列表是由连续的内存块组成的列表,主要包含以下内容:

  • zlbytes:记录压缩列表占用的总的字节数,占用4个字节(32bits)

  • zltail:记录压缩列表的起始位置到最后一个节点的字节数,假如知道压缩列表的起始地址,只需要假设zltail记录的偏移量即可定位到压缩列表中最后一个节点的位置,占用4个字节(32bits)

  • zllen:记录了压缩列表中节点的数量,占用2个字节(16bits)

  • entry:存储数据的节点,可以有多个

  • zlend:标记压缩列表的结尾,值为255,占用1个字节(8bits)

压缩列表的创建

列表在初始化的时候会计算需要分配的内存空间大小,然后进行内存分配,之后将内存空间的最后一个字节标记为列表结尾,内存空间的大小计算方式如下:

  1. 压缩列表头大小,包括zlbytes、zltail和zllen所占用的大小:32 bits * 2 + 16 bits

  2. 压缩列表结尾大小:8bits

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 压缩列表头大小,包括zlbytes、zltail和zllen所占用的大小:32 bits * 2 + 16 bits
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
// 压缩列表结尾大小:8bits
#define ZIPLIST_END_SIZE (sizeof(uint8_t))
// 列表结尾标记
#define ZIP_END 255

unsigned char *ziplistNew(void) {
// 计算需要分配的内存大小
unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
// 分配内存
unsigned char *zl = zmalloc(bytes);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
ZIPLIST_LENGTH(zl) = 0;
// 将内存空间的最后一个字节标记为列表结尾
zl[bytes-1] = ZIP_END;
return zl;
}

所以在创建之后,内存布局如下,此时压缩列表中还没有节点:

之后如果如果需要添加节点,会进行移动,为新节点的插入腾出空间,所以还是占用的连续的空间:

压缩列表节点

压缩列表的节点可以存储字符串或者整数类型的值,为了节省内存,它采用了变长的编码方式,压缩列表的节点的结构定义如下:

1
2
3
4
5
6
7
8
9
typedef struct zlentry {
unsigned int prevrawlensize; /* 前一个节点长度编码所需要的字节数*/
unsigned int prevrawlen; /* 前一个节点的长度(占用的字节数)*/
unsigned int lensize; /* 当前节点长度编码所需要的字节数*/
unsigned int len; /* 当前节点的长度(占用的字节数)*/
unsigned int headersize; /* header的大小,headersize = prevrawlensize + lensize. */
unsigned char encoding; /* 记录了数据的类型和数据长度 */
unsigned char *p; /* 指向数据的指针 */
} zlentry;

prevrawlen:存储前一个节点的长度(占用的字节数),这样如果从后向前遍历,只需要当前节点的起始地址减去长度的偏移量prevrawlen就可以定位到上一个节点的位置,prevrawlen的长度可以是1字节或者5字节:

  • 如果前一项节点的长度小于254字节,那么prevrawlen的长度是1字节。
  • 如果前一项节点的长度大于254字节,那么prevrawlen的长度是5字节,其中第一个字节会被设置为0xFE(十进制254),之后的四个字节用于保存前一个节点的长度。

为什么没有255字节?

因为255用来标记为压缩列表的结尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* 节点编码所需要的字节数 */
unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
if (p == NULL) {
return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(uint32_t) + 1;
} else {
// 判断长度是否小于254
if (len < ZIP_BIG_PREVLEN) {
p[0] = len;
// 使用1个字节
return 1;
} else {
return zipStorePrevEntryLengthLarge(p,len);
}
}
}

// 节点编码所需要的字节数
int zipStorePrevEntryLengthLarge(unsigned char *p, unsigned int len) {
uint32_t u32;
if (p != NULL) {
// 将prevrawlen的第1个字节设置为254
p[0] = ZIP_BIG_PREVLEN;
u32 = len;
memcpy(p+1,&u32,sizeof(u32));
memrev32ifbe(p+1);
}
// 使用5个字节
return 1 + sizeof(uint32_t);
}

encoding:记录了节点的数据类型和内容的长度,因为压缩列表可以存储字符串或者整型,所以有以下两种情况:

  1. 存储内容为字符串

    C语言存储字符串底层使用的是字节数组,当内容为字符串时分为三种情况,encoding分别占用1字节、2字节、5字节,encoding占用字节大小的不同,代表存储不同长度的字节数组。

编码 编码长度 数据类型
00xxxxxx 占用1个字节,也就是8bits 长度小于等于63(2^6 - 1)字节的字节数组
01xxxxxx xxxxxxxx 占用2个字节,也就是16bits 长度小于等于16383(2^14 - 1)字节的字节数组
10xxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx 占用5个字节,40bits 长度小于等于4294967295(2^32 - 1)字节的字节数组
  1. 存储内容为整数

存储内容为整数时,encoding占用1个字节,最高位是11开头,后六位代表整数值的长度,其中当编码为1111xxxx时情况比较特殊,

后四位的值在0001和1101之间,此时直接代表数据的内容,是0到12之间的一个数字,并不是数据长度,因为它代表了数据内容,所以也不需要额外的空间存储数据内容。

编码 编码长度 数据类型
11000000 1个字节 int16_t类型的整数
11010000 1个字节 uint32_t类型的整数
11100000 1个字节 uint64_t类型的整数
11110000 1个字节 24位有符号整数
11111110 1个字节 8位有符号整数
1111xxxx 1个字节 特殊情况,后四位的值在0001和1101之间,此时代表的是数据内容,并不是数据长度

zipStoreEntryEncoding

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 节点编码所需字节数判断
unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
unsigned char len = 1, buf[5];
// 如果是字符串
if (ZIP_IS_STR(encoding)) {
/* 根据字符串的长度判断使用几个字节数 */
if (rawlen <= 0x3f) { // 小于等于63字节
if (!p) return len;
buf[0] = ZIP_STR_06B | rawlen;
} else if (rawlen <= 0x3fff) { // 小于等于16383字节
len += 1; // 使用2个字节
if (!p) return len;
buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
buf[1] = rawlen & 0xff;
} else { // 字符串长度大于16383字节
len += 4; // 使用5个字节
if (!p) return len;
buf[0] = ZIP_STR_32B;
buf[1] = (rawlen >> 24) & 0xff;
buf[2] = (rawlen >> 16) & 0xff;
buf[3] = (rawlen >> 8) & 0xff;
buf[4] = rawlen & 0xff;
}
} else {
// 如果是整数,使用1个字节
if (!p) return len;
buf[0] = encoding;
}

/* 保存长度 */
memcpy(p,buf,len);
return len;
}

节点的插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// 添加节点
// zl:指向压缩列表的指针
// s:数据内容
// slen:数据的长度
// where:在哪个位置添加
// 调用例子:zl = ziplistPush(zl, (unsigned char*)"foo", 3, ZIPLIST_TAIL);
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
unsigned char *p;
// 判断是在头部或者尾部进行添加
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
// 插入节点
return __ziplistInsert(zl,p,s,slen);
}

// 插入节点
// zl:指向压缩列表的指针
// p:添加的位置
// s:数据内容
// slen:数据的长度
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, newlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789;
zlentry tail;

// 判断要添加的位置是否是结尾处
if (p[0] != ZIP_END) {// 如果不是尾部
// 计算前一个节点的长度prevlen
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else { // 如果是在尾部
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
// 计算前一个节点的长度
prevlen = zipRawEntryLengthSafe(zl, curlen, ptail);
}
}
// 判断节点是否可以被Encoding
if (zipTryEncoding(s,slen,&value,&encoding)) {
// 计算将字符串转换为整数后的长度
reqlen = zipIntSize(encoding);
} else {
// 直接使用原始长度
reqlen = slen;
}

// reqlen用来保存当前节点所占用的长度
// 加上前一个节点编码所需要的字节数
reqlen += zipStorePrevEntryLength(NULL,prevlen);
// 加上当前节点编码所需要的字节数
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

/* 这里用于判断节点加入的时候,后面的节点prevrawlen的字节数是否可以满足要插入节点的长度*/
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}
offset = p-zl;
newlen = curlen+reqlen+nextdiff;
// 调整压缩列表的长度
zl = ziplistResize(zl,newlen);
p = zl+offset;
// 如果p不指向链表结尾,说明新加入的节点不是最后一个
if (p[0] != ZIP_END) {
/* 将p指向的节点和它之后的节点向后移动,为新节点腾出空间*/
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

/* 当前节点的长度编码后存储到后一个节点的prevrawlen*/
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);

/* 更新结尾的OFFSET */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
assert(zipEntrySafe(zl, newlen, p+reqlen, &tail, 1));
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
/* 新加入的节点是列表的最后一个节点时 */
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}

/* 这里判断是否需要连锁更新 */
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}

/* 插入节点*/
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
// 修改压缩列表节点的数量
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}

连锁更新

因为压缩列表中每个节点记录了前一个节点的长度:

  • 如果前一项节点的长度小于254字节,那么prevrawlen的长度是1字节。
  • 如果前一项节点的长度大于254字节,那么prevrawlen的长度是5字节,其中第一个字节会被设置为0xFE(十进制154),之后的四个字节用于保存前一个节点的长度。

假设有一种情况,一个压缩列表中,存储了多个长度是253字节的节点,因为节点的长度都在254字节以内,所以每个节点的prevrawlen只需要1个字节去存储长度的值:

此时在列表的头部需要新增加一个节点,并且节点的长度大于254,这个时候原先的头结点entry1 prevrawlen使用1字节已经不能满足当前的情况了,必须要使用5字节存储,因此entry1的prevrawlen变成了5字节,entry1的长度也会跟着增加4个字节,已经超过了254字节,因为大于254就需要使用5个字节存储,所以entry2的prevrawlen也需要改变为5字节,后面的以此类推,引发了连锁更新,这种情况称之为连锁更新:

总结

(1)Redis压缩列表使用了一块连续的内存,来节约内存空间。

(2)压缩列表的节点可以存储字符串或者整数类型的值,它采用了变长的编码方式,根据数据类型的不同以及数据长度的不同,选择不同的编码方式,每种编码占用的字节大小不同,以此来节约内存。

(3)压缩列表的每个节点中存储了前一个节点的字节长度,如果知道某个节点的地址,可以使用地址减去字节长度定位到上一个节点,不过新增节点的时候,由于前一个节点的长度大于254使用5个字节,小于254使用1个字节存储,在一些极端的情况下由于长度的变化会引起连锁更新。

参考

黄健宏《Redis设计与实现》

极客时间 - Redis源码剖析与实战(蒋德钧)

【张铁蕾】Redis内部数据结构详解(4)——ziplist

【_HelloBug】Redis-压缩表-__ziplistInsert详解

图解Redis之数据结构篇——压缩列表

Redis版本:redis-6.2.5

【JAVA】动态代理

Posted on 2021-09-07

动态代理

代理模式包含三个角色:

  1. Subject主题对象:一般是一个接口,定义一些方法。
  2. RealSubject 具体的主题实现对象:实现Subject中定义的方法。
  3. Proxy 代理对象:Proxy中包含一个RealSubject的引用,由代理对象实现RealSubject方法的调用。

Subject

1
2
3
public interface Subject {
void execute();
}

RealSubject

1
2
3
4
5
6
public class RealSubject implements Subject {
@Override
public void execute() {
System.out.println("realsubject方法执行");
}
}

代理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ProxyInvocationHandler implements InvocationHandler {

/**
* 代理的目标对象,也就是RealSubject
*/
private Object target;

/**
* 构造函数
*/
public ProxyInvocationHandler(Object target){
this.target = target;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("开始执行方法:" + method.getName());
// 执行RealSubject中的方法
Object result = method.invoke(target, args);
System.out.println("结束执行方法:" + method.getName());
return null;
}

public Object getProxy() {
// 创建代理对象
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), target.getClass().getInterfaces(), this);
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ProxyTest {
public static void main(String[] args) {
System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles", "true");
// 创建实际的对象
Subject subject = new RealSubject();
// 创建InvocationHandler
ProxyInvocationHandler invocationHandler = new ProxyInvocationHandler(subject);
// 获取代理对象
Subject proxy = (Subject) invocationHandler.getProxy();
// 执行方法
proxy.execute();
}
}

运行结果:

1
2
3
开始执行方法:execute
realsubject方法执行
结束执行方法:execute

动态代理实现原理

在ProxyInvocationHandler中可以看到通过Proxy创建了一个代理对象,那么接下来就进入到Proxy中,看一下是如何创建代理对象的:

1
2
3
// 创建代理对象
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), target.getClass().getInterfaces(), this);

Proxy

在Proxy中newProxyInstance方法创建代理对象的时候,传入了类加载器、需要代理的Subject对象以及InvocationHandler:

  1. 根据类加载器和需要代理的Subject对象生成代理类的class
  2. 根据生成的代理类的class信息,通过构造器创建代理对象,并将InvocationHandler传入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Proxy implements java.io.Serializable {
// 创建代理对象
@CallerSensitive
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h) throws IllegalArgumentException {
Objects.requireNonNull(h);
final Class<?>[] intfs = interfaces.clone();
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
}
// 获取代理类的class
Class<?> cl = getProxyClass0(loader, intfs);
try {
if (sm != null) {
checkNewProxyPermission(Reflection.getCallerClass(), cl);
}
// 获取构造器
final Constructor<?> cons = cl.getConstructor(constructorParams);
// InvocationHandler
final InvocationHandler ih = h;
if (!Modifier.isPublic(cl.getModifiers())) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
cons.setAccessible(true);
return null;
}
});
}
// 通过构造器创建代理对象,并将InvocationHandler传入
return cons.newInstance(new Object[]{h});
} catch (IllegalAccessException|InstantiationException e) {
//...
}
}
}
生成代理类的class

getProxyClass0中首先会进行边界检查,然后根据加载器和需要代理的Subject信息从proxyClassCache缓存中获取生成的代理类的calss

,具体的实现在WeakCache的get方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 代理类的缓存
*/
private static final WeakCache<ClassLoader, Class<?>[], Class<?>>
proxyClassCache = new WeakCache<>(new KeyFactory(), new ProxyClassFactory());

/**
* 生成代理类的class
*/
private static Class<?> getProxyClass0(ClassLoader loader,
Class<?>... interfaces) {
// 边界检查
if (interfaces.length > 65535) {
throw new IllegalArgumentException("interface limit exceeded");
}
// 从proxyClassCache中获取class
return proxyClassCache.get(loader, interfaces);
}
WeakCache

WeakCache的get方法中如果根据缓存key获取对象为空,会创建一个Factory对象赋值给Supplier,Factory是WeakCache的一个内部类,它实现了Supplier接口,然后调用Supplier的get方法来生成代理类的class,接下来进入到Factory的get方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final class WeakCache<K, P, V> {
// 获取class
public V get(K key, P parameter) {
Objects.requireNonNull(parameter);
expungeStaleEntries();
// 获取缓存key
Object cacheKey = CacheKey.valueOf(key, refQueue);
// 根据key获取对象
ConcurrentMap<Object, Supplier<V>> valuesMap = map.get(cacheKey);
// 如果为空
if (valuesMap == null) {
// 创建一个ConcurrentMap
ConcurrentMap<Object, Supplier<V>> oldValuesMap
= map.putIfAbsent(cacheKey,
valuesMap = new ConcurrentHashMap<>());
if (oldValuesMap != null) {
valuesMap = oldValuesMap;
}
}
// 创建subKey
Object subKey = Objects.requireNonNull(subKeyFactory.apply(key, parameter));
Supplier<V> supplier = valuesMap.get(subKey);
Factory factory = null;
while (true) {
if (supplier != null) {
// 调用get方法获取class
V value = supplier.get();
if (value != null) {
return value;
}
}
// 如果为空,创建Factory
if (factory == null) {
factory = new Factory(key, parameter, subKey, valuesMap);
}
// 如果supplier为null
if (supplier == null) {
supplier = valuesMap.putIfAbsent(subKey, factory);
if (supplier == null) {
// 将factory赋值给supplier
supplier = factory;
}
} else {
if (valuesMap.replace(subKey, supplier, factory)) {
supplier = factory;
} else {
// retry with current supplier
supplier = valuesMap.get(subKey);
}
}
}
}
}
Factory

Factory是WeakCache的一个内部类,它实现了Supplier接口,在get方法中,又调用了valueFactory的apply方法创建class,valueFactory是WeakCache的一个成员变量,在WeakCache的构造函数中可以看到传入了valueFactory对象进行初始化,那么接下来就需要回到Proxy类中,看一下如何实例化WeakCache的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
final class WeakCache<K, P, V> {

private final BiFunction<K, P, V> valueFactory;

public WeakCache(BiFunction<K, P, ?> subKeyFactory,
BiFunction<K, P, V> valueFactory) {
this.subKeyFactory = Objects.requireNonNull(subKeyFactory);
// 初始化
this.valueFactory = Objects.requireNonNull(valueFactory);
}

// Factory
private final class Factory implements Supplier<V> {

private final K key;
private final P parameter;
private final Object subKey;
private final ConcurrentMap<Object, Supplier<V>> valuesMap;

Factory(K key, P parameter, Object subKey,
ConcurrentMap<Object, Supplier<V>> valuesMap) {
this.key = key;
this.parameter = parameter;
this.subKey = subKey;
this.valuesMap = valuesMap;
}

@Override
public synchronized V get() {
//
Supplier<V> supplier = valuesMap.get(subKey);
if (supplier != this) {
return null;
}

V value = null;
try {
// 调用valueFactory的apply方法创建class
value = Objects.requireNonNull(valueFactory.apply(key, parameter));
} finally {
if (value == null) { // remove us on failure
valuesMap.remove(subKey, this);
}
}

......

return value;
}
}
}
ProxyClassFactory

Proxy中WeakCache初始化的时候使用的是ProxyClassFactory类型的factory:

1
2
3
4
5
6
7
public class Proxy implements java.io.Serializable {  
/**
* WeakCache初始化
*/
private static final WeakCache<ClassLoader, Class<?>[], Class<?>>
proxyClassCache = new WeakCache<>(new KeyFactory(), new ProxyClassFactory());
}

所以调用valueFactory的apply方法的时候会进入到ProxyClassFactory的apply方法,在apply方法中会通过ProxyGenerator动态生成代理类并加载类,然后将实例化的代理类返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static final class ProxyClassFactory
implements BiFunction<ClassLoader, Class<?>[], Class<?>>
{
// 前缀
private static final String proxyClassNamePrefix = "$Proxy";

// next number to use for generation of unique proxy class names
private static final AtomicLong nextUniqueNumber = new AtomicLong();

@Override
public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {

Map<Class<?>, Boolean> interfaceSet = new IdentityHashMap<>(interfaces.length);
......

long num = nextUniqueNumber.getAndIncrement();
String proxyName = proxyPkg + proxyClassNamePrefix + num;

/*
* 生成代理类
*/
byte[] proxyClassFile = ProxyGenerator.generateProxyClass(
proxyName, interfaces, accessFlags);
try {
// 加载代理,并返回对象
return defineClass0(loader, proxyName,
proxyClassFile, 0, proxyClassFile.length);
} catch (ClassFormatError e) {

throw new IllegalArgumentException(e.toString());
}
}
}
ProxyGenerator

ProxyGenerator是Proxy的一个内部类,用于动态生成class:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 private static final class ProxyClassFactory implements BiFunction<ClassLoader, Class<?>[], Class<?>> {  
public static byte[] generateProxyClass(final String var0, Class<?>[] var1, int var2) {
ProxyGenerator var3 = new ProxyGenerator(var0, var1, var2);
// 生成class
final byte[] var4 = var3.generateClassFile();
// 是否保存到文件,如果开启了之后,运行程序之后会在包下面生成class文件
if(saveGeneratedFiles) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
try {
int var1 = var0.lastIndexOf(46);
Path var2;
if(var1 > 0) {
Path var3 = Paths.get(var0.substring(0, var1).replace('.', File.separatorChar), new String[0]);
Files.createDirectories(var3, new FileAttribute[0]);
var2 = var3.resolve(var0.substring(var1 + 1, var0.length()) + ".class");
} else {
var2 = Paths.get(var0 + ".class", new String[0]);
}

Files.write(var2, var4, new OpenOption[0]);
return null;
} catch (IOException var4x) {
throw new InternalError("I/O exception saving generated file: " + var4x);
}
}
});
}
return var4;
}
}
代理类的生成

由于设置了sun.misc.ProxyGenerator.saveGeneratedFiles为true,所以可以在包下面看到生成的代理类$Proxy0:

  1. 它继承了Proxy并实现了Subject,并且在构造函数中需要传入InvocationHandler对象
  2. 当执行$Proxy0中的execute方法时,实际上调用的是InvocationHandler的invoke方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

package com.sun.proxy;

import com.example.demo.bean.Subject;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

// 动态生成了一个$Proxy0类,它继承了Proxy并实现了Subject
public final class $Proxy0 extends Proxy implements Subject {
private static Method m1;
private static Method m2;
private static Method m3;
private static Method m0;
// 传入InvocationHandler对象
public $Proxy0(InvocationHandler var1) throws {
super(var1);
}

public final boolean equals(Object var1) throws {
try {
return ((Boolean)super.h.invoke(this, m1, new Object[]{var1})).booleanValue();
} catch (RuntimeException | Error var3) {
throw var3;
} catch (Throwable var4) {
throw new UndeclaredThrowableException(var4);
}
}

public final String toString() throws {
try {
return (String)super.h.invoke(this, m2, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}

// Subject的execute方法
public final void execute() throws {
try {
// 调用了InvocationHandler的invoke方法
super.h.invoke(this, m3, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}

public final int hashCode() throws {
try {
return ((Integer)super.h.invoke(this, m0, (Object[])null)).intValue();
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}

static {
try {
m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[]{Class.forName("java.lang.Object")});
m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
m3 = Class.forName("com.example.demo.bean.Subject").getMethod("execute", new Class[0]);
m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
} catch (NoSuchMethodException var2) {
throw new NoSuchMethodError(var2.getMessage());
} catch (ClassNotFoundException var3) {
throw new NoClassDefFoundError(var3.getMessage());
}
}
}

总结

JDK的动态代理实现原理是在运行中动态生成代理类,这个代理类实现了Subject接口,在对代理类进行实例化的时候,需要传入InvocationHandler,当调用代理类的方法时,会执行InvocationHandler的invoke方法,从而完成代理功能。

参考

【拉勾教育】Dubbo源码解读与实战-代理模式与常见实现

【Dubbo】Dubbo SPI机制

Posted on 2021-09-02

Dubbo SPI

在Dubbo的源码中,可以看到很多地方使用了ExtensionLoader来获取具体的扩展类,以Protocol为例,Protocol是一个接口,它可以有多种协议,那么具体选择哪一种协议呢,就是通过ExtensionLoader的getExtensionLoader方法获取Protocol对应的ExtensionLoader对象,然后调用其getAdaptiveExtension方法获取具体的实现类的,它可以根据配置选择不同的协议:

1
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

可以看到Protocol使用到了@SPI注解,默认使用dubbo协议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SPI("dubbo")
public interface Protocol {

int getDefaultPort();

@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

void destroy();

default List<ProtocolServer> getServers() {
return Collections.emptyList();
}

}

在META-INF/dubbo下面可以看到dubbo对应的类为org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol:

1
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

因为META-INF下dubbo对应的扩展类为DubboProtocol,所以通过ExtensionLoader获取Protocol具体的实现类时,如果没有指定协议,将默认使用DubboProtocol。

Dubbo还使用了自适应扩展机制,也就是@Adaptive,它可以加载类上,也可以加在某个方法上,加在某个方法中时,会动态生成字节码创建自适应对象Protocol$Adaptive,Protocol$Adaptive中会根据url中的协议选择不同的实现类。接下来就进入ExtensionLoader中看一下SPI机制的实现原理。

ExtensionLoader

getExtensionLoader方法

EXTENSION_LOADERS:缓存每个class对象对应的ExtensionLoader,dubbo会为每个class都创建一个ExtensionLoader对象。

getExtensionLoader方法用来根据class对象获取对应的ExtensionLoader,就是从EXTENSION_LOADERS获取的,如果为空,会创建一个ExtensionLoader对象并放入EXTENSION_LOADERS中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ExtensionLoader<T> {

// 一个map,key为class类型,value为对应的ExtensionLoader对象
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);

// 根据class类型获取ExtensionLoader
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// 创建一个ExtensionLoader对象并放入EXTENSION_LOADERS中
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
// 返回ExtensionLoader对象
return loader;
}
}

ExtensionLoader中还有一个getExtension方法,它可以根据名称来获取对应的实现类,比如自定义了一个Protocol的实现类MyDubboProtocol:

1
2
3
public class MyDubboProtocol implements Protocol {

}

在META-INF/dubbo下面配置一个自定义的协议,key为mydubbo,value为MyDubboProtocol全限定类名:

1
mydubbo=org.apache.dubbo.rpc.protocol.dubbo.MyDubboProtocol

接下来通过ExtensionLoader的getExtension方法中传入mydubbo就可以获取到MyDubboProtocol:

1
2
// 此时获取的Protocol类型就是MyDubboProtocol
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("mydubbo");

getExtension方法

getExtension方法,会根据传入的名称获取对应的实例化对象,如果对象为空,会调用createExtension方法创建扩展对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 根据名称获取对象
@SuppressWarnings("unchecked")
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
// 获取实例对象
Object instance = holder.get();
// 如果对象为空
if (instance == null) {
synchronized (holder) {
// 再次获取判断是否为空
instance = holder.get();
if (instance == null) {
// 创建createExtension对象
instance = createExtension(name);
// 放入holder
holder.set(instance);
}
}
}
return (T) instance;
}

createExtension

createExtension方法用于创建扩展对象:

  1. 它会调用getExtensionClasses获取所有的class
  2. 根据传入的名称name获取class对象
  3. 根据第2步获取的class对象,从EXTENSION_INSTANCES(缓存了每个class对应的实例化对象)中获取class对应的实例化对象,如果为空,通过newInstance实例化一个对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 缓存每个class对象对应的实例化对象
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(64);

// 创建对象
private T createExtension(String name) {
// 调用getExtensionClasses,根据名称获取对应的class
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
// 根据class获取对应的实例
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 如果为空,创建一个对象
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 注入
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
// 初始化
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}

getExtensionClasses

getExtensionClasses用于获取所有的扩展类的class信息,如果为空调用loadExtensionClasses方法加载所有的class信息,并放入一个map中,将map封装为一个Holer对象,也就是cachedClasse:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

// 获取所有的class
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 调用loadExtensionClasses加载所有的class
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

比如dubbo协议,key为dubbo,value为org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol,在getExtensionClasses方法中就会将这些信息放入到cachedClasses中:

1
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

loadExtensionClasses

loadExtensionClasses加载所有的扩展类的class信息,它会遍历所有的加载策略,从对应的目录下加载class信息:

1
2
3
4
5
6
7
8
9
10
11
12
// 加载class
private Map<String, Class<?>> loadExtensionClasses() {
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
// 遍历所有的加载策略
for (LoadingStrategy strategy : strategies) {
// 从目录下加载class信息
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
LoadingStrategy
1
2
3
4
5
6
7
8
// loadLoadingStrategies获取所有的加载策略
private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();
// 获取所有的加载策略
private static LoadingStrategy[] loadLoadingStrategies() {
return stream(load(LoadingStrategy.class).spliterator(), false)
.sorted()
.toArray(LoadingStrategy[]::new);
}

LoadingStrategy是一个接口,它有四个实现类:

以DubboLoadingStrategy为例,可以看到directory的值为”META-INF/dubbo/“,所以loadExtensionClasses中会加载META-INF/dubbo/下的所有配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DubboLoadingStrategy implements LoadingStrategy {

@Override
public String directory() {
return "META-INF/dubbo/";
}

@Override
public boolean overridden() {
return true;
}

@Override
public int getPriority() {
return NORMAL_PRIORITY;
}
}

Adaptive自适应扩展

在最开始,获取Protocol的时候调用的是getAdaptiveExtension方法,并且Protocol的export和refer上添加了@Adaptive注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

@SPI("dubbo")
public interface Protocol {

int getDefaultPort();

@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

void destroy();

default List<ProtocolServer> getServers() {
return Collections.emptyList();
}

}

getAdaptiveExtension

getAdaptiveExtension方法用来获取自适应扩展对象,如果为空,将会调用createAdaptiveExtension创建扩展对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final Holder<Object> cachedAdaptiveInstance = new Holder<>();

@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
// 从缓存中获取
Object instance = cachedAdaptiveInstance.get();
// 如果为空
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
// 加锁
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建自适应Extension对象
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}

return (T) instance;
}

createAdaptiveExtension

createAdaptiveExtension用于创建自适应扩展对象,它会通过AdaptiveClassCodeGenerator动态生成字节码,创建代理对象XXX$Adaptive:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 private T createAdaptiveExtension() {
try {
// 创建自适应Extension对象
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
// 获取自适应Extension class
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// 创建自适应Extension Class
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

private Class<?> createAdaptiveExtensionClass() {
// 创建AdaptiveClassCodeGenerator自适应代码生成对象,动态生成字节码
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

以Protocol为例,会动态生成Protocol$Adaptive类,在Protocol$Adaptive中实现了Protocol接口中的方法,以export为例,它根据url中的协议获取扩展名,再根据名称调用上面提到的getExtension方法获取具体的实现类,所以在这里才会生成DubboProtocol,之后调用DubboProtocol的export方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
// 根据url中的协议获取扩展名
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
// 根据扩展名获取扩展类
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}

public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}

public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}

public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}

public java.util.List getServers() {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
}

参考

Dubbo - Dubbo的SPI机制

【拉勾教育】Dubbo源码解读与实战

dubbo版本:2.7.7

123…6

shan

53 posts
12 tags
© 2022 shan
Powered by Hexo
|
Theme — NexT.Muse v5.1.4