Bootstrap

SRS流媒体服务器源码分析--RTMP消息play

推荐视频

SRS源码Play流程图

进入play流程

由于之前的博客中已经梳理了系统启动监听以及推流客户端连接流程、如何创建协程、如何进入stream_service_cycle。所以本章内容直接从SrsRtmpConn::stream_service_cycle()方法中进入开始梳理。

switch (type) {
        case SrsRtmpConnPlay: {
            srs_verbose("start to play stream %s.", req->stream.c_str());
            
            // response connection start play
            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to play stream failed. ret=%d", ret);
                return ret;
            }
			// 回调接口通知vhost开始play
            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
                srs_error("http hook on_play failed. ret=%d", ret);
                return ret;
            }
            
            srs_info("start to play stream %s success", req->stream.c_str());
            ret = playing(source);
			// 回调接口通知vhost play停止
            http_hooks_on_stop();
            
            return ret;
        }
}
 

在接受流程处理当中,客户类型是:SrsRtmpConnFMLEPublish “fmle publish”。在转发流程当中,客户类型是:SrsRtmpConnPlay。

在http_hooks_on_play()方法中回调on_play()方法通知vhost,xxx用户已经开始play。

在http_hooks_on_stop()方法中回调on_stop()方法通知vhost,xxx用户已经停止play。

最重要的是:

ret = playing(source);

进入该函数。

int SrsRtmpConn::playing(SrsSource* source)
{
    int ret = ERROR_SUCCESS;
    
    // create consumer of souce.
    SrsConsumer* consumer = NULL;
    if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
        srs_error("create consumer failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsConsumer, consumer);
    srs_verbose("consumer created success.");
 
    // use isolate thread to recv, 
    // @see: refine the recv message for performance #217 · Issue #217 · ossrs/srs
    SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
    
    // start isolate recv thread.
    if ((ret = trd.start()) != ERROR_SUCCESS) {
        srs_error("start isolate recv thread failed. ret=%d", ret);
        return ret;
    }
    
    // delivery messages for clients playing stream.
    wakable = consumer;
    ret = do_playing(source, consumer, &trd);
    wakable = NULL;
    
    // stop isolate recv thread
    trd.stop();
    
    // warn for the message is dropped.
    if (!trd.empty()) {
        srs_warn("drop the received %d messages", trd.size());
    }
    
    return ret;
}

在函数中

1.1、根据客户端,创建消费者对象

create_consumer(this, consumer)

1.2、为该消费者开启一个独立协程

trd.start() //此处一直不太明白,在play流程中创建一个协程用来做什么?

1.3、进入play主流程

do_playing(source, consumer, &trd);

进入主play循环

进入该函数,do_playing()函数内容非常多,也是非常重要,所以将该函数内容全部列出。

int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd)
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(consumer != NULL);
    
    if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) {
        srs_error("check play_refer failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check play_refer success.");
    
    // initialize other components
    SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
    SrsAutoFree(SrsPithyPrint, pprint);
 
    SrsMessageArray msgs(SRS_PERF_MW_MSGS);
    bool user_specified_duration_to_stop = (req->duration > 0);
    int64_t starttime = -1;
    
    // setup the realtime.
    realtime = _srs_config->get_realtime_enabled(req->vhost);
    // setup the mw config.
    // when mw_sleep changed, resize the socket send buffer.
    mw_enabled = true;
    change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
    // initialize the send_min_interval
    send_min_interval = _srs_config->get_send_min_interval(req->vhost);
    
    // set the sock options.
    set_sock_options();
    
    srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
        send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay);
    // 连接是否已经断开
    while (!disposed) {
        // collect elapse for pithy print.
        pprint->elapse();
        //连接是否已经过期
        // when source is set to expired, disconnect it.
        if (expired) {
            ret = ERROR_USER_DISCONNECT;
            srs_error("connection expired. ret=%d", ret);
            return ret;
        }
	// 使用单独协程去接受,能提高大约33%的性能
        // to use isolate thread to recv, can improve about 33% performance.
        // @see: RTMP protocol stack, recv never send, send never recv #196 · Issue #196 · ossrs/srs
        // @see: refine the recv message for performance #217 · Issue #217 · ossrs/srs
        while (!trd->empty()) {
	    //获取Message,
            SrsCommonMessage* msg = trd->pump();
			
            srs_verbose("pump client message to process.");
            // 执行播放控制信息, 比如开始播放,暂停播放等等
            if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
                if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
                    srs_error("process play control message failed. ret=%d", ret);
                }
                return ret;
            }
        }
        
        // quit when recv thread error.
        if ((ret = trd->error_code()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                srs_error("recv thread failed. ret=%d", ret);
            }
            return ret;
        }
        
#ifdef SRS_PERF_QUEUE_COND_WAIT
        // for send wait time debug
        srs_verbose("send thread now=%"PRId64"us, wait %dms", srs_update_system_time_ms(), mw_sleep);
        
        // wait for message to incoming.
        // @see refine the send msg performance  · Issue #251 · ossrs/srs
        // @see 降低服务器RTMP的最低延迟 · Issue #257 · ossrs/srs
        if (realtime) {
            // for realtime, min required msgs is 0, send when got one+ msgs.
            consumer->wait(0, mw_sleep);
        } else {
            // for no-realtime, got some msgs then send.
            consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
        }
        
        // for send wait time debug
        srs_verbose("send thread now=%"PRId64"us wakeup", srs_update_system_time_ms());
#endif
        //从消费者列表当中获取接受到的rtmp信息
        // get messages from consumer.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        // @remark when enable send_min_interval, only fetch one message a time.
        int count = (send_min_interval > 0)? 1 : 0;
        if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
            srs_error("get messages from consumer failed. ret=%d", ret);
            return ret;
        }
 
        // reportable
        if (pprint->can_print()) {
            kbps->sample();
            srs_trace("caojj_player-> "SRS_CONSTS_LOG_PLAY
                " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
                pprint->age(), count,
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
                mw_sleep
            );
        }
        
        // we use wait timeout to get messages,
        // for min latency event no message incoming,
        // so the count maybe zero.
        if (count > 0) {
            srs_verbose("mw wait %dms and got %d msgs %d(%"PRId64"-%"PRId64")ms", 
                mw_sleep, count, 
                (count > 0? msgs.msgs[count - 1]->timestamp - msgs.msgs[0]->timestamp : 0),
                (count > 0? msgs.msgs[0]->timestamp : 0), 
                (count > 0? msgs.msgs[count - 1]->timestamp : 0));
        }
        
        if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
            srs_info("mw sleep %dms for no msg", mw_sleep);
            st_usleep(mw_sleep * 1000);
#else
            srs_verbose("mw wait %dms and got nothing.", mw_sleep);
#endif
            // ignore when nothing got.
            continue;
        }
        srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep);
        
        // only when user specifies the duration, 
        // we start to collect the durations for each message.
        if (user_specified_duration_to_stop) {
            for (int i = 0; i < count; i++) {
                SrsSharedPtrMessage* msg = msgs.msgs[i];
                
                // foreach msg, collect the duration.
                // @remark: never use msg when sent it, for the protocol sdk will free it.
                if (starttime < 0 || starttime > msg->timestamp) {
                    starttime = msg->timestamp;
                }
                duration += msg->timestamp - starttime;
                starttime = msg->timestamp;
            }
        }
        //发送message,这里play总出口。
        // sendout messages, all messages are freed by send_and_free_messages().
        // no need to assert msg, for the rtmp will assert it.
        printf("send message output gate\n");
        if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send messages to client failed. ret=%d", ret);
            }
            return ret;
        }
        //如果指定的持续时间超过它,停止播放
        // if duration specified, and exceed it, stop play live.
        // @see: rtmpdump指定duration后SRS没有断开连接 · Issue #45 · ossrs/srs
        if (user_specified_duration_to_stop) {
            if (duration >= (int64_t)req->duration) {
                ret = ERROR_RTMP_DURATION_EXCEED;
                srs_trace("stop live for duration exceed. ret=%d", ret);
                return ret;
            }
        }
        
        // apply the minimal interval for delivery stream in ms.
        if (send_min_interval > 0) {
            st_usleep((int64_t)(send_min_interval * 1000));
        }
    }
    
    return ret;
}

该函数有非常重要的3点:

通知消费者准备play

// 执行播放控制信息, 比如开始播放,暂停播放等等
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
    if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
        srs_error("process play control message failed. ret=%d", ret);
    }
    return ret;
}

从消费者列表中取出Rtmp信息(SrsMessageQueue)

if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
            srs_error("get messages from consumer failed. ret=%d", ret);
            return ret;
}

进入play入口

/发送message,这里play总出口。
// sendout messages, all messages are freed by send_and_free_messages().
// no need to assert msg, for the rtmp will assert it.
//printf("send message output gate\n");
if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) {
      if (!srs_is_client_gracefully_close(ret)) {
           srs_error("send messages to client failed. ret=%d", ret);
      }
      return ret;
}

进入SRS发送接口(play)

在int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)函数中,

进入int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs),该函数有一个#ifdef SRS_PERF_COMPLEX_SEND宏定义,一般rtmp协议都是要混合音视频数据,在做转发。在往后面看,

 // when c0c3 cache dry,
 // sendout all messages and reset the cache, then send again.
 //printf("do_send_messages do_iovs_send: iov_index = %d", iov_index);
 if ((ret = do_iovs_send(out_iovs, iov_index)) != ERROR_SUCCESS) {
          return ret;
 }

最后进入

int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite)
{
    int ret = ERROR_SUCCESS;
    
    // the limits of writev iovs.
    // for srs-librtmp, @see srs-librtmp supports windows #213 · Issue #213 · ossrs/srs
#ifndef _WIN32
    // for linux, generally it's 1024.
    static int limits = (int)sysconf(_SC_IOV_MAX);
#else
    static int limits = 1024;
#endif
    
    // send in a time.
    if (size < limits) {
		
        if ((ret = skt->writev(iovs, size, pnwrite)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send with writev failed. ret=%d", ret);
            }
            return ret;
        }
        return ret;
    }
    
    // send in multiple times.
    int cur_iov = 0;
    while (cur_iov < size) {
        int cur_count = srs_min(limits, size - cur_iov);
        if ((ret = skt->writev(iovs + cur_iov, cur_count, pnwrite)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send with writev failed. ret=%d", ret);
            }
            return ret;
        }
        cur_iov += cur_count;
    }
    
    return ret;
}

在该函数中,最重要的一点是send message 总出口writen()函数。它负责将转发给直播用户的流转发出去。

最后:play总结

(1)通知client开始play

(2)从消费者列表中取出Rtmp数据

(3)从总出口writev()函数中转发出去

整理了一些个人觉得比较好的学习书籍、大厂面试题、和热门技术教学视频资料共享在里面(包括C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等等.),有需要的可以自行添加哦!~

以上有不足的地方欢迎指出讨论,同时可以持续关注我,每天分享干货内容!