linphone-developers
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Linphone-developers] measuring end to end delay


From: Iban Lopetegi Zinkunegi
Subject: [Linphone-developers] measuring end to end delay
Date: Tue, 21 Sep 2010 09:41:29 +0000

Hi all,

I am not sure whether this is the right place to ask this, so apologise in advance if this is the wrong place.

I am trying to measure the end to end delay or RoundTrip time for RTP packets. So I ve got computer A with a RTP_Sender that sends packets to another computer B which simply Receives the packet and sends it back. Both computers have a sender and a receiver programed with threads. Now, if i leave the computer to handle the threads, the scheduling of them appears to be faulty, meaning faulty that spends too long periods on a thread, causing many packets to be lost. To avoid this, I use a jumping system where each time I receive a packet I send a packet. In order to do that, i create a queue where all received payloads are copied and taken by the receiver and sender respectively.   

The problem is that in a 10Mbps ethernet I am getting end to end delays of about 2 seconds which is far too much, and it is obvious that I am some programming is wrong. I have tried changing rtp_session_set_scheduling_mode and rtp_session_set_blocking_mode by putting the to 0 but there is no difference.

The pseudo code for the proxy is below. The code has been checked with Valgrind for memory allocation faults, so ignore variables that i havent posted here. My main, concern lies in "my_rtp_session_recv_with_ts_v2" which is virtually a "rtp_session_recv_with_ts " function but adapted so that i can get the sequence number.This is important for me here so that i can keep a track of the packets.

Thanks


/*------------Proxy pseudo code-----------*/
/*----Global Variables---*/
volatile sig_atomic_t runcond;

pthread_mutex_t token_mutex;
pthread_cond_t ReceiveData_cond;//=PTHREAD_COND_INITIALIZER;
int breceive;
pthread_cond_t SendData_cond;//=PTHREAD_COND_INITIALIZER;
int bsend;
pthread_cond_t Start_cond;

struct _cdn_buffer{
        unsigned char * buf_f;
    uint16_t cseq_f;
};
typedef struct     _cdn_buffer cdn_buffer;

/*----END Global Variables---*/
int main (int argc, char ** argv)
{

breceive=0;
bsend=0;
pthread_cond_init(&ReceiveData_cond,NULL);
pthread_cond_init(&SendData_cond,NULL);
pthread_cond_init(&Start_cond,NULL);
...
runcond=1;
pthread_create (&pid_rtp_sendera, NULL, &send_rtpa, (void *) &channels);
pthread_create (&pid_rtp_receiver, NULL, &receive_rtp_ntwcdn, (void *) &channels);

pthread_join (pid_rtp_sendera,NULL);
pthread_join (pid_rtp_receiver,NULL);
...
}


int my_rtp_session_recv_with_ts_v2 (uint16_t * cseq_tmp,RtpSession * session, uint8_t * buffer,
                   int len, uint32_t ts, int * have_more){
    mblk_t *mp=NULL;
    int plen=0,blen=0;
    *have_more=0;
    rtp_header_t *hdr;

    while(1){
        if (session->pending){
            mp=session->pending;
            session->pending=NULL;
        }else {
            mp=rtp_session_recvm_with_ts(session,ts);
            
            if (mp!=NULL) {
                    hdr=(rtp_header_t*)mp->b_rptr;
                    rtp_get_payload(mp,&mp->b_rptr);
            }
        }
        if (mp){
            plen=mp->b_wptr-mp->b_rptr;
            if (plen<len){
                memcpy(buffer,mp->b_rptr,plen);
                buffer+=plen;
                blen+=plen;
                len-=plen;
                freemsg(mp);
                mp=NULL;
            }else{
                memcpy(buffer,mp->b_rptr,len);
                *cseq_tmp=(hdr->seq_number);
                //if (DEBUG){printf("Tcsq %d",*cseq_tmp);fflush(stdout);}
                mp->b_rptr+=len;
                buffer+=len;
                blen+=len;
                len=0;
                session->pending=mp;
                *have_more=1;
                break;
            }
        }else    
            break;
    }
    return blen;
}


void* receive_rtp_ntwcdn (void* unused)
{

int have_more;
int payloadsize=160;//PCM
RtpSession *session;
unsigned char * payload;
if (!(payload=malloc((payloadsize)*sizeof(char))))printf("malloc error for payload\n");
uint16_t diff_cseq,Lcseq,Rcseq,cseq_tmp;
diff_cseq=0;
Lcseq=0;
Rcseq=0;
cseq_tmp=0;
int clost=0;

...
recv_session_set=session_set_new();
session_rx=rtp_session_new(RTP_SESSION_RECVONLY);
rtp_session_set_scheduling_mode(session_rx,0);
rtp_session_set_blocking_mode(session_rx,0);
rtp_session_set_local_addr(session_rx,myip,nsrcport);
rtp_session_enable_adaptive_jitter_compensation(session_rx,adapt);
rtp_session_set_jitter_compensation(session_rx,jittcomp);
rtp_session_set_payload_type(session_rx,0);
rtp_session_signal_connect(session_rx,"ssrc_changed",(RtpCallback)ssrc_cb,0);
rtp_session_signal_connect(session_rx,"ssrc_changed",(RtpCallback)rtp_session_reset,0);    


...
unsined char buffer[160];
while (runcond){
    have_more=1;
    while ((have_more==1))
    {
        pthread_mutex_lock(&token_mutex);
        err=my_rtp_session_recv_with_ts_v2(&cseq_tmp,session,payload,payloadsize,ts,&have_more);         
        if (err>0){
            cdnbuffer.cseq_f=cseq_tmp;
            Rcseq=cdnbuffer.cseq_f;
            diff_cseq=Rcseq-Lcseq;
            if (diff_cseq>1){
                clost=clost+diff_cseq-1;
                printf("rXA Lost %d with Lcseq %d ",clost,Lcseq);fflush(stdout);
                Lcseq=Rcseq;
            }else Lcseq=Rcseq;

            /*Add to a queue*/
            strncpy(cdnbuffer.buf_f, payload, payloadsize);
            if (q_position_a[pair]>10)//to keep buffer small
                q_position_a[pair]=q_delete(queue_a[pair],q_position_a[pair],payloadsize);
            q_position_a[pair] =q_add(queue_a[pair],q_position_a[pair],&cdnbuffer,payloadsize);

            /*Jump to the sender*/
            bsend=1;
            pthread_cond_signal(&SendData_cond);
            breceive=0;        
            while (!breceive)
                pthread_cond_wait(&ReceiveData_cond,&token_mutex);
              pthread_mutex_unlock(&token_mutex);        
                                
            
        }    
        
        ts+=payloadsize;

    }
}
   

void* send_rtpa (void* unused)
{
int have_more;
int payloadsize=160;//PCM
RtpSession *session;
unsigned char * payload;
if (!(payload=malloc((payloadsize)*sizeof(char))))printf("malloc error for payload\n");
...

send_session_set=session_set_new();
session=rtp_session_new(RTP_SESSION_SENDONLY);
rtp_session_set_scheduling_mode(session,1);
rtp_session_set_blocking_mode(session,1);
rtp_session_set_symmetric_rtp(session,FALSE);
rtp_session_set_remote_addr(session,destip,ndestport);
rtp_session_set_local_addr (session, myip, nsrcport);
rtp_session_set_payload_type(session,0);

...

while (runcond){
    pthread_mutex_lock(&token_mutex);

        /*Wait until packet received*/
        while(!bsend)
            pthread_cond_wait(&SendData_cond,&token_mutex);
        if (q_position_a[pair]>0){
            while(q_position_a[pair]!=0){
                q_position_a[pair]=q_get(queue_a[pair], q_position_a[pair], & cdnbuffer_a, payloadsize);
                strncpy(buffer_ex, cdnbuffer_a.buf_f,payloadsize);
                add_to_payload_padding(buffer_ex,payloadsize,cdnbuffer_a.cseq_f,0);
                rtp_session_send_with_ts(session,buffer_ex,payloadsize,user_ts);
                user_ts+=payloadsize;
            }
            
        }
        
        breceive=1;
        pthread_cond_signal(&ReceiveData_cond);
        bsend=0;


    pthread_mutex_unlock(&token_mutex);
    }


}

;


reply via email to

[Prev in Thread] Current Thread [Next in Thread]