Bootstrap

分布式事务解决方案Seata源码解析

Seata源码整体结构

- seata
	+ all // seata所用到的依赖库
	+ bom // seata依赖库的版本管理
	+ common // Executor、Loader、Thread、Compress、NetUtil等公用功能
	+ compressor // 7z、bzip2、gzip、lz4、zip等压缩算法的封装
	+ config // 配置中心,目前支持Apollo、Consul、Etcd3、Nacos、SpringCloud、Zookeeper等
	+ core // Compressor、Event、Lock、Model、Protocol、RPC、Serializer、Store等核心骨架功能
	+ discovery // 服务发现,目前支持Consul、Etcd3、Eureka、Nacos、Redis、Sofa、Zookeeper等
	+ distribution // 打包、发布Seata
	+ integration // 支持Dubbo、gRPC、HTTP、Motan、Sofa等拦截器,从而获取XID
	+ metrics // 监控等相关功能
	+ rm // Resource Manager等相关功能
	+ rm-datasource // Resource Manager的数据源,包含Undo Log等
	+ saga // 支持Saga模式等相关功能
	+ script // Seata服务初始化、启动等脚本,还包含DB Schema
	+ seata-spring-boot-starter // Spring Boot Starter
	+ serializer // 序列化、反序列化组件,目前支持FST、Hessian、Kryo、Protobuf等
	+ server // Transaction Coordinator等相关功能
	+ spring // 提供@GlobalLock、@GlobalTransactional等注解
	+ sqlparser // 解析SQL等相关功能,目前只支持使用Druid
	+ style // 代码风格检查的配置
	+ tcc // 支持TCC模式等相关功能
	+ test // 测试用例
	+ tm // Transaction Manager等相关功能

接下来分别看一下各个核心组件的实现。

Seata Server(Transaction Coordinator)

核心类说明

TC、RM跟TM之间通信协议的类图如下所示:

可以看出,消息主要有两大类:

  • :RM跟TM往TC发的消息:开始全局事务请求:注册分支事务请求:分支事务状态报告请求:全局锁请求:获取全局事务状态请求:全局事务回滚请求:全局事务提交请求:Saga模式下,TM上报全局事务状态

  • :TC往RM发的消息:分支事务回滚请求:分支事务提交请求:删除Undo日志请求

Seata Server的核心类图:

:定义事务提交、回滚等操作接口

:实现事务提交、回滚等操作的抽象类

:AT模式下的事务核心操作类

:Saga模式下的事务核心操作类

:TCC模式下的事务核心操作类

:XA模式下的事务核心操作类

:默认的TC事务操作实现

:定义了TC处理请求的接口

:处理TC请求的抽象类

:Transaction Coordinator的默认实现

Session跟Store的核心类图:

:实现该接口的类能够被锁定,用于实现隔离性

:实现该接口的类能够被存储

:事务生命周期接口

:代表分支事务,在Seata中,Session代表一个事务

:代表全局事务,在Seata中,Session代表一个事务

:事务生命周期监听器

:事务管理器,支持 , 等功能

:抽象的事务管理器,实现了

:基于数据库存储事务信息的事务管理器

:基于文件存储事务信息的事务管理器

:基于Redis存储事务信息的事务管理器

:定义存储事务信息的相关接口

:存储事务信息抽象类

:基于数据库存储事务信息的事务管理器

:基于文件存储事务信息的事务管理器

:基于Redis存储事务信息的事务管理器

全局事务状态转换图

在全局事务的第一阶段,全局事务的状态是Begin;在第二阶段全局事务的状态可能是Committing、Rollbacking跟TimeoutRollbacking等;最后,全局事务结束时的状态可能是Committed、Rollbacked等。

分支事务状态转换图

分支事务在第一阶段的状态是Registered,进入第二阶段时的状态是PhaseOne_Done,最终结束时的状态可能是PhaseTwo_committed。

Seata Server存储事务信息的数据结构

Seata Server支持三种方式存储事务信息:File、Redis跟Database,我们通过Database的Schema来看看存储了哪些信息:

-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

可以看出,Transaction Coordinator主要在DB里存储了全局事务、分支事务跟锁等相关的信息。

Seata Server定时任务

:重试回滚事务超时的定时任务

:重试提交事务的定时任务

:异步提交事务的定时任务

:事务超时检查定时任务,如果事务超时了,那就执行回滚操作

:删除RM上的Undo Log的定时任务

事务Id生成方式

事务Id生成方式:

的生成方式有点类似Snowflake。

Transaction Manager

:定义全局事务跟操作全局事务的管理接口

:默认的全局事务操作管理类

:定义全局事务的接口

:默认的全局事务类

:全局事务的执行逻辑

:标注类的代理类,使用AOP实现了动态代理,做到了对业务无侵入

:Transaction Manager的客户端RPC请求类

:代表Transaction Manager客户端

Resource Manager

Resource Manager核心类的类结构图:

:定义了 , 等相关的接口

:默认的 实现

:Resource Manager相关RPC请求的处理接口

:抽象的Resource Manager相关RPC请求的处理接口,实现了 、 等公用接口

:Resource Manager的RPC客户端

:代表Resource Manager客户端

:封装了在获取到全局锁后执行业务逻辑的接口

:标注类的代理类,使用AOP实现了动态代理,做到了对业务无侵入

:抽象资源管理类

:资源管理类

Resource Manager Datasource核心类的类结构图:

:JDBC中定义的数据源,是对数据库跟相关操作的抽象

:JDBC中定义的连接,代表对数据库连接的抽象

:JDBC中定义的对数据库的一个操作

:对 的一个抽象实现

:Seata中的数据源代理类,通过该代理类解析用户SQL,并生成Undo Log,然后会在同一个事务中存进数据库

:Undo Log相关的管理类

:Undo Log存MySQL相关的类

: 的实现类

: 上下文,主要保存了 , 等相关信息

: 的实现类

:执行SQL操作相关的帮助类

:执行事务相关的类

:执行DELETE SQL相关的类,会生成相应的 跟

:执行UPDATE SQL相关的类,会生成相应的 跟

AT模式的DB Schema:

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

Saga模式的DB Schema:

CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
    `id`               VARCHAR(32)  NOT NULL COMMENT 'id',
    `name`             VARCHAR(128) NOT NULL COMMENT 'name',
    `tenant_id`        VARCHAR(32)  NOT NULL COMMENT 'tenant id',
    `app_name`         VARCHAR(32)  NOT NULL COMMENT 'application name',
    `type`             VARCHAR(20)  COMMENT 'state language type',
    `comment_`         VARCHAR(255) COMMENT 'comment',
    `ver`              VARCHAR(16)  NOT NULL COMMENT 'version',
    `gmt_create`       DATETIME(3)  NOT NULL COMMENT 'create time',
    `status`           VARCHAR(2)   NOT NULL COMMENT 'status(AC:active|IN:inactive)',
    `content`          TEXT COMMENT 'content',
    `recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
    `id`                  VARCHAR(128)            NOT NULL COMMENT 'id',
    `machine_id`          VARCHAR(32)             NOT NULL COMMENT 'state machine definition id',
    `tenant_id`           VARCHAR(32)             NOT NULL COMMENT 'tenant id',
    `parent_id`           VARCHAR(128) COMMENT 'parent id',
    `gmt_started`         DATETIME(3)             NOT NULL COMMENT 'start time',
    `business_key`        VARCHAR(48) COMMENT 'business key',
    `start_params`        TEXT COMMENT 'start parameters',
    `gmt_end`             DATETIME(3) COMMENT 'end time',
    `excep`               BLOB COMMENT 'exception',
    `end_params`          TEXT COMMENT 'end parameters',
    `status`              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `is_running`          TINYINT(1) COMMENT 'is running(0 no|1 yes)',
    `gmt_updated`         DATETIME(3) NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
    `id`                       VARCHAR(48)  NOT NULL COMMENT 'id',
    `machine_inst_id`          VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
    `name`                     VARCHAR(128) NOT NULL COMMENT 'state name',
    `type`                     VARCHAR(20)  COMMENT 'state type',
    `service_name`             VARCHAR(128) COMMENT 'service name',
    `service_method`           VARCHAR(128) COMMENT 'method name',
    `service_type`             VARCHAR(16) COMMENT 'service type',
    `business_key`             VARCHAR(48) COMMENT 'business key',
    `state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
    `state_id_retried_for`     VARCHAR(50) COMMENT 'state retried for',
    `gmt_started`              DATETIME(3)  NOT NULL COMMENT 'start time',
    `is_for_update`            TINYINT(1) COMMENT 'is service for update',
    `input_params`             TEXT COMMENT 'input parameters',
    `output_params`            TEXT COMMENT 'output parameters',
    `status`                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `excep`                    BLOB COMMENT 'exception',
    `gmt_end`                  DATETIME(3) COMMENT 'end time',
    PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

AT模式时序图

可以看出,AT模式下的分布式事务本质上还是一个两阶段提交。在第一阶段,分支事务会先往TC注册分支事务,TC会先检查要操作的数据是否被加锁,如果没加锁的话RM就会在同一个本地事务中提交对数据的修改跟Undo Log,最后上报分支事务状态给TC。在第二阶段,TM提交事务后,TC会依次往各个RM请求删除Undo Log,在所有的分支事务都提交完后整个分布式事务就成功了。

参考