本文共 10780 字,大约阅读时间需要 35 分钟。
使用 SIP 做过 VoIP 通话的同学,肯定被 RTP 丢包弄得焦头烂额,也必定尝试过不少办法,比方:1、丢包重传(NACK);2、前向纠错(FEC);3、丢帧处理。但效果往往不尽如人意,那有没有一劳永逸的方法?确实,在家庭带宽随随便便都是百兆的今天,为什么还有让人困扰不已的丢包问题?在延时容许的情况下,为何不换成像 TCP 那样的可靠传输?UDT 就是为了解决这个问题而设计的。UDT 建立于 UDP 之上,并引入了新的拥塞控制和数据可靠性控制机制。UDT 是面向连接的、双向的应用层协议,它同时支持可靠的数据流传输和部分可靠的数据报传输。UDT 的特点是不用自行开发可靠性机制,直接调用库的发送函数就可以实现可靠的数据传输。
Doubango rtp接收和发送函数修改:
-
- #include "udt_api.h"
-
- /* Clears the recorded UDT thread handle while holding the manager lock.
-  * Used on every setup failure, and on the non-originating side in
-  * single-connection mode, which runs no receive loop. */
- static void udt_transport_forget_thread(trtp_manager_t *self)
- {
-     tsk_safeobj_lock(self);
-     self->mainThreadId[0] = NULL;
-     tsk_safeobj_unlock(self);
- }
-
- /* Reads messages from the UDT socket stored in *fd and hands each one to
-  * _trtp_manager_recv_data() until self->is_started is cleared. */
- static void udt_transport_receive_loop(trtp_manager_t *self, int *fd)
- {
-     char buffer[4096];
-
-     while (self->is_started){
-         int received = recvdata_udt(fd, buffer, sizeof(buffer), 0);
-
-         if (received <= -1) {
-             /* NOTE(review): recvdata_udt() reports every error as -1, so a
-              * permanently failing socket is retried until is_started is
-              * cleared -- confirm this is the intended behaviour. */
-             continue;
-         }
-
-         _trtp_manager_recv_data(self, buffer, received, -1, NULL);
-     }
- }
-
- /**
-  * Thread entry point of the UDT transport.
-  *
-  * When self->is_originated is set, the thread listens on rtp.public_port and
-  * accepts one peer; unless single-connection mode is enabled it then also
-  * connects from (public_port + 3) to rtp.remote_ip:rtp.remote_port and stores
-  * that socket in self->clientSocket. Otherwise the order is reversed: connect
-  * first, then (unless single-connection mode) listen and accept.
-  *
-  * Afterwards, data read from the accepted socket is forwarded to
-  * _trtp_manager_recv_data() while self->is_started is set. In
-  * single-connection mode only the originating side runs this loop.
-  *
-  * @param param the trtp_manager_t instance that owns the thread
-  * @return always NULL
-  */
- void *udt_transport_mainthread(void *param)
- {
-     trtp_manager_t *self = param;
-     int rcv_buf;
-     int snd_buf;
-     int ret;
-     int clientFD = -1;      /* socket returned by accept_udt() */
-     int socketfDUDT = -1;   /* listening socket */
-     int clientSocket = -1;  /* outgoing (connecting) socket */
-
-     if(!self){
-         TSK_DEBUG_ERROR("udt_transport_mainthread Invalid parameter");
-         return NULL;
-     }
-     TSK_DEBUG_INFO("ii udt_transport_mainthread enter, self->is_originated:%d", self->is_originated);
-
-     rcv_buf = (int)tmedia_defaults_get_rtpbuff_size();
-     snd_buf = (int)tmedia_defaults_get_rtpbuff_size();
-
-     if (self->is_originated){
-         /* Originating side: accept the peer first. */
-         ret = init_udt(&socketfDUDT, (self->rtp.public_port), 0);
-         if (0 != ret || socketfDUDT == -1){
-             TSK_DEBUG_ERROR("socketfDUDT init is failed");
-             goto failed;
-         }
-         self->serverSocket = socketfDUDT;
-         TSK_DEBUG_INFO("socketfDUDT:%d listent", socketfDUDT);
-
-         set_connect_buff_udt(&socketfDUDT, snd_buf, rcv_buf);
-
-         listen_udt(&socketfDUDT);
-         clientFD = accept_udt(&socketfDUDT);
-         if (clientFD == -1){
-             TSK_DEBUG_ERROR("clientFd is failed");
-             goto failed;
-         }
-         TSK_DEBUG_INFO("clientFD:%d enter", clientFD);
-
-         if (!tmedia_defaults_get_single_connect()){
-             /* Second connection, used by udt_transport_send(). */
-             ret = init_udt(&clientSocket, (self->rtp.public_port + 3), 0);
-             if (0 != ret || clientSocket == -1){
-                 TSK_DEBUG_ERROR("clientSocket connect init is failed");
-                 goto failed;
-             }
-             set_connect_buff_udt(&clientSocket, snd_buf, rcv_buf);
-
-             ret = connect_udt(&clientSocket, self->rtp.remote_ip, self->rtp.remote_port);
-             if (-1 == ret){
-                 TSK_DEBUG_ERROR("clientSocket connect is failed");
-                 goto failed;
-             }
-             TSK_DEBUG_INFO("clientSocket:%d enter", clientSocket);
-
-             self->clientSocket = clientSocket;
-         }
-     }else{
-         /* Non-originating side: connect out first. */
-         ret = init_udt(&clientSocket, (self->rtp.public_port + 3), 0);
-         if (0 != ret || clientSocket == -1){
-             TSK_DEBUG_ERROR("ii clientSocket connect init is failed");
-             goto failed;
-         }
-         set_connect_buff_udt(&clientSocket, snd_buf, rcv_buf);
-
-         ret = connect_udt(&clientSocket, self->rtp.remote_ip, self->rtp.remote_port);
-         if (-1 == ret){
-             TSK_DEBUG_ERROR("clientSocket connect is failed");
-             goto failed;
-         }
-         self->clientSocket = clientSocket;
-         TSK_DEBUG_INFO("ii clientSocket:%d enter", clientSocket);
-
-         if (!tmedia_defaults_get_single_connect()){
-             ret = init_udt(&socketfDUDT, (self->rtp.public_port), 0);
-             if (0 != ret || socketfDUDT == -1){
-                 TSK_DEBUG_ERROR("ii socketfDUDT init is failed");
-                 goto failed;
-             }
-             self->serverSocket = socketfDUDT;
-             TSK_DEBUG_INFO("ii socketfDUDT:%d listent", socketfDUDT);
-
-             set_connect_buff_udt(&socketfDUDT, snd_buf, rcv_buf);
-             listen_udt(&socketfDUDT);
-             clientFD = accept_udt(&socketfDUDT);
-             if (clientFD == -1){
-                 TSK_DEBUG_ERROR("clientFd is failed");
-                 goto failed;
-             }
-             TSK_DEBUG_INFO("ii clientFD:%d enter", clientFD);
-         }else{
-             /* Nothing to receive on this side: the thread ends right away. */
-             udt_transport_forget_thread(self);
-         }
-     }
-
-     /* In single-connection mode only the originating side has an accepted socket. */
-     if (!tmedia_defaults_get_single_connect() || self->is_originated){
-         udt_transport_receive_loop(self, &clientFD);
-     }
-     goto cleanup;
-
- failed:
-     udt_transport_forget_thread(self);
-
- cleanup:
-     if (-1 != clientFD){
-         close_udt(clientFD);
-     }
-     if (-1 != socketfDUDT){
-         close_udt(socketfDUDT);
-     }
-     /* An outgoing socket that never reached self->clientSocket has no other
-      * owner (connect failed), so release it here instead of leaking it. */
-     if (-1 != clientSocket && self->clientSocket != clientSocket){
-         close_udt(clientSocket);
-     }
-
-     TSK_DEBUG_INFO("udt_transport_mainthread end.");
-     return NULL;
- }
-
- /**
-  * Sends one encoded buffer over the manager's outgoing UDT socket
-  * (self->clientSocket).
-  *
-  * @param self            RTP manager holding the socket
-  * @param encodedData     bytes to send
-  * @param encodedDataLen  number of bytes in encodedData
-  * @return the value returned by senddata_udt() on success, -1 on invalid
-  *         arguments or when senddata_udt() fails
-  */
- int udt_transport_send(trtp_manager_t* self, char* encodedData, int encodedDataLen){
-     int ret;
-
-     /* Reject bad arguments up front instead of dereferencing a NULL manager. */
-     if (!self || !encodedData || encodedDataLen < 0){
-         TSK_DEBUG_ERROR("udt_transport_send Invalid parameter");
-         return -1;
-     }
-     TSK_DEBUG_INFO("senddata_udt START:%d", encodedDataLen);
-
-     ret = senddata_udt(&self->clientSocket, encodedData, encodedDataLen, 0);
-     if (-1 == ret){
-         TSK_DEBUG_ERROR("senddata_udt failed");
-         return -1;
-     }
-     TSK_DEBUG_INFO("senddata_udt ret:%d", ret);
-
-     return ret;
- }
-
启动:在 int trtp_manager_start(trtp_manager_t* self)中: - if(!self->transport || !self->transport->master){
-
- if (tmedia_defaults_get_use_udt_socket() != 0){
-
- self->clientSocket = -1;
- self->serverSocket = -1;
- self->is_started = tsk_true;
-
- if ((ret = tsk_thread_create(self->mainThreadId, udt_transport_mainthread, self))){
- TSK_DEBUG_FATAL("Failed to create udt main thread [%d]", ret);
- }else{
- TSK_DEBUG_INFO("udt thread create success.");
-
- if !TNET_UNDER_APPLE
- ret = tsk_thread_set_priority(self->mainThreadId[0], TSK_THREAD_PRIORITY_TIME_CRITICAL);
- endif
- tsk_safeobj_unlock(self);
-
- return 0;
- }
- }
-
-
- TSK_DEBUG_ERROR("RTP/RTCP manager not prepared");
- ret = -2;
- goto bail;
- }
-
UDT协议封装:
- // C-callable wrappers around the UDT library. Every `usocket` argument is a
- // pointer to the integer socket handle; the functions return -1 when it is NULL.
- // NOTE(review): `bool` in this prototype needs <stdbool.h> when the header is
- // included from C code -- confirm the real header provides it.
-
- // Creates a UDT socket bound to local `port` and stores its handle in *usock.
- // Returns 0 on success, -1 on failure.
- int init_udt(int *usock, int port, bool rendezvous);
- // Connects *usocket to peerip:port. Returns the result of UDT::connect, or -1
- // on invalid arguments or address-resolution failure.
- int connect_udt(void* usocket, char *peerip, int port);
- // Applies the UDT_SNDBUF / UDT_RCVBUF options to *usocket.
- int set_connect_buff_udt(void* usocket, int send_buf_size, int recv_buf_size);
- // Receives one message into buf (at most `size` bytes). Returns the byte count
- // or -1 on error. `flags` is not used by the implementation.
- int recvdata_udt(void* usocket, char* buf, int size, int flags);
- // Sends `size` bytes from buf. Returns the number of bytes sent or -1 on error.
- // `flags` is not used by the implementation.
- int senddata_udt(void* usocket, char* buf, int size, int flags);
- // Puts *usocket into listening state; returns the result of UDT::listen.
- int listen_udt(void* usocket);
- // Accepts one incoming connection on *usocket; returns the new socket handle or -1.
- int accept_udt(void* usocket);
- // Closes the given socket handle; always returns 0.
- int close_udt(int usock);
实现: - #include <winsock2.h>
- #include <ws2tcpip.h>
- #include <wspiapi.h>
- #include <iostream>
- #include <udt.h>
- #include <arpa/inet.h>
- #include <android/log.h>
- #define TAG "UDT_API"
-
- // The wrappers are declared with C linkage so that C translation units can call them.
- extern "C" int init_udt(int *usock, int port, bool rendezvous);
- extern "C" int connect_udt(void* usocket, char *peerip, int port);
- extern "C" int set_connect_buff_udt(void* usocket, int send_buf_size, int recv_buf_size);
- extern "C" int recvdata_udt(void* usocket, char* buf, int size, int flags);
- extern "C" int senddata_udt(void* usocket, char* buf, int size, int flags);
- extern "C" int listen_udt(void* usocket);
- extern "C" int accept_udt(void* usocket);
- extern "C" int close_udt(int usock);
-
- using namespace std;
-
- // Address family and socket type used when resolving addresses and creating sockets.
- // SOCK_DGRAM goes with the recvmsg/sendmsg calls used by the wrappers below.
- const int g_IP_Version = AF_INET;
- const int g_Socket_Type = SOCK_DGRAM;
- // Not referenced by any function shown in this file.
- const char g_Localhost[] = "127.0.0.1";
-
- // Creates a UDT socket, applies default options, and binds it to local `port`.
- //
- // usock       out: receives the new socket handle; left untouched on failure
- // port        local port to bind
- // rendezvous  value written to the UDT_RENDEZVOUS option
- //
- // Returns 0 on success, -1 on a NULL `usock`, address-resolution failure,
- // socket-creation failure, or bind failure. On every failure path the
- // resolved address list and any socket created so far are released.
- int init_udt(int *usock, int port, bool rendezvous){
-     if (usock == NULL){
-         return -1;
-     }
-
-     addrinfo hints;
-     memset(&hints, 0, sizeof(struct addrinfo));
-     hints.ai_flags = AI_PASSIVE;
-     hints.ai_family = g_IP_Version;
-     hints.ai_socktype = g_Socket_Type;
-
-     char service[16];
-     sprintf(service, "%d", port);
-
-     addrinfo* res = NULL;
-     if (0 != getaddrinfo(NULL, service, &hints, &res))
-     {
-         cout << "illegal port number or port is busy.\n" << endl;
-         __android_log_print(ANDROID_LOG_WARN, TAG, "illegal port number or port is busy:%s", UDT::getlasterror().getErrorMessage() );
-
-         return -1;
-     }
-
-     UDTSOCKET retFD = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-     if (retFD == UDT::INVALID_SOCK)
-     {
-         __android_log_print(ANDROID_LOG_WARN, TAG, "err, socket create failed:%s", UDT::getlasterror().getErrorMessage() );
-         cout << "socket: " << UDT::getlasterror().getErrorMessage() << endl;
-         freeaddrinfo(res);
-         return -1;
-     }
-
-     // Default buffer sizes; callers may try to override them with set_connect_buff_udt().
-     int snd_buf = 16000;
-     int rcv_buf = 16000;
-     UDT::setsockopt(retFD, 0, UDT_SNDBUF, &snd_buf, sizeof(int));
-     UDT::setsockopt(retFD, 0, UDT_RCVBUF, &rcv_buf, sizeof(int));
-
-     bool reuse = true;
-     UDT::setsockopt(retFD, 0, UDT_REUSEADDR, &reuse, sizeof(bool));
-     UDT::setsockopt(retFD, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));
-
-     if (UDT::ERROR == UDT::bind(retFD, res->ai_addr, res->ai_addrlen))
-     {
-         // Log first: the later calls may overwrite the last-error state.
-         __android_log_print(ANDROID_LOG_WARN, TAG, "err, init failed:%s", UDT::getlasterror().getErrorMessage() );
-         cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
-         freeaddrinfo(res);
-         UDT::close(retFD);
-         return -1;
-     }
-
-     freeaddrinfo(res);
-     *usock = retFD;
-     return 0;
- }
-
- // Connects the UDT socket referenced by `usocket` to peerip:port.
- //
- // Returns -1 when an argument is NULL or the peer address cannot be resolved;
- // otherwise returns whatever UDT::connect returns.
- int connect_udt(void* usocket, char *peerip, int port){
-     if (!usocket || !peerip){
-         return -1;
-     }
-
-     // getaddrinfo takes the port as a string.
-     char service[16];
-     sprintf(service, "%d", port);
-
-     addrinfo criteria;
-     memset(&criteria, 0, sizeof(struct addrinfo));
-     criteria.ai_flags = AI_PASSIVE;
-     criteria.ai_family = g_IP_Version;
-     criteria.ai_socktype = g_Socket_Type;
-
-     addrinfo* resolved;
-     if (getaddrinfo(peerip, service, &criteria, &resolved) != 0)
-     {
-         return -1;
-     }
-
-     const UDTSOCKET sock = *(UDTSOCKET*)usocket;
-     const int status = UDT::connect(sock, resolved->ai_addr, resolved->ai_addrlen);
-
-     freeaddrinfo(resolved);
-     return status;
- }
-
- // Receives one message from the UDT socket referenced by `usocket` into
- // `buf` (capacity `size`). `flags` is accepted but not used.
- //
- // Returns the value of UDT::recvmsg on success, -1 on a NULL `usocket` or on
- // any receive error.
- int recvdata_udt(void* usocket, char* buf, int size, int flags){
-     if (usocket == NULL){
-         return -1;
-     }
-
-     const UDTSOCKET source = *(UDTSOCKET*)usocket;
-     const int received = UDT::recvmsg(source, buf, size);
-     if (received != UDT::ERROR){
-         return received;
-     }
-
-     cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;
-
-     // Error codes 2001 and 5004 are reported on stdout only; every other
-     // code is additionally written to the Android log.
-     const int code = UDT::getlasterror().getErrorCode();
-     if (code != 2001 && code != 5004){
-         __android_log_print(ANDROID_LOG_WARN, TAG, "err, recvdata_udt failed:%s", UDT::getlasterror().getErrorMessage() );
-     }
-     return -1;
- }
-
- // Applies send/receive buffer sizes to the UDT socket referenced by `usocket`.
- //
- // Both options are always attempted. Returns 0 when both were accepted and
- // -1 when `usocket` is NULL or either option was rejected.
- // NOTE(review): UDT may refuse buffer-size changes on a socket that is already
- // bound, which is how the visible callers use this function -- confirm against
- // the UDT option documentation.
- int set_connect_buff_udt(void* usocket, int send_buf_size, int recv_buf_size){
-     if (usocket == NULL){
-         return -1;
-     }
-     UDTSOCKET sock = *(UDTSOCKET*)usocket;
-
-     const int snd_result = UDT::setsockopt(sock, 0, UDT_SNDBUF, &send_buf_size, sizeof(int));
-     const int rcv_result = UDT::setsockopt(sock, 0, UDT_RCVBUF, &recv_buf_size, sizeof(int));
-
-     // Report a rejected option instead of silently claiming success.
-     if (snd_result == UDT::ERROR || rcv_result == UDT::ERROR){
-         return -1;
-     }
-     return 0;
- }
-
- // Sends `size` bytes from `buf` over the UDT socket referenced by `usocket`,
- // calling UDT::sendmsg until everything has been handed to the library.
- // `flags` is accepted but not used.
- //
- // Returns the number of bytes sent; -1 when `usocket` or `buf` is NULL or when
- // UDT::sendmsg reports an error. A `size` of zero or less sends nothing and
- // returns 0.
- int senddata_udt(void* usocket, char* buf, int size, int flags){
-
-     if (usocket == NULL || buf == NULL){
-         return -1;
-     }
-     UDTSOCKET client = *(UDTSOCKET*)usocket;
-
-     int sent = 0;
-     while (sent < size){
-
-         int ret = UDT::sendmsg(client, (((const char*)buf) + sent), (int)(size - sent), -1, true);
-         // Test the result of this call. Checking the running total instead
-         // would let a failed send move the offset backwards and keep looping.
-         if (ret < 0)
-         {
-             __android_log_print(ANDROID_LOG_WARN, TAG, "err, senddata_udt failed:%s", UDT::getlasterror().getErrorMessage() );
-
-             cout << "send: " << UDT::getlasterror().getErrorMessage() << endl;
-             return -1;
-         }
-
-         __android_log_print(ANDROID_LOG_WARN, TAG, " senddata_udt ret:%d, size:%d", ret, size);
-
-         if (ret == 0){
-             // Nothing was accepted; stop instead of spinning on the same data.
-             break;
-         }
-         sent += ret;
-     }
-
-     return sent;
- }
-
- // Puts the UDT socket referenced by `usocket` into listening state with a
- // backlog of 1024. Returns -1 for a NULL `usocket`, otherwise the result of
- // UDT::listen.
- int listen_udt(void* usocket){
-     if (!usocket){
-         return -1;
-     }
-
-     const int backlog = 1024;
-     const UDTSOCKET listener = *(UDTSOCKET*)usocket;
-     return UDT::listen(listener, backlog);
- }
-
- // Accepts one incoming connection on the listening UDT socket referenced by
- // `usocket` and prints the peer's IPv4 address and port to stdout.
- //
- // Returns the new socket handle, or -1 when `usocket` is NULL or
- // UDT::accept fails.
- int accept_udt(void* usocket){
-     if (usocket == NULL){
-         return -1;
-     }
-     UDTSOCKET serv = *(UDTSOCKET*)usocket;
-     sockaddr_in clientaddr;
-     int addrlen = sizeof(clientaddr);
-     UDTSOCKET new_sock = UDT::accept(serv, (sockaddr*)&clientaddr, &addrlen);
-
-     if (new_sock == UDT::INVALID_SOCK)
-     {
-         return -1;
-     }
-     cout << "new connection: " << inet_ntoa(clientaddr.sin_addr) << ":" << ntohs(clientaddr.sin_port) << endl;
-
-     return new_sock;
- }
-
- // Closes the UDT socket `usock`. The result of UDT::close is deliberately
- // discarded; the function always returns 0.
- int close_udt(int usock){
-     const UDTSOCKET handle = usock;
-     (void)UDT::close(handle);
-     return 0;
- }
转载地址:http://eczoi.baihongyu.com/