Bootstrap

并发编程如何才能不再头疼:iOS中的协程

什么是协程

当前iOS并发编程的痛点

  • 网络、IO等耗时接口都是同步方法,一般都放在后台线程调用,然后主线程刷新UI。这里使用GCD等。

  • 缺乏统一的异步编程模型,只能使用delegate(逻辑割裂、共享则不安全)、closure(回调地狱,调试不便等)、Notification等,经常忘记调用completion等。

  • 多线程异步回调的复杂性和不可控性。

  • 多线程涉及到的锁、信号量、GCD Group等线程安全措施。还可能有野指针访问、容器操作、非主线程更新UI导致错误、过度释放等严重问题。

  • 基于Promise、GCD语法糖等第三方方案并不完美,且维护困难。

  • 响应式编程的概念复杂、调试困难、学习曲线高。

协程的基本概念

抢占式调度

当前iOS中的多线程调度都是抢占式的,每次阻塞、切换线程都需要系统调用,即CPU先执行调度程序,再切换对应的线程。而多个线程之间还会存在互斥加锁之类的开销。多线程还要非常小心地处理被抢占时的状态保存等操作。

非抢占式调度

一个进程包含多个线程,一个线程可以包含多个协程。但多个协程的执行是串行的。

多核CPU的场景是,多进程或一个进程内的多个线程可以并行运行,但是一个线程内的多个协程却只能串行执行。当一个协程运行时,其他协程必须挂起。

协程是基于线程,非抢占式调度,一般是执行完毕主动让出CPU。由协程调度器进行协程切换。

所以协程本质上是单线程的,没有线程切换的开销,也没有多线程同步加锁之类的开销。相对于线程而言,协程是非常轻量级的。

非主协程中执行的任务必须要是异步的。

协程的特点

  • 非抢占式:无需系统调用,协程调度是线程安全,无需lock等。

  • 挂起执行:保存寄存器和栈,不影响其他协程执行。

  • 恢复执行:恢复之前的寄存器和栈,无缝切换回之前的执行逻辑。

举个例子

co_launch(^{
    NSLog(@"1");
});

co_launch(^{
    co_delay(2);
    NSLog(@"2");
});

co_launch(^{
    co_delay(1);
    NSLog(@"2-delay-1");
});

co_launch(^{
    NSLog(@"3");
});

co_launch_onqueue(dispatch_queue_create(0, 0), ^{
    NSLog(@"4");
});

co_launch_onqueue(dispatch_queue_create(0, 0), ^{
    NSLog(@"5");
});

NSLog(@"0");
// 0 4 5 1 3 2-delay-1 2
// 4和5为后台线程。其中的4、5的顺序可能不一定。

其他编程语言中的协程

promise

promise在javascript非常常见。iOS也有类似的三方库。

async/await

js已经有了。

goroutine

go语言中的go routine和go channel可以说是非常独特的并发编程方式。

func goChannel() {
	ch2 := make(chan string, 1)

	// 关键字 go 后跟的就是需要被并发执行的代码块
	go func() {
		time.Sleep(time.Second * 2)
		fmt.Println("协程中:数据来了")
		time.Sleep(time.Second * 2)
		ch2 <- " 已到达!"
	}()
	
	fmt.Println("等待数据:已阻塞")

	// chan接收操作,在chan没有有效数据的情况下会被阻塞,
	// 所以这里会阻塞, 等到ch2 <- "已到达!"执行完毕, 可取出的时候继续代码.
	
	var value string = "测试数据"
	value = value + <-ch2

	// 重复关闭chan会导致panic
	close(ch2)

	fmt.Println(value)

	time.Sleep(time.Second)
}

执行 *go run demo-goroutine.go* 结果如下:

等待数据:已阻塞
协程中:数据来了
测试数据 已到达!

Python中的yield

Kotlin中的Generator

自己动手使用C语言实现简单的协程

ucontext

ucontext结构体是协程中的基本结构。

typedef struct ucontext {
  struct ucontext *uc_link; // 后继context
  sigset_t uc_sigmask; // 该context中的阻塞信号集合
  stack_t uc_stack; // 该context中使用的stack
  mcontext_t uc_mcontext; // 保存的context的特定机器表示,包括调用线程的特定寄存器等。
  ...
} ucontext_t;

其中,uc_link必不可少,用于指定后继context,这样才能实现context切换。

un_stack存储当前context的数据,切换时需要保存。

ucontext相关操作函数

相关操作函数其实非常简单

getcontext

int getcontext(ucontext_t *ucp);

初始化ucp结构体,保存当前context到ucp中。

setcontext

int setcontext(const ucontext_t *ucp);

设置当前的context为ucp,该ucp通过getcontext或makecontext取得。

getcontext和setcontext是这里的关键。请看这段C代码:

#ifdef __APPLE__
#define _XOPEN_SOURCE
#endif

#include 
#include 
#include 

int main(int argc, const char *argv[]){
    ucontext_t context;
    
    getcontext(&context);
    puts("Hello world");
    sleep(1);
    setcontext(&context);

    puts("1111");

    return 0;
}

这段代码执行的结果是一直循环输出hello world,不会终止。原因:

makecontext

void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);

把func关联到context。

修改通过getcontext取得的context的ucp。即调用之前必须要先调用getcontext。然后给该context指定一个ucp->stack,设置后继的context即ucp->uc_link。

当该context通过setcontext或swapcontext激活后,执行func函数,func返回后,后继的context被激活,若后继context为NULL,则线程退出。

swapcontext

int swapcontext(ucontext_t *oucp, ucontext_t *ucp);

保存当前上下文到oucp结构体中,然后激活ucp上下文。

一般oucp不设置,即当前context是为main即可。

swap(&main, &child); // 切换到child context,保存当前context到main。

ucontext总结

以上四个函数的作用可以简要概括如下:

参考资料:

使用ucontext来模拟iOS中的多线程

iOS中多线程主要是main thread和global thread的使用和切换。

我们使用ucontext来模拟多线程切换,以加深理解。

#ifdef __APPLE__
#define _XOPEN_SOURCE
#endif

#include 
#include 
#include 

void global_func(void *arg) {
  puts("this is global_func");
  puts("1234567890");
}

void test_context() {
  char global_stack[1024*512];
  ucontext_t main, global;

  puts("this is main thread");

  getcontext(&global);
  global.uc_stack.ss_sp = global_stack;
  global.uc_stack.ss_size = sizeof(global_stack);
  global.uc_stack.ss_flags = 0;
  global.uc_link = &main;
  // global.uc_link = NULL;

  makecontext(&global, (void (*)(void))global_func, 0);

  // 切换到global,并保存当前上下文到main中。
  swapcontext(&main, &global);
  // 再次会导致segmentation fault
  // swapcontext(&main, &global);
  puts("this is main thread");
}

int main() {
  test_context();

  return 0;
}

输出结果:

this is main thread
this is global_func
1234567890
this is main thread

以上代码,模拟了iOS中最常见的多线程场景:

main thead -> 切换至global thread执行异步任务 -> 切换回main thread更新UI。

这段代码中,默认的ucontext可以视为main thread。

先创建了global的ucontext,初始化其参数,并指定其执行代码为global_func。

注意其中使用swapcontext(&main, &global);从main thread切换到global thread的代码。

在此基础之上,加入自行控制的调度逻辑,则可制作最小功能的协程库了。

coobjc

coobjc是阿里巴巴开源的iOS协程库。

提供了类似C#和Javascript语言中的Async/Await编程方式支持,在协程中通过调用await方法即可同步得到异步方法的执行结果,非常适合IO、网络等异步耗时调用的同步顺序执行改造。
提供了类似Kotlin中的Generator功能,用于懒计算生成序列化数据,非常适合多线程可中断的序列化数据生成和访问。
提供了Actor Model的实现,基于Actor Model,开发者可以开发出更加线程安全的模块,避免由于直接函数调用引发的各种多线程崩溃问题。
提供了元组的支持,通过元组Objective-C开发者可以享受到类似Python语言中多值返回的好处。

提供了对NSArray、NSDictionary等容器库的协程化扩展,用于解决序列化和反序列化过程中的异步调用问题。
提供了对NSData、NSString、UIImage等数据对象的协程化扩展,用于解决读写IO过程中的异步调用问题。
提供了对NSURLConnection和NSURLSession的协程化扩展,用于解决网络异步请求过程中的异步调用问题。
提供了对NSKeyedArchieve、NSJSONSerialization等解析库的扩展,用于解决解析过程中的异步调用问题。

实现原理

coobjc即采用第二种,coroutine_context.s文件。 使用汇编语言来模拟ucontext,即setcontext和getcontext等函数。

_coroutine_getcontext
_coroutine_begin
_coroutine_setcontext

coobjc的核心原理是控制调用栈的主动让出和恢复。

解决痛点

iOS业务开发过程中,存在很多网络请求、IO调用、数据解析等不能阻塞主线程的任务,可以放到后台线程执行。然而, 执行后又需要切换到主线程进行UI的更新。

一些复杂业务的线程切换代码复杂难懂。

同时,复杂并发任务需要引入GCD的group,semaphore,barrier等各种机制来配合完成,使得程序中很容易出现多线程竞争的问题,甚至出现死锁、崩溃等。

引入coobjc,在不改变原有业务代码的基础上,全局hook部分IO、数据解析方法,让原来主线程中同步执行的IO方法异步执行,且不影响原有业务逻辑。

一句话:*以同步方式编写异步逻辑代码,防止线程切换的开销,防止lock、信号量等引发的问题。*

使用方法

需要将后台线程执行的包装成异步操作!

使用Promise来包装异步操作

```Objective-C

// co_launch创建的协程默认在当前线程进行调度

// 使用await方法等待异步方法执行结束,得到异步执行结果

  • (void)testCoobjcAsyncPromise {

MBCTimer(@"before");

co_launch(^{

NSString *urlString = @"http://t1.hxzdhn.com/uploads/tu/201612/58/1231036404.jpg";

NSURL *url = [NSURL URLWithString:urlString];

// async downloadDataFromUrl

NSData *imageData = await([self asyncPromiseDownloadDataFromURL:url]);

// async transform data to image

UIImage *image = await([self asyncPromiseDecodeImageData:imageData]);

// set image to imageView

self.imgView.image = image;

});

MBCTimer(@"after");

}

  • (COPromise )asyncPromiseDownloadDataFromURL:(NSURL )url {

COPromise *promise = [COPromise promise];

// 模拟异步的事件

dispatchasync(dispatchget_global_queue(DISPATCHQUEUEPRIORITY_LOW, 0), ^{

NSData *imageData = [NSData dataWithContentsOfURL:url];

[promise fulfill:imageData];

// [promise reject:error];

});

return promise;

}

  • (COPromise )asyncPromiseDecodeImageData:(NSData )imageData {

COPromise *promise = [COPromise promise];

// 模拟异步的事件

dispatchasync(dispatchget_global_queue(DISPATCHQUEUEPRIORITY_LOW, 0), ^{

UIImage *image = [UIImage imageWithData:imageData];

[promise fulfill:image];

// [promise reject:error];

});

return promise;

}


Swift也可以使用:

```Swift
co_launch {
  let result = try await(channel: co_fetchSomething())
  //
}

co_launch {
  let result = try await(promise: co_fetchSomethingAsynchronous())
  switch resust {
    case .fulfilled(let data):
      //
    case .rejected(let error):
      //
  }
}

使用Channel

```Objective-C

  • (void)testCoobjcAsyncChannel {

MBCTimer(@"before");

co_launch(^{

NSString *urlString = @"http://t1.hxzdhn.com/uploads/tu/201612/58/1231036404.jpg";

NSURL *url = [NSURL URLWithString:urlString];

// async downloadDataFromUrl

NSData *imageData = await([self asyncChannelDownloadDataFromURL:url]);

// async transform data to image

UIImage *image = await([self asyncChannelDecodeImageData:imageData]);

// set image to imageView

self.imgView.image = image;

});

MBCTimer(@"after");

}

  • (COChan )asyncChannelDownloadDataFromURL:(NSURL )url {

COChan *chan = [COChan chan];

co_launch(^{

NSData *imageData = [NSData dataWithContentsOfURL:url];

[chan send:imageData];

});

return chan;

}

  • (COChan )asyncChannelDecodeImageData:(NSData )imageData {

COChan *chan = [COChan chan];

co_launch(^{

UIImage *image = [UIImage imageWithData:imageData];

[chan send:image];

});

return chan;

}


#### 错误处理

协程中的所有方法都是直接返回值的,并未返回错误,而在执行过程中的错误是通过co_getError()获取的。
没有复杂的错误处理机制。

```Objective-C
- (COPromise *)co_GET:(NSString *)url
           parameters:(NSDictionary *)parameters {
  COPromise *promise = [COPromise promise];
  [self GET:url
    parameters:parameters
    progress:nil
    success:^(NSURLSessionDataTask * _Nonnull task, id _Nullable responseObject) {
      [promise fulfill:responseObject];
    }
    failure:^(NSURLSessionDataTask * _Nullable task, NSError * _Nonnull error) {
      [promise reject:error];
  }];
}

如上的网络请求数据,失败时promise会reject:error。

使用方式:

```Objective-C

co_launch(^{

id response = await([self co_GET:feedModel.feedUrl parameters:nil]);

if (co_getError()) {

// handle error info

return;

}

// ...

});


#### 创建生成器

类似Python中的yield。生成器是一次生成一个值的特殊类型函数,可将其视为可恢复函数。

在函数的执行过程中,yield语句将你需要的值返回给调用生成器的地方,然后退出函数,下一次调用生成器函数的时候又从上次中断的地方开始执行。

而生成器内的所有变量参数都会被保存下来以供下一次使用。

```Objective-C
COCoroutine *co1 = co_sequence(^{
  int index = 0;
  while(co_isActive()) {
    yield_val(@(index)); // 调用生成器将NSValue返回
    index++;
  }
});

co_launch(^{
  for (int i = 0; i < 10; i++) {
    val = [[co1 next] intValue];
  }
});

类似Swift中的Sequence协议,对外封装一个next方法。

通过Generator,将传统的生产者加载数据->通知消费者,变成消费者需要数据->告诉生产者加载。

好处在哪里?

生成器可以在很多场景中进行使用,比如消息队列、批量下载文件、批量加载缓存等:
通过生成器,我们可以把传统的生产者加载数据->通知消费者模式,变成消费者需要数据->告诉生产者加载模式,避免了在多线程计算中,需要使用很多共享变量进行状态同步,消除了在某些场景下对于锁的使用。

```Objective-C

int unreadMessageCount = 10;

NSString *userId = @"xxx";

COSequence *messageSequence = sequenceOnBackgroundQueue(@"message_queue", ^{

//在后台线程执行

while(1){

yield(queryOneNewMessageForUserWithId(userId));

}

});

//主线程更新UI

co(^{

for(int i = 0; i < unreadMessageCount; i++){

if(!isQuitCurrentView()){

displayMessage([messageSequence take]);

}

}

});


#### Actor

消息-订阅机制。

Actor就是一个容器,用于存储状态、行为、Mailbox及子Actor与Supervisor策略。Actor之间并不通信,而是通过Mail来互通。跟Mach及Mach-msg机制类似。

```Objective-C
COActor *actor = co_actor_onqueue(^(COActorChan *channel) {
  // 定义actor的状态变量
  for (COActorMessage *message in channel) {
    // 处理消息
  }
}, q};

// 给actor发送消息
[actor send:@"send msg"];
[actor send:@(1)];

跟nodejs的订阅-发送机制一致。

Actor是一种并发模型,内部的状态由它自己维护,即它内部数据只能由它自己修改(通过消息传递来进行状态修改)。

所以使用Actor模型进行并发编程可以很好地避免加锁等线程问题。Actor由状态(State)、行为(Behavior)和邮箱(MailBox)三部分组成。

  • State:Actor的状态指的是Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题。

  • Behavior:Actor中的计算逻辑,通过Actor接收到的消息来改变Actor的状态。

  • MailBox:Actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送方Actor的消息,接收方Actor从邮箱队列中获取消息。

coobjc的其他tip

可取消

COCoroutine *coroutine = co_launch({
  co_delay(2);
  if (co_isCancelled()) {
    return;
  }
  // 执行
});
[coroutine cancel];

可暂停

暂停当前的协程,不会影响线程中的其他协程的运行。

co_launch({
  co_delay(2);
});

co_launch({
  NSLog(@"上一个协程暂停,而这个协程可以运行");
});

注意,这里的delay可以理解为当前协程让出CPU,即挂起。此时该线程寻找所属的其他协程来执行。

错误处理

执行过程中的错误是通过co_getError()

使得OC代码支持元组

```Objective-C

COTuple *tuple = co_tuple(nil, @"1", @(2), @[@1,@2], @{@"key":@"value"});

// NSArray *array = tuple[3]; // 支持下标直接取出

id nonValue;

NSString *stringValue;

NSNumber *numberValue;

NSArray *arrayValue;

NSDictionary *dictValue;

co_unpack(&nonValue, &stringValue, &numberValue, &arrayValue, &dictValue) = tuple;

NSLog(@"testOCTuple");


## coobjc的源码分析

coobjc使用汇编语言实现了一套类似ucontext的基本操作。

### coroutine_ucontext

类似ucontext的结构。

```C
typedef struct coroutine_ucontext {
    uint64_t data[100];
} coroutine_ucontext_t;

struct coroutine_ucontext_re {
    struct GPRs {
        uint64_t __x[29]; // x0-x28
        uint64_t __fp;    // Frame pointer x29
        uint64_t __lr;    // Link register x30
        uint64_t __sp;    // Stack pointer x31
        uint64_t __pc;    // Program counter
        uint64_t padding; // 16-byte align, for cpsr
    } GR;
    double  VR[32];
};

类似ucontext的基本实现

extern int coroutine_getcontext (coroutine_ucontext_t *__ucp);

extern int coroutine_setcontext (coroutine_ucontext_t *__ucp);
extern int coroutine_begin (coroutine_ucontext_t *__ucp);

extern int coroutine_makecontext (coroutine_ucontext_t *__ucp, IMP func, void *arg, void *stackTop);

感兴趣的可以参考coobjc源码中的coroutine_context.s文件。

coroutine相关的一些结构体

/**
The structure store coroutine's context data.
*/
struct coroutine {
  coroutine_func entry;                   // Process entry.
  void *userdata;                         // Userdata.
  coroutine_func userdata_dispose;        // Userdata's dispose action.
  void *context;                          // Coroutine's Call stack data.
  void *pre_context;                      // Coroutine's source process's Call stack data.
  int status;                             // Coroutine's running status.
  uint32_t stack_size;                    // Coroutine's stack size
  void *stack_memory;                     // Coroutine's stack memory address.
  void *stack_top;                    // Coroutine's stack top address.
  struct coroutine_scheduler *scheduler;  // The pointer to the scheduler.
  
  struct coroutine *prev;
  struct coroutine *next;
  
  void *autoreleasepage;                  // If enable autorelease, the custom autoreleasepage.
  void *chan_alt;                         // If blocking by a channel, record the alt
  bool is_cancelled;                      // The coroutine is cancelled
  int8_t   is_scheduler;                  // The coroutine is a scheduler.
};
typedef struct coroutine coroutine_t;

/**
Define the linked list of scheduler's queue.
*/
struct coroutine_list {
  coroutine_t *head;
  coroutine_t *tail;
};
typedef struct coroutine_list coroutine_list_t;

/**
Define the scheduler.
One thread own one scheduler, all coroutine run this thread shares it.
*/
struct coroutine_scheduler {
  coroutine_t         *main_coroutine;
  coroutine_t         *running_coroutine;
  coroutine_list_t     coroutine_queue;
};
typedef struct coroutine_scheduler coroutine_scheduler_t;

coroutine相关的一些操作

/**
Create a new routine.
     
@param func main entrance
@return routine obj
*/
coroutine_t *coroutine_create(coroutine_func func);

/**
Add coroutine to scheduler, and resume the specified coroutine whatever.
*/
void coroutine_resume(coroutine_t *co); // 恢复

/**
Add coroutine to scheduler, and resume the specified coroutine if idle.
*/
void coroutine_add(coroutine_t *co); // 添加

/**
Yield the specified coroutine now.
*/
void coroutine_yield(coroutine_t *co); // 挂起

coroutine_t *coroutine_create(coroutine_func func) {
    coroutine_t *co = calloc(1, sizeof(coroutine_t));
    co->entry = func;
    co->stack_size = STACK_SIZE;
    co->status = COROUTINE_READY;
    
    // check debugger is attached, fix queue debugging.
    co_rebind_backtrace();
    return co;
}

void coroutine_resume(coroutine_t *co) {
    if (!co->is_scheduler) {
        coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
        co->scheduler = scheduler;
        
        scheduler_queue_push(scheduler, co);
        
        if (scheduler->running_coroutine) {
            // resume a sub coroutine.
            scheduler_queue_push(scheduler, scheduler->running_coroutine);
            coroutine_yield(scheduler->running_coroutine);
        } else {
            // scheduler is idle
            coroutine_resume_im(co->scheduler->main_coroutine);
        }
    }
}

void coroutine_add(coroutine_t *co) {
    if (!co->is_scheduler) {
        coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
        co->scheduler = scheduler;
        if (scheduler->main_coroutine->status == COROUTINE_DEAD) {
            coroutine_close_ifdead(scheduler->main_coroutine);
            coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
            main_co->is_scheduler = true;
            main_co->scheduler = scheduler;
            scheduler->main_coroutine = main_co;
        }
        scheduler_queue_push(scheduler, co);
        
        if (!scheduler->running_coroutine) {
            coroutine_resume_im(co->scheduler->main_coroutine);
        }
    }
}

// use optnone to keep the `skip` not be optimized.
__attribute__ ((optnone))
void coroutine_yield(coroutine_t *co)
{
    if (co == NULL) {
        // if null
        co = coroutine_self();
    }
    BOOL skip = false;
    coroutine_getcontext(co->context);
    if (skip) {
        return;
    }
#pragma unused(skip)
    skip = true;
    co->status = COROUTINE_SUSPEND;
    coroutine_setcontext(co->pre_context);
}

调度器coroutine_scheduler

coroutine_scheduler的相关操作

有了类似ucontext的基本数据结构和基本操作,自行设计一个调度器即可:

scheduler的调度:

// The main entry of the coroutine's scheduler
// The scheduler is just a special coroutine, so we can use yield.
void coroutine_scheduler_main(coroutine_t *scheduler_co) {
    
    coroutine_scheduler_t *scheduler = scheduler_co->scheduler;
    for (;;) {
        
        // Pop a coroutine from the scheduler's queue.
        coroutine_t *co = scheduler_queue_pop(scheduler);
        if (co == NULL) {
            // Yield the scheduler, give back cpu to origin thread.
            coroutine_yield(scheduler_co);
            
            // When some coroutine add to the scheduler's queue,
            // the scheduler will resume again,
            // then will resume here, continue the loop.
            continue;
        }
        // Set scheduler's current running coroutine.
        scheduler->running_coroutine = co;
        // Resume the coroutine
        coroutine_resume_im(co);
        
        // Set scheduler's current running coroutine to nil.
        scheduler->running_coroutine = nil;
        
        // if coroutine finished, free coroutine.
        if (co->status == COROUTINE_DEAD) {
            coroutine_close_ifdead(co);
        }
    }
}

// use optnone to keep the `skip` not be optimized.
__attribute__ ((optnone))
void coroutine_resume_im(coroutine_t *co) {
    switch (co->status) {
        case COROUTINE_READY:
        {
            co->stack_memory = coroutine_memory_malloc(co->stack_size);
            co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
            // get the pre context
            co->pre_context = malloc(sizeof(coroutine_ucontext_t));
            BOOL skip = false;
            coroutine_getcontext(co->pre_context);
            if (skip) {
                // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
                return;
            }
#pragma unused(skip)
            skip = true;
            
            free(co->context);
            co->context = calloc(1, sizeof(coroutine_ucontext_t));
            coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
            // setcontext
            coroutine_begin(co->context);
            
            break;
        }
        case COROUTINE_SUSPEND:
        {
            BOOL skip = false;
            coroutine_getcontext(co->pre_context);
            if (skip) {
                // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
                return;
            }
#pragma unused(skip)
            skip = true;
            // setcontext
            coroutine_setcontext(co->context);
            
            break;
        }
        default:
            assert(false);
            break;
    }
}

链表结构: 向队列中添加/删除coroutine对象。

#pragma mark - linked lists

void scheduler_queue_push(coroutine_scheduler_t *scheduler, coroutine_t *co) {
    coroutine_list_t *queue = &scheduler->coroutine_queue;
    if(queue->tail) {
        queue->tail->next = co;
        co->prev = queue->tail;
    } else {
        queue->head = co;
        co->prev = nil;
    }
    queue->tail = co;
    co->next = nil;
}

coroutine_t *scheduler_queue_pop(coroutine_scheduler_t *scheduler) {
    coroutine_list_t *queue = &scheduler->coroutine_queue;
    coroutine_t *co = queue->head;
    if (co) {
        queue->head = co->next;
        // Actually, co->prev is nil now.
        if (co->next) {
            co->next->prev = co->prev;
        } else {
            queue->tail = co->prev;
        }
    }
    return co;
}

void coroutine_makecontext (coroutine_ucontext_t *ctx, IMP func, void *arg, void *stackTop)
{
    struct coroutine_ucontext_re *uctx = (struct coroutine_ucontext_re *)ctx;
    uintptr_t stackBegin = (uintptr_t)stackTop - sizeof(uintptr_t);
    uctx->GR.__fp = stackBegin;
    uctx->GR.__sp = stackBegin;
    uctx->GR.__x[0] = (uintptr_t)arg;
    uctx->GR.__pc = (uintptr_t)func;
}

co_launch操作做了什么

co_launch操作用于建立一个协程,并将任务丢入其中执行。

co_launch({
    NSLog(@"1");
});

源码如下:

/**
 Create a coroutine, then resume it asynchronous on current queue.

 @param block the code execute in the coroutine
 @return the coroutine instance
 */
NS_INLINE COCoroutine * _Nonnull  co_launch(void(^ _Nonnull block)(void)) {
    COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
    return [co resume];
}

- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)queue stackSize:(NSUInteger)stackSize {
    self = [super init];
    if (self) {
        _execBlock = [block copy];
        _dispatch = queue ? [CODispatch dispatchWithQueue:queue] : [CODispatch currentDispatch];
        
        coroutine_t  *co = coroutine_create((void (*)(void *))co_exec);
        if (stackSize > 0 && stackSize < 1024*1024) {   // Max 1M
            co->stack_size = (uint32_t)((stackSize % 16384 > 0) ? ((stackSize/16384 + 1) * 16384) : stackSize);        // Align with 16kb
        }
        _co = co;
        coroutine_setuserdata(co, (__bridge_retained void *)self, co_obj_dispose);
    }
    return self;
}

- (COCoroutine *)resume {
    COCoroutine *currentCo = [COCoroutine currentCoroutine];
    BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
    [self.dispatch dispatch_async_block:^{
        if (self.isResume) {
            return;
        }
        if (isSubroutine) {
            self.parent = currentCo;
            [currentCo addChild:self];
        }
        self.isResume = YES;
        coroutine_resume(self.co);
    }];
    return self;
}

参考资料