obs推流核心流程分析

 

前置步骤和录屏是一样的,见我的上一篇文章

https://www.cnblogs.com/billin/p/17219558.html

在上文提到的 bool obs_output_actual_start(obs_output_t *output) 函数中,如果启用的是推流直播,函数指针最终会转到下面这个函数:

//碧麟精简批注版 //rtmp推流开始 static bool rtmp_stream_start(void *data) { 	struct rtmp_stream *stream = data;  	os_atomic_set_bool(&stream->connecting, true); 	     //开启推流线程     return pthread_create(&stream->connect_thread, NULL, connect_thread, 			      stream) == 0; }

推流使用的是obs-outputs插件

上面的函数只做一件事:新开一个线程来执行 static void *connect_thread(void *data) 这个函数,传给线程的参数是 stream。

先看一下rtmp_stream结构

//rtmp stream结构 struct rtmp_stream { 	//obs_output     obs_output_t *output;          //packet信息 	struct circlebuf packets; 	bool sent_headers;  	bool got_first_video; 	int64_t start_dts_offset;  	volatile bool connecting;     //连接线程地址 	pthread_t connect_thread;     //发送线程地址 	pthread_t send_thread;  	os_sem_t *send_sem;          //推流地址,key     //比如b站,推流地址是rtmp://live-push.bilivideo.com/live-bvc/ 	struct dstr path, key;     //用户名,密码 	struct dstr username, password;     //编码器名字:     //我用的是FMLE/3.0 (compatible; FMSc/1.0) 	struct dstr encoder_name; 	struct dstr bind_ip;   	int64_t last_dts_usec;  	uint64_t total_bytes_sent; 	int dropped_frames;  	pthread_mutex_t dbr_mutex; 	struct circlebuf dbr_frames; 	size_t dbr_data_size;  	long audio_bitrate; 	long dbr_est_bitrate; 	long dbr_orig_bitrate; 	long dbr_prev_bitrate; 	long dbr_cur_bitrate; 	long dbr_inc_bitrate; 	bool dbr_enabled;          // RTMP结构对象 	RTMP rtmp;      	pthread_t socket_thread; 	uint8_t *write_buf; 	size_t write_buf_len; 	size_t write_buf_size; 	pthread_mutex_t write_buf_mutex; 	 };

rtmp_stream结构里保存了推流的所有关键信息,包括推流地址,key,流的编码器等参数

还保存了几个关键的线程句柄(类型为 pthread_t),用于多线程调度:既有连接线程 connect_thread,也有发送线程 send_thread。

 

//rtmp连接线程 static void *connect_thread(void *data) { 	struct rtmp_stream *stream = data; 	int ret;     //设置线程名 	os_set_thread_name("rtmp-stream: connect_thread");          //初始化 	if (!silently_reconnecting(stream)) { 		if (!init_connect(stream)) { 			obs_output_signal_stop(stream->output, 					       OBS_OUTPUT_BAD_PATH); 			os_atomic_set_bool(&stream->silent_reconnect, false); 			return NULL; 		} 	} else { 		struct encoder_packet packet; 		peek_next_packet(stream, &packet); 		stream->start_dts_offset = get_ms_time(&packet, packet.dts); 	}          //连接 	ret = try_connect(stream);  	if (ret != OBS_OUTPUT_SUCCESS) { 		obs_output_signal_stop(stream->output, ret); 		info("Connection to %s failed: %d", stream->path.array, ret); 	}  	if (!stopping(stream)) 		pthread_detach(stream->connect_thread);  	os_atomic_set_bool(&stream->silent_reconnect, false); 	os_atomic_set_bool(&stream->connecting, false); 	return NULL; }

上面的方法是在单独的线程中执行的,主要是做了下面几件事:

1 设置线程名为“rtmp-stream: connect_thread”

2 初始化 init_connect

3 通过调用try_connect(stream)执行实际连接逻辑

//rtmp connect static int try_connect(struct rtmp_stream *stream) { 	info("Connecting to RTMP URL %s...", stream->path.array);      //rtmp初始化 	RTMP_Init(&stream->rtmp);          //设置URL 	if (!RTMP_SetupURL(&stream->rtmp, stream->path.array)) 		return OBS_OUTPUT_BAD_PATH;  	RTMP_EnableWrite(&stream->rtmp);          //设置流编码格式 	dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)");          //设置用户名密码 	set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username); 	set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password); 	set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name); 	stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl; 	stream->rtmp.Link.customConnectEncode = add_connect_data;  	if (dstr_is_empty(&stream->bind_ip) || 	    dstr_cmp(&stream->bind_ip, "default") == 0) { 		memset(&stream->rtmp.m_bindIP, 0, 		       sizeof(stream->rtmp.m_bindIP)); 	} else { 		bool success = netif_str_to_addr(&stream->rtmp.m_bindIP.addr, 						 &stream->rtmp.m_bindIP.addrLen, 						 stream->bind_ip.array); 		if (success) { 			int len = stream->rtmp.m_bindIP.addrLen; 			bool ipv6 = len == sizeof(struct sockaddr_in6); 			info("Binding to IPv%d", ipv6 ? 6 : 4); 		} 	}  	RTMP_AddStream(&stream->rtmp, stream->key.array);  	stream->rtmp.m_outChunkSize = 4096; 	stream->rtmp.m_bSendChunkSizeInfo = true; 	stream->rtmp.m_bUseNagle = true;      //连接 	if (!RTMP_Connect(&stream->rtmp, NULL)) { 		set_output_error(stream); 		return OBS_OUTPUT_CONNECT_FAILED; 	}  	if (!RTMP_ConnectStream(&stream->rtmp, 0)) 		return OBS_OUTPUT_INVALID_STREAM;  	info("Connection to %s successful", stream->path.array);          //到这里说明连接成功,开始初始化send逻辑,准备推流 	return init_send(stream); }

当连接成功,开始调用init_send函数,初始化send,准备推流

需要注意,send 也是在单独的线程中处理的:视频数据量比较大,如果不单开线程,肯定会造成阻塞。

//初始化rtmp send逻辑 static int init_send(struct rtmp_stream *stream) { 	int ret; 	obs_output_t *context = stream->output;      //创建send线程,执行send_thread方法,参数是stream 	ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);  	if (stream->new_socket_loop) { 		int one = 1; #ifdef _WIN32 		if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) { 			stream->rtmp.last_error_code = WSAGetLastError(); #else 		if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) { 			stream->rtmp.last_error_code = errno; #endif 			warn("Failed to set non-blocking socket"); 			return OBS_OUTPUT_ERROR; 		}  		os_event_reset(stream->send_thread_signaled_exit);  		info("New socket loop enabled by user"); 		if (stream->low_latency_mode) 			info("Low latency mode enabled by user");  		if (stream->write_buf) 			bfree(stream->write_buf);  		int total_bitrate = 0;  		obs_encoder_t *vencoder = obs_output_get_video_encoder(context); 		if (vencoder) { 			obs_data_t *params = obs_encoder_get_settings(vencoder); 			if (params) { 				int bitrate = 					obs_data_get_int(params, "bitrate"); 				if (!bitrate) { 					warn("Video encoder didn't return a " 					     "valid bitrate, new network " 					     "code may function poorly. 
" 					     "Low latency mode disabled."); 					stream->low_latency_mode = false; 					bitrate = 10000; 				} 				total_bitrate += bitrate; 				obs_data_release(params); 			} 		}  		obs_encoder_t *aencoder = 			obs_output_get_audio_encoder(context, 0); 		if (aencoder) { 			obs_data_t *params = obs_encoder_get_settings(aencoder); 			if (params) { 				int bitrate = 					obs_data_get_int(params, "bitrate"); 				if (!bitrate) 					bitrate = 160; 				total_bitrate += bitrate; 				obs_data_release(params); 			} 		}  		// to bytes/sec 		int ideal_buffer_size = total_bitrate * 128;  		if (ideal_buffer_size < 131072) 			ideal_buffer_size = 131072;  		stream->write_buf_size = ideal_buffer_size; 		stream->write_buf = bmalloc(ideal_buffer_size);  #ifdef _WIN32 		ret = pthread_create(&stream->socket_thread, NULL, 				     socket_thread_windows, stream); #else 		warn("New socket loop not supported on this platform"); 		return OBS_OUTPUT_ERROR; #endif  		if (ret != 0) { 			RTMP_Close(&stream->rtmp); 			warn("Failed to create socket thread"); 			return OBS_OUTPUT_ERROR; 		}  		stream->socket_thread_active = true; 		stream->rtmp.m_bCustomSend = true; 		stream->rtmp.m_customSendFunc = socket_queue_data; 		stream->rtmp.m_customSendParam = stream; 	}  	os_atomic_set_bool(&stream->active, true);  	if (!send_meta_data(stream)) { 		warn("Disconnected while attempting to send metadata"); 		set_output_error(stream); 		return OBS_OUTPUT_DISCONNECTED; 	}  	obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, 1); 	if (aencoder && !send_additional_meta_data(stream)) { 		warn("Disconnected while attempting to send additional " 		     "metadata"); 		return OBS_OUTPUT_DISCONNECTED; 	}  	if (obs_output_get_audio_encoder(context, 2) != NULL) { 		warn("Additional audio streams not supported"); 		return OBS_OUTPUT_DISCONNECTED; 	}  	if (!silently_reconnecting(stream)) 		obs_output_begin_data_capture(stream->output, 0);  	return OBS_OUTPUT_SUCCESS; }

 

核心推流线程,在单独的线程里完成推流逻辑

 

//推流核心线程 static void *send_thread(void *data) { 	struct rtmp_stream *stream = data;          //设置线程名 	os_set_thread_name("rtmp-stream: send_thread");           //设定buffersize #if defined(_WIN32) 	// Despite MSDN claiming otherwise, send buffer auto tuning on 	// Windows 7 doesn't seem to work very well. 	if (get_win_ver_int() == 0x601) { 		DWORD cur_sendbuf_size; 		DWORD desired_sendbuf_size = 524288; 		socklen_t int_size = sizeof(int);  		if (!getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, 				SO_SNDBUF, (char *)&cur_sendbuf_size, 				&int_size) && 		    cur_sendbuf_size < desired_sendbuf_size) {  			setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, 				   SO_SNDBUF, (char *)&desired_sendbuf_size, 				   sizeof(desired_sendbuf_size)); 		} 	}  	log_sndbuf_size(stream); #endif          //推流主循环 	while (os_sem_wait(stream->send_sem) == 0) { 		struct encoder_packet packet; 		struct dbr_frame dbr_frame;  		if (stopping(stream) && stream->stop_ts == 0) { 			break; 		}  		if (!get_next_packet(stream, &packet)) 			continue;  		if (stopping(stream)) { 			if (can_shutdown_stream(stream, &packet)) { 				obs_encoder_packet_release(&packet); 				break; 			} 		}  		if (!stream->sent_headers) { 			if (!send_headers(stream)) { 				os_atomic_set_bool(&stream->disconnected, true); 				break; 			} 		}  		/* silent reconnect signal received from server, reconnect on 		 * next keyframe */ 		if (silently_reconnecting(stream) && 		    packet.type == OBS_ENCODER_VIDEO && packet.keyframe) { 			reinsert_packet_at_front(stream, &packet); 			break; 		}  		if (stream->dbr_enabled) { 			dbr_frame.send_beg = os_gettime_ns(); 			dbr_frame.size = packet.size; 		}  		if (send_packet(stream, &packet, false, packet.track_idx) < 0) { 			os_atomic_set_bool(&stream->disconnected, true); 			break; 		}  		if (stream->dbr_enabled) { 			dbr_frame.send_end = os_gettime_ns();  			pthread_mutex_lock(&stream->dbr_mutex); 			dbr_add_frame(stream, &dbr_frame); 			pthread_mutex_unlock(&stream->dbr_mutex); 		
} 	}  	bool encode_error = os_atomic_load_bool(&stream->encode_error);  	if (disconnected(stream)) { 		info("Disconnected from %s", stream->path.array); 	} else if (encode_error) { 		info("Encoder error, disconnecting"); 	} else if (silently_reconnecting(stream)) { 		info("Silent reconnect signal received from server"); 	} else { 		info("User stopped the stream"); 	}  #if defined(_WIN32) 	log_sndbuf_size(stream); #endif  	if (stream->new_socket_loop) { 		os_event_signal(stream->send_thread_signaled_exit); 		os_event_signal(stream->buffer_has_data_event); 		pthread_join(stream->socket_thread, NULL); 		stream->socket_thread_active = false; 		stream->rtmp.m_bCustomSend = false; 	}  	set_output_error(stream);  	if (silently_reconnecting(stream)) { 		/* manually close the socket to prevent librtmp from sending 		 * unpublish / deletestream messages when we call RTMP_Close, 		 * since we want to re-use this stream when we reconnect */ 		RTMPSockBuf_Close(&stream->rtmp.m_sb); 		stream->rtmp.m_sb.sb_socket = -1; 	}  	RTMP_Close(&stream->rtmp);  	/* reset bitrate on stop */ 	if (stream->dbr_enabled) { 		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) { 			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate; 			dbr_set_bitrate(stream); 		} 	}  	if (!stopping(stream)) { 		pthread_detach(stream->send_thread); 		if (!silently_reconnecting(stream)) 			obs_output_signal_stop(stream->output, 					       OBS_OUTPUT_DISCONNECTED); 	} else if (encode_error) { 		obs_output_signal_stop(stream->output, OBS_OUTPUT_ENCODE_ERROR); 	} else { 		obs_output_end_data_capture(stream->output); 	}  	if (!silently_reconnecting(stream)) { 		free_packets(stream); 		os_event_reset(stream->stop_event); 		os_atomic_set_bool(&stream->active, false); 	}  	stream->sent_headers = false;  	/* reset bitrate on stop */ 	if (stream->dbr_enabled) { 		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) { 			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate; 			dbr_set_bitrate(stream); 		} 	}  	if 
(silently_reconnecting(stream)) { 		rtmp_stream_start(stream); 	}  	return NULL; }

 

发表评论

评论已关闭。

相关文章