include/ortp/rtpsession.h | 19 +++++- src/rtcpparse.c | 159 +++++++++++++++++++++++++++++++++++---------- src/rtpsession.c | 5 ++ src/rtpsession_inet.c | 50 +++++++++----- src/rtpsession_priv.h | 5 +- 5 files changed, 184 insertions(+), 54 deletions(-) diff --git a/include/ortp/rtpsession.h b/include/ortp/rtpsession.h index 64960b8..649f6fb 100644 --- a/include/ortp/rtpsession.h +++ b/include/ortp/rtpsession.h @@ -114,7 +114,7 @@ typedef struct _RtpStream struct sockaddr_in rem_addr; #endif int rem_addrlen; - void *QoSHandle; + void *QoSHandle; unsigned long QoSFlowID; JitterControl jittctl; uint32_t snd_time_offset;/*the scheduler time when the application send its first timestamp*/ @@ -167,6 +167,19 @@ typedef struct _RtcpStream typedef struct _RtpSession RtpSession; +typedef struct _RtpQosInfos { + int curx; + struct { + uint32_t seqnum; + uint32_t interval_losses; + uint32_t remote_jitter; + uint32_t total_losses; + uint32_t interval_packets; + uint32_t loss_fraction; + uint32_t rtt; + } data[2]; +} RtpQosInfo; + /** * An object representing a bi-directional RTP session. @@ -198,6 +211,9 @@ struct _RtpSession RtpSignalTable on_timestamp_jump; RtpSignalTable on_network_error; RtpSignalTable on_rtcp_bye; + RtpSignalTable on_qos_event; + RtpSignalTable on_rtcp_packet; + struct _OList *signal_tables; struct _OList *eventqs; msgb_allocator_t allocator; @@ -221,6 +237,7 @@ struct _RtpSession bool_t permissive; /*use the permissive algorithm*/ bool_t use_connect; /* use connect() on the socket */ bool_t ssrc_set; + RtpQosInfo qos; }; diff --git a/src/rtcpparse.c b/src/rtcpparse.c index 64df2c7..35d39c5 100644 --- a/src/rtcpparse.c +++ b/src/rtcpparse.c @@ -287,54 +287,143 @@ void rtcp_APP_get_data(const mblk_t *m, uint8_t **data, int *len){ } } +static int big_change(uint32_t v1, uint32_t v2, int big_thresold) +{ + int diff = v1 - v2; + uint32_t mean; + + mean = ((v1 + v2) / 2) ; + + if (!mean) + return 0; + + if (diff < 0) + diff = -diff; + + diff = diff*100 / mean; + + return diff > big_thresold; + +} + +#define ntp64_to_ntp32(ntp_sec, ntp_frac) \ + ((((ntp_sec) & 0x0000ffff) << 16) | \ + (((ntp_frac) & 0xffff0000) >> 16)) + +#define ntp32_sub(now, then) ((now) > (then)) ? ((now) - (then)) : ((now) - (then) + 0xffffffff) + +static double +rtp_rtt_calc(uint32_t arr, uint32_t dep, uint32_t delay) +{ + uint32_t delta; + + /* rtt = arr - dep - delay */ + delta = ntp32_sub(arr, dep); + delta = ntp32_sub(delta, delay); + /* + * 16 high order bits are seconds + * 16 low order bits are 1/65536 of sec + */ + return (double)((delta >> 16) & 0xffff) + + (double)(delta & 0xffff) / 65536.0; +} + +#define SECS_BETWEEN_1900_1970 2208988800u + /*old functions: deprecated, but some useful code parts can be reused */ -/* Start from now this source code file was written by Nicola Baldo as an extension of +/* Start from now this source code file was written by Nicola Baldo as an extension of the oRTP library. Copyright (C) 2005 Nicola Baldo address@hidden/ - void report_block_parse(RtpSession *session, report_block_t *rb, struct timeval rcv_time_tv) { - rb->ssrc = ntohl(rb->ssrc); + rb->ssrc = ntohl(rb->ssrc); - if ( rb->ssrc != session->snd.ssrc ) - - { - ortp_debug("Received rtcp report block related to unknown ssrc (not from us)... discarded"); - return; - } + if ( rb->ssrc != session->snd.ssrc ) - else + { + ortp_debug("Received rtcp report block related to unknown ssrc (not from us)... discarded"); + return; + } - { - uint32_t rcv_time_msw; - uint32_t rcv_time_lsw; - uint32_t rcv_time; - double rtt; + else - rcv_time_msw = rcv_time_tv.tv_sec; + { + uint32_t rcv_time_msw; + uint32_t rcv_time_lsw; + uint32_t rcv_time; + double rtt; + int newx,curx,generate_qos_event = 1; + + //doesn't work +/* rcv_time_msw = rcv_time_tv.tv_sec; #if defined(_WIN32_WCE) - rcv_time_lsw = (uint32_t) ((double)rcv_time_tv.tv_usec*(double)(((uint64_t)1)<<32)*1.0e-6); + rcv_time_lsw = (uint32_t) ((double)rcv_time_tv.tv_usec*(double)(((uint64_t)1)<<32)*1.0e-6); #else - rcv_time_lsw = (uint32_t) ((double)rcv_time_tv.tv_usec*(double)(1LL<<32)*1.0e-6); + rcv_time_lsw = (uint32_t) ((double)rcv_time_tv.tv_usec*(double)(1LL<<32)*1.0e-6); #endif - rcv_time = (rcv_time_msw<<16) | (rcv_time_lsw >> 16); + rcv_time = (rcv_time_msw<<16) | (rcv_time_lsw >> 16);*/ -/* - rb->cum_num_packet_lost = ntoh24(rb->cum_num_packet_lost); - rb->ext_high_seq_num_rec = ntohl(rb->ext_high_seq_num_rec); - rb->interarrival_jitter = ntohl(rb->interarrival_jitter); - rb->lsr = ntohl(rb->lsr); - rb->delay_snc_last_sr = ntohl(rb->delay_snc_last_sr); -*/ - - /* calculating Round Trip Time*/ - if (rb->lsr != 0) - { - rtt = (double) (rcv_time - rb->delay_snc_last_sr - rb->lsr); - rtt = rtt/65536; - //printf("RTT = %f s\n",rtt); - } + rcv_time_msw = rcv_time_tv.tv_sec + SECS_BETWEEN_1900_1970; + rcv_time_lsw = (rcv_time_tv.tv_usec << 12) + (rcv_time_tv.tv_usec << 8) - ((rcv_time_tv.tv_usec * 3650) >> 6); - } + rcv_time = ntp64_to_ntp32(rcv_time_msw, rcv_time_lsw); + + rb->fl_cnpl = ntohl(rb->fl_cnpl); + + rb->ext_high_seq_num_rec = ntohl(rb->ext_high_seq_num_rec); + rb->interarrival_jitter = ntohl(rb->interarrival_jitter); + rb->lsr = ntohl(rb->lsr); + rb->delay_snc_last_sr = ntohl(rb->delay_snc_last_sr); + + /* calculating Round Trip Time*/ + if (rb->lsr != 0) + { + rtt = rtp_rtt_calc(rcv_time,rb->lsr,rb->delay_snc_last_sr ); + //printf("RTT = %f s\n",rtt); + } + else + rtt = 100000.0; // This is an arbitrary BIG number + + curx = session->qos.curx; + newx = (!session->qos.data[curx].seqnum) ? 0 : !curx; + session->qos.data[newx].seqnum = rb->ext_high_seq_num_rec; + session->qos.data[newx].total_losses = rb->fl_cnpl & 0x00FFFFFF; + session->qos.data[newx].loss_fraction = (rb->fl_cnpl >> 24) & 0x0ff; + session->qos.data[newx].remote_jitter = rb->interarrival_jitter; + session->qos.data[newx].rtt = (uint32_t) rtt; + + if (newx != curx) { + session->qos.data[newx].interval_losses = + session->qos.data[newx].total_losses - session->qos.data[curx].total_losses; + + session->qos.data[newx].interval_packets = + session->qos.data[newx].seqnum - session->qos.data[curx].seqnum; + + + if (big_change(session->qos.data[newx].interval_losses, session->qos.data[curx].interval_losses, 10 )) + generate_qos_event++; + + if (big_change(session->qos.data[newx].remote_jitter, session->qos.data[curx].remote_jitter, 10 )) + generate_qos_event++; + + if (big_change(session->qos.data[newx].interval_packets, session->qos.data[curx].interval_packets, 10)) + generate_qos_event++; + + if (big_change(session->qos.data[newx].rtt, session->qos.data[curx].rtt, 10)) + generate_qos_event++; + + if (session->qos.data[newx].loss_fraction != session->qos.data[curx].loss_fraction) + generate_qos_event++; + + } + + + + if (generate_qos_event) + rtp_signal_table_emit(&session->on_qos_event); + + session->qos.curx = newx; + + } } diff --git a/src/rtpsession.c b/src/rtpsession.c index 126d8e7..b4e2eed 100644 --- a/src/rtpsession.c +++ b/src/rtpsession.c @@ -268,6 +268,9 @@ rtp_session_init (RtpSession * session, int mode) rtp_signal_table_init (&session->on_timestamp_jump,session,"timestamp_jump"); rtp_signal_table_init (&session->on_network_error,session,"network_error"); rtp_signal_table_init (&session->on_rtcp_bye,session,"rtcp_bye"); + rtp_signal_table_init (&session->on_qos_event,session,"qos_event"); + rtp_signal_table_init (&session->on_rtcp_packet,session,"rtcp_packet"); + wait_point_init(&session->snd.wp); wait_point_init(&session->rcv.wp); /*defaults send payload type to 0 (pcmu)*/ @@ -288,6 +291,7 @@ rtp_session_init (RtpSession * session, int mode) session->symmetric_rtp = FALSE; session->permissive=FALSE; msgb_allocator_init(&session->allocator); + rtp_qos_reset(&session->qos); } @@ -1399,6 +1403,7 @@ void rtp_session_resync(RtpSession *session){ rtp_session_set_flag(session, RTP_SESSION_RECV_SYNC); rtp_session_unset_flag(session,RTP_SESSION_FIRST_PACKET_DELIVERED); jitter_control_init(&session->rtp.jittctl,-1,NULL); + rtp_qos_reset(&session->qos); } /** diff --git a/src/rtpsession_inet.c b/src/rtpsession_inet.c index 8262d4c..5d8266d 100644 --- a/src/rtpsession_inet.c +++ b/src/rtpsession_inet.c @@ -861,6 +861,7 @@ rtp_session_rtp_send (RtpSession * session, mblk_t * m) struct sockaddr *destaddr=(struct sockaddr*)&session->rtp.rem_addr; socklen_t destlen=session->rtp.rem_addrlen; ortp_socket_t sockfd=session->rtp.socket; + int needresend = ( 0 != (session->flags & RTP_SESSION_DOUBLE_SEND)); hdr = (rtp_header_t *) m->b_rptr; /* perform host to network conversions */ @@ -875,26 +876,40 @@ rtp_session_rtp_send (RtpSession * session, mblk_t * m) destlen=0; } - if (rtp_session_using_transport(session, rtp)){ - error = (session->rtp.tr->t_sendto) (session->rtp.tr,m,0,destaddr,destlen); - }else{ + /* + * if packet retransmission is enabled we're using following to decide whether + * resend it or not: + * if x% of packets are lost we resend x*2% of packets + * (the modulo 256 is used to account for the fact + * that RTCP fraction lost variable is using 256 and not 100 as fractional base) + */ + if (needresend) { + if (session->qos.data[session->qos.curx].loss_fraction*2 < (random() % 256)) + needresend = 0; + } + + do { + if (rtp_session_using_transport(session, rtp)){ + error = (session->rtp.tr->t_sendto) (session->rtp.tr,m,0,destaddr,destlen); + }else{ #ifdef USE_SENDMSG - error=rtp_sendmsg(sockfd,m,destaddr,destlen); + error=rtp_sendmsg(sockfd,m,destaddr,destlen); #else - if (m->b_cont!=NULL) - msgpullup(m,-1); - error = sendto (sockfd, (char*)m->b_rptr, (int) (m->b_wptr - m->b_rptr), - 0,destaddr,destlen); + if (m->b_cont!=NULL) + msgpullup(m,-1); + error = sendto (sockfd, (char*)m->b_rptr, (int) (m->b_wptr - m->b_rptr), + 0,destaddr,destlen); #endif - } - if (error < 0){ - if (session->on_network_error.count>0){ - rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTP packet",INT_TO_POINTER(getSocketErrorCode())); - }else ortp_warning ("Error sending rtp packet: %s ; socket=%i", getSocketError(), sockfd); - session->rtp.send_errno=getSocketErrorCode(); - }else{ - update_sent_bytes(session,error); - } + } + if (error < 0){ + if (session->on_network_error.count>0){ + rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTP packet",INT_TO_POINTER(getSocketErrorCode())); + }else ortp_warning ("Error sending rtp packet: %s ; socket=%i", getSocketError(), sockfd); + session->rtp.send_errno=getSocketErrorCode(); + }else{ + update_sent_bytes(session,error); + } + } while(needresend--); freemsg (m); return error; } @@ -1017,6 +1032,7 @@ rtp_session_rtp_recv (RtpSession * session, uint32_t user_ts) } void rtp_session_notify_inc_rtcp(RtpSession *session, mblk_t *m){ + rtp_signal_table_emit2(&session->on_rtcp_packet,(long)(long)m); if (session->eventqs!=NULL){ OrtpEvent *ev=ortp_event_new(ORTP_EVENT_RTCP_PACKET_RECEIVED); OrtpEventData *d=ortp_event_get_data(ev); diff --git a/src/rtpsession_priv.h b/src/rtpsession_priv.h index 24e9556..e7fc267 100644 --- a/src/rtpsession_priv.h +++ b/src/rtpsession_priv.h @@ -33,7 +33,8 @@ typedef enum { RTP_SESSION_USING_EXT_SOCKETS=1<<7, /* the session is using externaly supplied sockets */ RTP_SOCKET_CONNECTED=1<<8, RTCP_SOCKET_CONNECTED=1<<9, - RTP_SESSION_USING_TRANSPORT=1<<10 + RTP_SESSION_USING_TRANSPORT=1<<10, + RTP_SESSION_DOUBLE_SEND=1<<11 }RtpSessionFlags; #define rtp_session_using_transport(s, stream) (((s)->flags & RTP_SESSION_USING_TRANSPORT) && (s->stream.tr != 0)) @@ -51,4 +52,6 @@ void rtp_session_rtcp_parse(RtpSession *session, mblk_t *mp); void rtp_session_dispatch_event(RtpSession *session, OrtpEvent *ev); +#define rtp_qos_reset(qos) { memset((qos), 0, sizeof(*(qos))); } + #endif