前置步骤和录屏是一样的,见我的上一篇文章
https://www.cnblogs.com/billin/p/17219558.html
bool obs_output_actual_start(obs_output_t *output)在上文的这个函数中,如果是启用推流直播,函数指针会转到这里
//碧麟精简批注版 //rtmp推流开始 static bool rtmp_stream_start(void *data) { struct rtmp_stream *stream = data; os_atomic_set_bool(&stream->connecting, true); //开启推流线程 return pthread_create(&stream->connect_thread, NULL, connect_thread, stream) == 0; }
推流使用的是obs-outputs插件
上面函数就是新开一个线程,来执行
这个函数static void *connect_thread(void *data),线程传入参数是stream
先看一下rtmp_stream结构
//rtmp stream结构 struct rtmp_stream { //obs_output obs_output_t *output; //packet信息 struct circlebuf packets; bool sent_headers; bool got_first_video; int64_t start_dts_offset; volatile bool connecting; //连接线程地址 pthread_t connect_thread; //发送线程地址 pthread_t send_thread; os_sem_t *send_sem; //推流地址,key //比如b站,推流地址是rtmp://live-push.bilivideo.com/live-bvc/ struct dstr path, key; //用户名,密码 struct dstr username, password; //编码器名字: //我用的是FMLE/3.0 (compatible; FMSc/1.0) struct dstr encoder_name; struct dstr bind_ip; int64_t last_dts_usec; uint64_t total_bytes_sent; int dropped_frames; pthread_mutex_t dbr_mutex; struct circlebuf dbr_frames; size_t dbr_data_size; long audio_bitrate; long dbr_est_bitrate; long dbr_orig_bitrate; long dbr_prev_bitrate; long dbr_cur_bitrate; long dbr_inc_bitrate; bool dbr_enabled; // RTMP结构对象 RTMP rtmp; pthread_t socket_thread; uint8_t *write_buf; size_t write_buf_len; size_t write_buf_size; pthread_mutex_t write_buf_mutex; };
rtmp_stream结构里保存了推流的所有关键信息,包括推流地址,key,流的编码器等参数
还保存了几个关键的指针,用于多线程中调度。有连接线程connect_thread,也有发送线程pthread_t send_thread;
//rtmp连接线程 static void *connect_thread(void *data) { struct rtmp_stream *stream = data; int ret; //设置线程名 os_set_thread_name("rtmp-stream: connect_thread"); //初始化 if (!silently_reconnecting(stream)) { if (!init_connect(stream)) { obs_output_signal_stop(stream->output, OBS_OUTPUT_BAD_PATH); os_atomic_set_bool(&stream->silent_reconnect, false); return NULL; } } else { struct encoder_packet packet; peek_next_packet(stream, &packet); stream->start_dts_offset = get_ms_time(&packet, packet.dts); } //连接 ret = try_connect(stream); if (ret != OBS_OUTPUT_SUCCESS) { obs_output_signal_stop(stream->output, ret); info("Connection to %s failed: %d", stream->path.array, ret); } if (!stopping(stream)) pthread_detach(stream->connect_thread); os_atomic_set_bool(&stream->silent_reconnect, false); os_atomic_set_bool(&stream->connecting, false); return NULL; }
上面的方法是在单独的线程中执行的,主要是做了下面几件事:
1 设置线程名为“rtmp-stream :connect_thread”
2 初始化 init_connect
3 通过调用try_connect(stream)执行实际连接逻辑
//rtmp connect static int try_connect(struct rtmp_stream *stream) { info("Connecting to RTMP URL %s...", stream->path.array); //rtmp初始化 RTMP_Init(&stream->rtmp); //设置URL if (!RTMP_SetupURL(&stream->rtmp, stream->path.array)) return OBS_OUTPUT_BAD_PATH; RTMP_EnableWrite(&stream->rtmp); //设置流编码格式 dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)"); //设置用户名密码 set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username); set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password); set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name); stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl; stream->rtmp.Link.customConnectEncode = add_connect_data; if (dstr_is_empty(&stream->bind_ip) || dstr_cmp(&stream->bind_ip, "default") == 0) { memset(&stream->rtmp.m_bindIP, 0, sizeof(stream->rtmp.m_bindIP)); } else { bool success = netif_str_to_addr(&stream->rtmp.m_bindIP.addr, &stream->rtmp.m_bindIP.addrLen, stream->bind_ip.array); if (success) { int len = stream->rtmp.m_bindIP.addrLen; bool ipv6 = len == sizeof(struct sockaddr_in6); info("Binding to IPv%d", ipv6 ? 6 : 4); } } RTMP_AddStream(&stream->rtmp, stream->key.array); stream->rtmp.m_outChunkSize = 4096; stream->rtmp.m_bSendChunkSizeInfo = true; stream->rtmp.m_bUseNagle = true; //连接 if (!RTMP_Connect(&stream->rtmp, NULL)) { set_output_error(stream); return OBS_OUTPUT_CONNECT_FAILED; } if (!RTMP_ConnectStream(&stream->rtmp, 0)) return OBS_OUTPUT_INVALID_STREAM; info("Connection to %s successful", stream->path.array); //到这里说明连接成功,开始初始化send逻辑,准备推流 return init_send(stream); }
当连接成功,开始调用init_send函数,初始化send,准备推流
需要注意,send也是在单独的线
程处理的,因为传视频比较大,如果不单开线程,肯定会造成阻塞。
//初始化rtmp send逻辑 static int init_send(struct rtmp_stream *stream) { int ret; obs_output_t *context = stream->output; //创建send线程,执行send_thread方法,参数是stream ret = pthread_create(&stream->send_thread, NULL, send_thread, stream); if (stream->new_socket_loop) { int one = 1; #ifdef _WIN32 if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) { stream->rtmp.last_error_code = WSAGetLastError(); #else if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) { stream->rtmp.last_error_code = errno; #endif warn("Failed to set non-blocking socket"); return OBS_OUTPUT_ERROR; } os_event_reset(stream->send_thread_signaled_exit); info("New socket loop enabled by user"); if (stream->low_latency_mode) info("Low latency mode enabled by user"); if (stream->write_buf) bfree(stream->write_buf); int total_bitrate = 0; obs_encoder_t *vencoder = obs_output_get_video_encoder(context); if (vencoder) { obs_data_t *params = obs_encoder_get_settings(vencoder); if (params) { int bitrate = obs_data_get_int(params, "bitrate"); if (!bitrate) { warn("Video encoder didn't return a " "valid bitrate, new network " "code may function poorly. " "Low latency mode disabled."); stream->low_latency_mode = false; bitrate = 10000; } total_bitrate += bitrate; obs_data_release(params); } } obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, 0); if (aencoder) { obs_data_t *params = obs_encoder_get_settings(aencoder); if (params) { int bitrate = obs_data_get_int(params, "bitrate"); if (!bitrate) bitrate = 160; total_bitrate += bitrate; obs_data_release(params); } } // to bytes/sec int ideal_buffer_size = total_bitrate * 128; if (ideal_buffer_size < 131072) ideal_buffer_size = 131072; stream->write_buf_size = ideal_buffer_size; stream->write_buf = bmalloc(ideal_buffer_size); #ifdef _WIN32 ret = pthread_create(&stream->socket_thread, NULL, socket_thread_windows, stream); #else warn("New socket loop not supported on this platform"); return OBS_OUTPUT_ERROR; #endif if (ret != 0) { RTMP_Close(&stream->rtmp); warn("Failed to create socket thread"); return OBS_OUTPUT_ERROR; } stream->socket_thread_active = true; stream->rtmp.m_bCustomSend = true; stream->rtmp.m_customSendFunc = socket_queue_data; stream->rtmp.m_customSendParam = stream; } os_atomic_set_bool(&stream->active, true); if (!send_meta_data(stream)) { warn("Disconnected while attempting to send metadata"); set_output_error(stream); return OBS_OUTPUT_DISCONNECTED; } obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, 1); if (aencoder && !send_additional_meta_data(stream)) { warn("Disconnected while attempting to send additional " "metadata"); return OBS_OUTPUT_DISCONNECTED; } if (obs_output_get_audio_encoder(context, 2) != NULL) { warn("Additional audio streams not supported"); return OBS_OUTPUT_DISCONNECTED; } if (!silently_reconnecting(stream)) obs_output_begin_data_capture(stream->output, 0); return OBS_OUTPUT_SUCCESS; }
核心推流线程,在单独的线程里完成推流逻辑
//推流核心线程 static void *send_thread(void *data) { struct rtmp_stream *stream = data; //设置线程名 os_set_thread_name("rtmp-stream: send_thread"); //设定buffersize #if defined(_WIN32) // Despite MSDN claiming otherwise, send buffer auto tuning on // Windows 7 doesn't seem to work very well. if (get_win_ver_int() == 0x601) { DWORD cur_sendbuf_size; DWORD desired_sendbuf_size = 524288; socklen_t int_size = sizeof(int); if (!getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (char *)&cur_sendbuf_size, &int_size) && cur_sendbuf_size < desired_sendbuf_size) { setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (char *)&desired_sendbuf_size, sizeof(desired_sendbuf_size)); } } log_sndbuf_size(stream); #endif //推流主循环 while (os_sem_wait(stream->send_sem) == 0) { struct encoder_packet packet; struct dbr_frame dbr_frame; if (stopping(stream) && stream->stop_ts == 0) { break; } if (!get_next_packet(stream, &packet)) continue; if (stopping(stream)) { if (can_shutdown_stream(stream, &packet)) { obs_encoder_packet_release(&packet); break; } } if (!stream->sent_headers) { if (!send_headers(stream)) { os_atomic_set_bool(&stream->disconnected, true); break; } } /* silent reconnect signal received from server, reconnect on * next keyframe */ if (silently_reconnecting(stream) && packet.type == OBS_ENCODER_VIDEO && packet.keyframe) { reinsert_packet_at_front(stream, &packet); break; } if (stream->dbr_enabled) { dbr_frame.send_beg = os_gettime_ns(); dbr_frame.size = packet.size; } if (send_packet(stream, &packet, false, packet.track_idx) < 0) { os_atomic_set_bool(&stream->disconnected, true); break; } if (stream->dbr_enabled) { dbr_frame.send_end = os_gettime_ns(); pthread_mutex_lock(&stream->dbr_mutex); dbr_add_frame(stream, &dbr_frame); pthread_mutex_unlock(&stream->dbr_mutex); } } bool encode_error = os_atomic_load_bool(&stream->encode_error); if (disconnected(stream)) { info("Disconnected from %s", stream->path.array); } else if (encode_error) { info("Encoder error, disconnecting"); } else if (silently_reconnecting(stream)) { info("Silent reconnect signal received from server"); } else { info("User stopped the stream"); } #if defined(_WIN32) log_sndbuf_size(stream); #endif if (stream->new_socket_loop) { os_event_signal(stream->send_thread_signaled_exit); os_event_signal(stream->buffer_has_data_event); pthread_join(stream->socket_thread, NULL); stream->socket_thread_active = false; stream->rtmp.m_bCustomSend = false; } set_output_error(stream); if (silently_reconnecting(stream)) { /* manually close the socket to prevent librtmp from sending * unpublish / deletestream messages when we call RTMP_Close, * since we want to re-use this stream when we reconnect */ RTMPSockBuf_Close(&stream->rtmp.m_sb); stream->rtmp.m_sb.sb_socket = -1; } RTMP_Close(&stream->rtmp); /* reset bitrate on stop */ if (stream->dbr_enabled) { if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) { stream->dbr_cur_bitrate = stream->dbr_orig_bitrate; dbr_set_bitrate(stream); } } if (!stopping(stream)) { pthread_detach(stream->send_thread); if (!silently_reconnecting(stream)) obs_output_signal_stop(stream->output, OBS_OUTPUT_DISCONNECTED); } else if (encode_error) { obs_output_signal_stop(stream->output, OBS_OUTPUT_ENCODE_ERROR); } else { obs_output_end_data_capture(stream->output); } if (!silently_reconnecting(stream)) { free_packets(stream); os_event_reset(stream->stop_event); os_atomic_set_bool(&stream->active, false); } stream->sent_headers = false; /* reset bitrate on stop */ if (stream->dbr_enabled) { if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) { stream->dbr_cur_bitrate = stream->dbr_orig_bitrate; dbr_set_bitrate(stream); } } if (silently_reconnecting(stream)) { rtmp_stream_start(stream); } return NULL; }