Bootstrap

Greenplum 内核源码分析 - 分布式事务 (四)

目录

  • 前言

  • insert命令

前言

上一篇文章我们介绍了begin命令和初始化工作。begin命令的主要工作是初始化,创建Gang,把master node上面的状态机走一走。然后begin命令也会发送到segment上面,segment上面的实例也开始初始化,准备接收新的命令。

后面文中的术语:

QD---> master node

QE--->segment node

QE被分为QE writer gang和QE reader gang两种,这是在创建Gang的时候就确定了的。writer gang和相关的reader gang属于一个segMates组,一个组里面有一个writer 和 一个或多个reader。自然,writer gang负责的工作最全面,QE上面数据库事务的功能基本都是writer gang完成的。

本篇文章我们介绍insert命令,也就是在begin以后,客户端执行"insert into t2 values (1,11); "。

请大家注意,这个时候还没有开始做两阶段提交。只是master node发信息,更新状态,segment node接收信息,更新状态。

对应的PostgreSQL的函数逻辑再贴在这里

    /   StartTransactionCommand;
3) /    ProcessQuery;                   << INSERT ...
   \    CommitTransactionCommand;
    \       CommandCounterIncrement;

作者对 Greenplum 源码的分析会使用,读者可以去 github 上面自行获取。

insert命令

结合这个图,我们简单描述一下执行的过程。

在begin以后,客户端执行"insert into t2 values (1,10);" psql 给QD发送信息,ReadCommand返回 firstchar为'Q'的query。 QD的PortalRun进入ExecutorStart函数,dtmPreCommand("ExecutorStart")没有向QE writer发送信息,只是在QD上修改了状态机的状态。

QD在ExecutorStart函数里面调用 CdbDispatchPlan 把执行计划发送到各个QE writer里面,这些QE writer都是在begin命令的时候启动的,启动以后都在监听来自QD上的libpq客户端发出来的信息。 QD发送完执行计划以后,就进入ExecutorEnd函数,等待QE writer发送结果回来。具体的等待函数是 CdbCheckDispatchResult。

在QE writer里,ReadCommand得到 firstchar 为'M'的信息,按照这个信息,开始进行ExecutorRun函数进行操作。这个函数会调用 ExecutePlan函数,这个就是开始执行执行计划的具体地方。

在ExecInsert 被执行后,有两个分支,一个是写数据本身,函数是

heap_insert-->RelationPutHeapTuple-->PageAddItem;另外一个分支是 XLogInsert,也就是在写WAL log。这两个函数被调用完后,就会按照正常的流程返回ready for query message 给QD,所有的QE writer都会做这样的操作,QD也会收到所有的QE writer返回的ready for query message。

当QD返回ready for query message给psql 客户端以后,整个过程就结束了。

QD上面的函数栈信息

Breakpoint 1, cdbdisp_dispatchX (pQueryParms=0x16e11e0, cancelOnError=1 '\001', sliceTbl=0x16dccb8, ds=0x16dcba8) at cdbdisp_query.c:1175
1175		SliceVec *sliceVector = NULL;
(gdb) bt
#0  cdbdisp_dispatchX (pQueryParms=0x16e11e0, cancelOnError=1 '\001', sliceTbl=0x16dccb8, ds=0x16dcba8) at cdbdisp_query.c:1175
#1  0x0000000000ad344a in CdbDispatchPlan (queryDesc=0x16dc518, planRequiresTxn=1 '\001', cancelOnError=1 '\001', ds=0x16dcba8)
    at cdbdisp_query.c:287
#2  0x00000000006f1170 in ExecutorStart (queryDesc=0x16dc518, eflags=0) at execMain.c:686
#3  0x00000000008f0d5f in ProcessQuery (portal=0x15782a8, stmt=0x16c7e50, params=0x0, dest=0x16c7a98, completionTag=0x7ffc82337180 "")
    at pquery.c:291
#4  0x00000000008f2ba9 in PortalRunMulti (portal=0x15782a8, isTopLevel=1 '\001', dest=0x16c7a98, altdest=0x16c7a98,
    completionTag=0x7ffc82337180 "") at pquery.c:1467
#5  0x00000000008f20dc in PortalRun (portal=0x15782a8, count=9223372036854775807, isTopLevel=1 '\001', dest=0x16c7a98, altdest=0x16c7a98,
    completionTag=0x7ffc82337180 "") at pquery.c:1029
#6  0x00000000008ea459 in exec_simple_query (query_string=0x160fc18 "insert into t2 values (10,11);", seqServerHost=0x0, seqServerPort=-1)
    at postgres.c:1776
#7  0x00000000008eef4f in PostgresMain (argc=1, argv=0x1572f40, dbname=0x1572d78 "postgres", username=0x1572d38 "gpadmin") at postgres.c:4975
#8  0x00000000008822e5 in BackendRun (port=0x15834a0) at postmaster.c:6732
#9  0x0000000000881971 in BackendStartup (port=0x15834a0) at postmaster.c:6406
#10 0x000000000087a7e1 in ServerLoop () at postmaster.c:2444
#11 0x00000000008790ea in PostmasterMain (argc=15, argv=0x154a0a0) at postmaster.c:1528
#12 0x0000000000791ba9 in main (argc=15, argv=0x154a0a0) at main.c:206

QE上面的两个函数栈

(gdb) c
Continuing.

Breakpoint 2, XLogInsert (rmid=10 '\n', info=0 '\000', rdata=0x7ffc3a2ac3b0) at xlog.c:778
778		return XLogInsert_Internal(rmid, info, rdata, GetCurrentTransactionIdIfAny());
(gdb) bt
#0  XLogInsert (rmid=10 '\n', info=0 '\000', rdata=0x7ffc3a2ac3b0) at xlog.c:778
#1  0x00000000004c2bcb in heap_insert (relation=0x7f815d3a9788, tup=0x2ce4f18, cid=1, use_wal=1 '\001', use_fsm=1 '\001', xid=1257)
    at heapam.c:2432
#2  0x00000000006f5f68 in ExecInsert (slot=0x2ce4448, dest=0x2bf16d0, estate=0x2ce36f8, planGen=PLANGEN_PLANNER, isUpdate=0 '\000')
    at execMain.c:3372
#3  0x00000000006f56e7 in ExecutePlan (estate=0x2ce36f8, planstate=0x2ce3dc0, operation=CMD_INSERT, numberTuples=0,
    direction=ForwardScanDirection, dest=0x2bf16d0) at execMain.c:3092
#4  0x00000000006f17d1 in ExecutorRun (queryDesc=0x2bf5b88, direction=ForwardScanDirection, count=0) at execMain.c:912
#5  0x00000000008f0d75 in ProcessQuery (portal=0x2bf2a38, stmt=0x2be2cf0, params=0x0, dest=0x2bf16d0, completionTag=0x7ffc3a2acb00 "")
    at pquery.c:296
#6  0x00000000008f2ba9 in PortalRunMulti (portal=0x2bf2a38, isTopLevel=1 '\001', dest=0x2bf16d0, altdest=0x2bf16d0,
    completionTag=0x7ffc3a2acb00 "") at pquery.c:1467
#7  0x00000000008f20dc in PortalRun (portal=0x2bf2a38, count=9223372036854775807, isTopLevel=1 '\001', dest=0x2bf16d0, altdest=0x2bf16d0,
    completionTag=0x7ffc3a2acb00 "") at pquery.c:1029
#8  0x00000000008e9729 in exec_mpp_query (query_string=0x2be0f36 "insert into t2 values (10,11);", serializedQuerytree=0x0,
    serializedQuerytreelen=0, serializedPlantree=0x2be0f55 "\205\001", serializedPlantreelen=162, serializedParams=0x0, serializedParamslen=0,
    serializedQueryDispatchDesc=0x2be0ff7 "s", serializedQueryDispatchDesclen=72, seqServerHost=0x2be103f "127.0.0.1", seqServerPort=17722,
    localSlice=0) at postgres.c:1349
#9  0x00000000008ef6da in PostgresMain (argc=1, argv=0x2be8248, dbname=0x2be81a8 "postgres", username=0x2be8168 "gpadmin") at postgres.c:5159
#10 0x00000000008822e5 in BackendRun (port=0x2bf78f0) at postmaster.c:6732
#11 0x0000000000881971 in BackendStartup (port=0x2bf78f0) at postmaster.c:6406
#12 0x000000000087a7e1 in ServerLoop () at postmaster.c:2444
#13 0x00000000008790ea in PostmasterMain (argc=12, argv=0x2bbe5c0) at postmaster.c:1528
#14 0x0000000000791ba9 in main (argc=12, argv=0x2bbe5c0) at main.c:206

(gdb) b CommandCounterIncrement
Breakpoint 1 at 0x4fc262: file xact.c, line 894.
(gdb) c
Continuing.

Breakpoint 1, CommandCounterIncrement () at xact.c:894
894		if (currentCommandIdUsed)
(gdb) bt
#0  CommandCounterIncrement () at xact.c:894
#1  0x00000000004ffd9f in CommitTransactionCommand () at xact.c:3611
#2  0x00000000008ecd5c in finish_xact_command () at postgres.c:3228
#3  0x00000000008ea4a7 in exec_simple_query (query_string=0x160fc18 "insert into t2 values (10,11);", seqServerHost=0x0, seqServerPort=-1)
    at postgres.c:1807
#4  0x00000000008eef4f in PostgresMain (argc=1, argv=0x1572f40, dbname=0x1572d78 "postgres", username=0x1572d38 "gpadmin") at postgres.c:4975
#5  0x00000000008822e5 in BackendRun (port=0x15834a0) at postmaster.c:6732
#6  0x0000000000881971 in BackendStartup (port=0x15834a0) at postmaster.c:6406
#7  0x000000000087a7e1 in ServerLoop () at postmaster.c:2444
#8  0x00000000008790ea in PostmasterMain (argc=15, argv=0x154a0a0) at postmaster.c:1528
#9  0x0000000000791ba9 in main (argc=15, argv=0x154a0a0) at main.c:206
(gdb)

结合上面几个函数栈信息,我们把PostgreSQL 源代码上面的经典例子和解释再次贴出来,然后和Greenplum的执行流程做对比。

For example, consider the following sequence of user commands:

1)              BEGIN
2)              SELECT * FROM foo
3)              INSERT INTO foo VALUES (...)
4)              COMMIT

In the main processing loop, this results in the following function call
sequence:

         /      StartTransactionCommand;
        /               StartTransaction;
1) <            ProcessUtility;                         << BEGIN
        \               BeginTransactionBlock;
         \      CommitTransactionCommand;

        /       StartTransactionCommand;
2) /            ProcessQuery;                           << SELECT ...
   \            CommitTransactionCommand;
        \               CommandCounterIncrement;

        /       StartTransactionCommand;
3) /            ProcessQuery;                           << INSERT ...
   \            CommitTransactionCommand;
        \               CommandCounterIncrement;

         /      StartTransactionCommand;
        /       ProcessUtility;                         << COMMIT
4) <                    EndTransactionBlock;
        \       CommitTransactionCommand;
         \              CommitTransaction;

我们发现,QD上面做了所有的步骤和PostgreSQL的事务过程是一致的。有三个函数 ExecutorStart,ExecutorRun 和 ExecutorEnd。这三个函数,不管是在QD上还是在QE上,都会执行一遍。读者可以自行加断点进行调试和理解。

总结,本篇文章讲述了begin命令以后,insert命令在master node和segment node上面的执行过程。

在下一篇文章里面,我们会介绍Greenplum里面的状态机。状态机对于分布式事务非常重要,如果我们在分布式事务或者两阶段提交的过程中遇到异常,那么就要根据状态机的状态进行纠错,不同的状态下的纠错路径是不同的。

相关阅读: