Bootstrap

Greenplum内核源码分析-分布式事务(三)

目录

  • 前言

  • 初始化和begin命令

前言

这一篇我们会正式开始介绍Greenplum分布式事务的源码。涉及到一些基本的概念,比如Gang,Slice,读者可以参考相关文档。

作者对Greenplum源码的分析会使用,读者可以去github上面自行获取。

初始化和begin命令

这一篇讲的内容对应sql命令就是begin关键字。

$ psql -d gpadmin
psql (8.3.23)
Type "help" for help.

gpadmin=# select version();

                  version

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
--------------------------------------------
 PostgreSQL 8.3.23 (Greenplum Database 5.0.0 build dev) on x86_64-pc-linux-gnu,
compiled by GCC gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-44), 64-bit compiled on
Sep 23 2021 20:14:48 (with assert checking)
(1 row)

gpadmin=# begin;
BEGIN
gpadmin=# commit;
COMMIT

上一篇介绍了PostgreSQL本地事务的一个简单例子,我们先贴在这里。

     /  StartTransactionCommand;
    /       StartTransaction;
1) <    ProcessUtility;                 << BEGIN
    \       BeginTransactionBlock;
     \  CommitTransactionCommand;

每一个SQL命令包括这里的Begin,都被 StartTransactionCommand 和CommitTransactionCommand 包裹起来,里面还有StartTransaction 和 BeginTransactionBlock 函数调用。在Greenplum里面,这几个函数的执行过程中,会被加上其他的和分布式相关的代码。

关于几个参与者之间的通信关系,因为只有begin命令,所以也都只用了libpq来做通信。如果后面有复杂的Query,会使用Greenplum自研的Interconnect机制做数据交互。

结合这个图我们简单描述一遍通信的过程。

首先,用"psql -d gpadmin" 连接master node,这个命令会先连接 postgres主进程,然后会fork出一个子进程出来,这个也是PostgreSQL单机版的常规动作。然后从客户端执行 "begin;" 命令。这个命令是通过libpq的协议发送到master node的。

case 'Q':			/* simple query */

在master node上,被新fork出来的进程里面,会一直在"ReadCommand" 这个函数这里等待命令。

/*
 * (3) read a command (loop blocks here)
 */
firstchar = ReadCommand(&input_message);

/* ----------------
 *              ReadCommand reads a command from either the frontend or
 *              standard input, places it in inBuf, and returns the
 *              message type code (first byte of the message).
 *              EOF is returned if end of file.
 * ----------------
 */
static int
ReadCommand(StringInfo inBuf)
{
        int                     result;

        SIMPLE_FAULT_INJECTOR(BeforeReadCommand);

        if (whereToSendOutput == DestRemote)
                result = SocketBackend(inBuf);
        else
                result = InteractiveBackend(inBuf);
        return result;
}

ReadCommand是从SocketBackend 读入数据的。读到以后,发现"firstchar == 'Q'",所以这是一个libpq的simple query,经过query statement的parsing工作,发现是一个单独的begin命令,就在本地执行了 BeginTransactionBlock。

这之后,开始调用 sendDtxExplicitBegin,开始做分布式的工作。首先会到这个函数

dtmPreCommand("sendDtxExplicitBegin", "(none)", NULL,
                        /* is two-phase */ true, /* withSnapshot */ true, /* inCursor */ false );

这个函数是用来mark目前的分布式事务是否要使用两阶段提交协议的,也就是修改 currentGxact->state的状态。

begin命令是不需要的,所以简单的返回了。 但是,后面的commit 是需要的。这个函数在以前的Greenplum版本里面以前叫做 setCurrentDtxTwoPhase,后来加了一些其他功能在里面,然后改名了。

接着后面,到了dispatchDtxCommand函数,再到cdbdisp_dispatchCommandInternal,然后又调用了

dtmPreCommand("cdbdisp_dispatchCommandOrSerializedQuerytree", strCommand, NULL, needTwoPhase, withSnapshot, false /* inCursor */);

最后到 AllocateWriterGang,这里面检测到Gang是没有的,然后开始创建gang

writerGang = createGang(GANGTYPE_PRIMARY_WRITER, PRIMARY_WRITER_GANG_ID, nsegdb, -1);

Gang是Greenplum 里面工作在不同segment 上面,但是为了同一个Slice而生成的一组内存资源。看下面这段code 就能大概了解Gang的物理存在形式,这里的Gang是master node上面的Gang。

if (writerGang == NULL)
        {
                int nsegdb = getgpsegmentCount();

                insist_log(IsTransactionOrTransactionBlock(),
                                "cannot allocate segworker group outside of transaction");

                if (GangContext == NULL)
                {
                        GangContext = AllocSetContextCreate(TopMemoryContext,
                                        "Gang Context",
                                        ALLOCSET_DEFAULT_MINSIZE,
                                        ALLOCSET_DEFAULT_INITSIZE,
                                        ALLOCSET_DEFAULT_MAXSIZE);
                }
                Assert(GangContext != NULL);
                oldContext = MemoryContextSwitchTo(GangContext);

                writerGang = createGang(GANGTYPE_PRIMARY_WRITER,
                                PRIMARY_WRITER_GANG_ID, nsegdb, -1);
                writerGang->allocated = true;

                /*
                 * set "whoami" for utility statement.
                 * non-utility statement will overwrite it in function getCdbProcessList.
                 */
                for(i = 0; i < writerGang->size; i++)
                        setQEIdentifier(&writerGang->db_descriptors[i], -1, writerGang->perGangContext);

                MemoryContextSwitchTo(oldContext);
        }

我们接着从createGang开始讲。有个GUC叫做 ,这个GUC能决定是使用多线程的方式去建立master到segment的数据库连接,还是用异步的方式来建立连接。default的数值是0,如果是0就是用异步方式,大概就是用connect做连接,然后poll做socket的异步监控,直到最后把所有连接都建立好,把fd存好。如果这个GUC的数值不是0,那么就会用多线程的方式来连接segment,起多少个线程需要根据GUC的值和segment的数量来计算。这是同一的目标的两种不同实现,code也比较清楚,可以自己翻看源码。

因为我们用的是default的值,所以用的是异步的方式,通过createGang_async 最后调用了 PQconnectStartParams,这个函数就相当于psql客户端执行(psql -d gpadmin)会去连接每个segment数据库的postgres进程。

这些进程也会fork子进程出来,然后开始准备环境,执行后续sql 命令。代码到了这里,master连接segments的工作就完成了。

后面的函数就是在发送具体的命令,就是Begin命令。cdbdisp_dispatchToGang发送,因为是异步,所以cdbdisp_waitDispatchFinish等待发送完成,然后cdbdisp_getDispatchResults等待segments回复结果。

cdbdisp_dispatchToGang发送用到了一个新的消息协议,就是firstchar = 'M'。回顾前面的协议,我们讲过Greenplum在PostgreSQL的通信协议基础上增加了两个新的协议。

case 'M':     /* MPP dispatched stmt from QD */
case 'T':     /* MPP dispatched dtx protocol command from QD */

master node上面发送了消息以后,就开始等待所有的segments的回复。现在我们再转到segments上面继续看逻辑顺序。 segment在启动以后,还是和一般的PostgreSQL应用一样,用ReadCommand等待输入。

这里等到的是firstchar = 'M'。针对这个分支,segment会进入一个函数叫做 setupQEDtxContext,在这个函数里面完成了单机版begin命令的所有操作。然后用ReadyForQuery函数给master node回复 ready for query信息,这个信息也是libpq协议定义过的。

case 'Z':		/* backend is ready for new query */

仔细看code 实现,能看出来发回的是一个firstchar = 'Z'的消息。

我们再切换回master node,通过函数cdbdisp_returnResults 把结果返回,然后清理Gang里面的context,这里因为是用的异步的方式,所以函数是cdbdisp_destroyDispatcherState,如果是多线程的方式,函数应该是cdbdisp_destroyDispatchThreads。 接着master node用CommitTransactionCommand结束这个begin命令,然后用和segment 相同的方式返回ready for query信息给psql 客户端,整个过程结束。

总结,回顾一下这个简单的begin命令的执行过程,psql永远是发起命令的客户端,master node对于psql是server端,但是Greenplum集群里面,它还扮演了libpq客户端的角色去访问每个segment。如果只传送关键字命令,比如begin/commit/rollback 之类的,segment上面的逻辑略微简单,后面会分析比较复杂的SQL语句,整个过程会变得更加复杂,但是命令逻辑变化还是不大的。

相关阅读: