[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r5088 - GNUnet/src/transports
From: |
gnunet |
Subject: |
[GNUnet-SVN] r5088 - GNUnet/src/transports |
Date: |
Sat, 16 Jun 2007 04:35:17 -0600 (MDT) |
Author: grothoff
Date: 2007-06-16 04:35:16 -0600 (Sat, 16 Jun 2007)
New Revision: 5088
Modified:
GNUnet/src/transports/Makefile.am
GNUnet/src/transports/http.c
Log:
rudimentary http working
Modified: GNUnet/src/transports/Makefile.am
===================================================================
--- GNUnet/src/transports/Makefile.am 2007-06-16 09:53:54 UTC (rev 5087)
+++ GNUnet/src/transports/Makefile.am 2007-06-16 10:35:16 UTC (rev 5088)
@@ -36,7 +36,7 @@
ip.c ip.h
libgnunetip_la_LIBADD = \
$(top_builddir)/src/util/libgnunetutil.la
-
+
libgnunetip6_la_SOURCES = \
ip6.c ip6.h
Modified: GNUnet/src/transports/http.c
===================================================================
--- GNUnet/src/transports/http.c 2007-06-16 09:53:54 UTC (rev 5087)
+++ GNUnet/src/transports/http.c 2007-06-16 10:35:16 UTC (rev 5088)
@@ -25,8 +25,9 @@
*
* TODO:
* - connection timeout (shutdown inactive connections)
- * - proper connection shutdown (free resources, especially
- * check completed CURL puts)
+ * - proper connection re-establishment
+ * - nothing copies TO wbuff, only from (see FIXMEs)
+ * - free resources allocated for PUT!
*/
#include "gnunet_util.h"
@@ -182,11 +183,6 @@
CURL * get;
/**
- * PUT operation
- */
- CURL * put;
-
- /**
* URL of the get and put operations.
*/
char * url;
@@ -197,6 +193,19 @@
} HTTPSession;
+struct HTTPPutData {
+ struct HTTPPutData * next;
+
+ char * msg;
+
+ CURL * curl_put;
+
+ unsigned int size;
+
+ unsigned int pos;
+
+};
+
/* *********** globals ************* */
/**
@@ -224,8 +233,14 @@
*/
static CURLM * curl_multi;
+/**
+ * Set to YES while the transport is running.
+ */
static int http_running;
+/**
+ * Thread running libcurl activities.
+ */
static struct PTHREAD * curl_thread;
/**
@@ -248,6 +263,11 @@
static UPnP_ServiceAPI * upnp;
/**
+ * List of active PUT requests.
+ */
+static struct HTTPPutData * putHead;
+
+/**
* Lock for access to mutable state of the module,
* that is the configuration and the tsessions array.
* Note that we ONLY need to synchronize access to
@@ -542,8 +562,9 @@
if ( (strlen(url) < 2) ||
(OK != enc2hash(&url[1],
- &client)) )
+ &client)) ) {
return MHD_NO;
+ }
/* check if we already have a session for this */
MUTEX_LOCK(httplock);
@@ -581,7 +602,6 @@
httpSession->lastUse = get_time();
httpSession->is_client = NO;
httpSession->cs.client.get = NULL;
- httpSession->cs.client.put = NULL;
tsession = MALLOC(sizeof(TSession));
tsession->ttype = HTTP_PROTOCOL_NUMBER;
tsession->internal = httpSession;
@@ -589,7 +609,7 @@
httpSession->tsession = tsession;
addTSession(tsession);
}
- if (0 == strcmp("get", method)) {
+ if (0 == strcmp("GET", method)) {
/* handle get */
response = MHD_create_response_from_callback(-1,
contentReaderCallback,
@@ -599,7 +619,7 @@
MHD_queue_response(session,
MHD_HTTP_OK,
response);
- } else if (0 == strcmp("put", method)) {
+ } else if (0 == strcmp("PUT", method)) {
/* handle put (upload_data!) */
MUTEX_LOCK(httpSession->lock);
poff = 0;
@@ -616,20 +636,16 @@
httpSession->rpos1 += cpy;
have -= cpy;
poff += cpy;
+ httpSession->rpos2 = 0;
}
if (httpSession->rpos1 < sizeof(MESSAGE_HEADER))
break;
hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
GROW(httpSession->rbuff2,
httpSession->rsize2,
- ntohs(hdr->size));
- memcpy(httpSession->rbuff2,
- httpSession->rbuff1,
- sizeof(MESSAGE_HEADER));
- GE_ASSERT(NULL,
- httpSession->rpos2 <= ntohs(hdr->size));
- if (httpSession->rpos2 < ntohs(hdr->size)) {
- cpy = ntohs(hdr->size) - httpSession->rpos2;
+ ntohs(hdr->size) - sizeof(MESSAGE_HEADER));
+ if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER)) {
+ cpy = ntohs(hdr->size) - sizeof(MESSAGE_HEADER) - httpSession->rpos2;
if (cpy > have)
cpy = have;
memcpy(&httpSession->rbuff2[httpSession->rpos2],
@@ -637,21 +653,25 @@
cpy);
have -= cpy;
poff += cpy;
+ httpSession->rpos2 += cpy;
}
- if (httpSession->rpos2 < ntohs(hdr->size))
+ if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER))
break;
mp = MALLOC(sizeof(P2P_PACKET));
mp->msg = httpSession->rbuff2;
mp->sender = httpSession->sender;
mp->tsession = httpSession->tsession;
+ mp->size = ntohs(hdr->size) - sizeof(MESSAGE_HEADER);
coreAPI->receive(mp);
httpSession->rbuff2 = NULL;
httpSession->rpos2 = 0;
httpSession->rsize2 = 0;
httpSession->rpos1 = 0;
}
- } else
+ MUTEX_UNLOCK(httpSession->lock);
+ } else {
return MHD_NO; /* must be get or put! */
+ }
return MHD_YES;
}
@@ -671,6 +691,8 @@
MESSAGE_HEADER * hdr;
P2P_PACKET * mp;
+ printf("Receiving %u bytes from GET\n",
+ have);
while (have > 0) {
if (httpSession->rpos1 < sizeof(MESSAGE_HEADER)) {
cpy = sizeof(MESSAGE_HEADER) - httpSession->rpos1;
@@ -682,20 +704,18 @@
httpSession->rpos1 += cpy;
have -= cpy;
poff += cpy;
+ httpSession->rpos2 = 0;
}
if (httpSession->rpos1 < sizeof(MESSAGE_HEADER))
return size * nmemb;
hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
GROW(httpSession->rbuff2,
httpSession->rsize2,
- ntohs(hdr->size));
- memcpy(httpSession->rbuff2,
- httpSession->rbuff1,
- sizeof(MESSAGE_HEADER));
- GE_ASSERT(NULL,
- httpSession->rpos2 <= ntohs(hdr->size));
- if (httpSession->rpos2 < ntohs(hdr->size)) {
- cpy = ntohs(hdr->size) - httpSession->rpos2;
+ ntohs(hdr->size) - sizeof(MESSAGE_HEADER));
+ printf("Expecting message of %u bytes via GET\n",
+ ntohs(hdr->size));
+ if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER)) {
+ cpy = ntohs(hdr->size) - sizeof(MESSAGE_HEADER) - httpSession->rpos2;
if (cpy > have)
cpy = have;
memcpy(&httpSession->rbuff2[httpSession->rpos2],
@@ -703,13 +723,16 @@
cpy);
have -= cpy;
poff += cpy;
+ httpSession->rpos2 += cpy;
}
- if (httpSession->rpos2 < ntohs(hdr->size))
+ if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER))
return size * nmemb;
mp = MALLOC(sizeof(P2P_PACKET));
mp->msg = httpSession->rbuff2;
mp->sender = httpSession->sender;
mp->tsession = httpSession->tsession;
+ mp->size = ntohs(hdr->size) - sizeof(MESSAGE_HEADER);
+ printf("Passing message from GET to core!\n");
coreAPI->receive(mp);
httpSession->rbuff2 = NULL;
httpSession->rpos2 = 0;
@@ -727,27 +750,15 @@
size_t size,
size_t nmemb,
void * ctx) {
- HTTPSession * httpSession = ctx;
+ struct HTTPPutData * put = ctx;
size_t max = size * nmemb;
- MUTEX_LOCK(httpSession->lock);
- if (max > httpSession->wpos)
- max = httpSession->wpos;
+ if (max > put->size - put->pos)
+ max = put->size - put->pos;
memcpy(ptr,
- &httpSession->wbuff[httpSession->woff],
+ &put->msg[put->pos],
max);
- httpSession->woff += max;
- httpSession->wpos -= max;
- if (httpSession->woff == httpSession->wpos) {
- httpSession->woff = 0;
- httpSession->wpos = 0;
- }
- if (max == 0) {
- /* if we have nothing to sent, this will terminate
- the session (CURL API requires this) */
- httpSession->cs.client.put = NULL;
- }
- MUTEX_UNLOCK(httpSession->lock);
+ put->pos += max;
return max;
}
@@ -771,6 +782,9 @@
char * url;
EncName enc;
+ /* FIXME: check if we have a GET pending for
+ this peer, and if so, use that! */
+
curl_get = curl_easy_init();
if (curl_get == NULL)
return SYSERR;
@@ -780,8 +794,9 @@
url = MALLOC(64 + sizeof(EncName));
SNPRINTF(url,
64 + sizeof(EncName),
- "http://%u.%u.%u.%u/%s",
+ "http://%u.%u.%u.%u:%u/%s",
PRIP(ntohl(*(int*)&haddr->ip.addr)),
+ ntohs(haddr->port),
&enc);
/* create GET */
@@ -818,8 +833,6 @@
httpSession);
if (ret != CURLE_OK)
goto cleanup;
-
- /* FIXME: should we queue here or wait until we have data!? */
mret = curl_multi_add_handle(curl_multi, curl_get);
if (mret != CURLM_OK) {
GE_LOG(coreAPI->ectx,
@@ -848,7 +861,6 @@
httpSession->lastUse = get_time();
httpSession->is_client = YES;
httpSession->cs.client.get = curl_get;
- httpSession->cs.client.put = NULL;
tsession = MALLOC(sizeof(TSession));
httpSession->tsession = tsession;
tsession->ttype = HTTP_PROTOCOL_NUMBER;
@@ -865,7 +877,9 @@
}
static CURL *
-create_curl_put(HTTPSession * httpSession) {
+create_curl_put(HTTPSession * httpSession,
+ struct HTTPPutData * put,
+ unsigned int size) {
CURL * curl_put;
CURLcode ret;
@@ -899,13 +913,13 @@
150L);
CURL_EASY_SETOPT(curl_put,
CURLOPT_INFILESIZE_LARGE,
- 0);
+ size);
CURL_EASY_SETOPT(curl_put,
CURLOPT_READFUNCTION,
&sendContentCallback);
CURL_EASY_SETOPT(curl_put,
CURLOPT_READDATA,
- httpSession);
+ put);
if (ret != CURLE_OK) {
curl_easy_cleanup(curl_put);
return NULL;
@@ -923,90 +937,63 @@
*/
static int httpSend(TSession * tsession,
const void * msg,
- const unsigned int size,
+ unsigned int size,
int important) {
HTTPSession * httpSession = tsession->internal;
+ struct HTTPPutData * putData;
CURL * curl_put;
CURLMcode mret;
+ MESSAGE_HEADER * hdr;
+ /* FIXME: check if we have a GET pending for
+ this peer, and if so, use that! */
+
if (size >= MAX_BUFFER_SIZE)
return SYSERR;
if (size == 0) {
GE_BREAK(NULL, 0);
return SYSERR;
}
+ putData = MALLOC(sizeof(struct HTTPPutData));
+ putData->msg = MALLOC(size + sizeof(MESSAGE_HEADER));
+ hdr = (MESSAGE_HEADER*) putData->msg;
+ hdr->size = htons(size + sizeof(MESSAGE_HEADER));
+ hdr->type = htons(0);
+ memcpy(&putData->msg[sizeof(MESSAGE_HEADER)],
+ msg,
+ size);
+ putData->size = size + sizeof(MESSAGE_HEADER);
MUTEX_LOCK(httpSession->lock);
- if (httpSession->cs.client.put == NULL) {
- /* first data to send, add PUT to multi set! */
- curl_put = create_curl_put(httpSession);
- if (curl_put == NULL) {
- MUTEX_UNLOCK(httpSession->lock);
- return SYSERR;
- }
- httpSession->cs.client.put = curl_put;
- mret = curl_multi_add_handle(curl_multi, curl_put);
- if (mret != CURLM_OK) {
- GE_LOG(coreAPI->ectx,
- GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
- _("%s failed at %s:%d: `%s'\n"),
- "curl_multi_add_handle",
- __FILE__,
- __LINE__,
- curl_multi_strerror(mret));
- curl_easy_cleanup(curl_put);
- httpSession->cs.client.put = NULL;
- MUTEX_UNLOCK(httpSession->lock);
- return SYSERR;
- }
+ curl_put = create_curl_put(httpSession,
+ putData,
+ size + sizeof(MESSAGE_HEADER));
+ MUTEX_UNLOCK(httpSession->lock);
+ putData->curl_put = curl_put;
+ if (curl_put == NULL) {
+ FREE(putData->msg);
+ FREE(putData);
+ return SYSERR;
}
-
- if ( (httpSession->wsize > HTTP_BUF_SIZE) &&
- (important == NO) ) {
- if (stats != NULL)
- stats->change(stat_bytesDropped,
- size);
- MUTEX_UNLOCK(httpSession->lock);
- return NO;
+ MUTEX_LOCK(httplock);
+ putData->next = putHead;
+ putHead = putData;
+ mret = curl_multi_add_handle(curl_multi, curl_put);
+ if (mret != CURLM_OK) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+ _("%s failed at %s:%d: `%s'\n"),
+ "curl_multi_add_handle",
+ __FILE__,
+ __LINE__,
+ curl_multi_strerror(mret));
+ putHead = putData->next;
+ curl_easy_cleanup(curl_put);
+ FREE(putData->msg);
+ FREE(putData);
+ MUTEX_UNLOCK(httplock);
+ return SYSERR;
}
- if (httpSession->wsize >= httpSession->wpos + size) {
- if (httpSession->woff + size <= httpSession->wsize) {
- memcpy(&httpSession->wbuff[httpSession->woff],
- msg,
- size);
- httpSession->woff += size;
- httpSession->wpos += size;
- } else {
- memmove(httpSession->wbuff,
- &httpSession->wbuff[httpSession->woff - httpSession->wpos],
- httpSession->wpos);
- memcpy(&httpSession->wbuff[httpSession->wpos],
- msg,
- size);
- httpSession->woff = httpSession->wpos + size;
- httpSession->wpos += size;
- }
- } else {
- if ( (httpSession->wpos + size > HTTP_BUF_SIZE) &&
- (important == NO) ) {
- if (stats != NULL)
- stats->change(stat_bytesDropped,
- size);
- MUTEX_UNLOCK(httpSession->lock);
- return NO;
- }
- GROW(httpSession->wbuff,
- httpSession->wsize,
- httpSession->wpos + size);
- memmove(httpSession->wbuff,
- &httpSession->wbuff[httpSession->woff - httpSession->wpos],
- httpSession->wpos);
- memcpy(&httpSession->wbuff[httpSession->wpos],
- msg,
- size);
- httpSession->woff = httpSession->wpos + size;
- httpSession->wpos += size;
- }
- MUTEX_UNLOCK(httpSession->lock);
+ MUTEX_UNLOCK(httplock);
return OK;
}
@@ -1040,11 +1027,9 @@
curl_multi_strerror(mret));
break;
}
- /* use timeout of 1s in case that SELECT is not interrupted by
- signal (just to increase portability a bit) -- better a 1s
- delay in the reaction than hanging... */
- tv.tv_sec = 1;
- tv.tv_usec = 0;
+ /* CURL requires a regular timeout... */
+ tv.tv_sec = 0;
+ tv.tv_usec = 1000;
SELECT(max + 1,
&rs,
&ws,
@@ -1249,6 +1234,10 @@
NULL);
coreAPI->releaseService(stats);
stats = NULL;
+ if (upnp != NULL) {
+ coreAPI->releaseService(upnp);
+ stats = NULL;
+ }
FREENONNULL(filteredNetworks_);
MUTEX_DESTROY(httplock);
curl_global_cleanup();
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r5088 - GNUnet/src/transports,
gnunet <=