[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3143 - in GNUnet/src: include transports
From: grothoff
Subject: [GNUnet-SVN] r3143 - in GNUnet/src: include transports
Date: Fri, 28 Jul 2006 02:30:06 -0700 (PDT)
Author: grothoff
Date: 2006-07-28 02:30:02 -0700 (Fri, 28 Jul 2006)
New Revision: 3143
Modified:
GNUnet/src/include/gnunet_transport.h
GNUnet/src/include/gnunet_util_config.h
GNUnet/src/transports/nat.c
GNUnet/src/transports/tcp.c
GNUnet/src/transports/udp.c
Log:
updated transports
Modified: GNUnet/src/include/gnunet_transport.h
===================================================================
--- GNUnet/src/include/gnunet_transport.h 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/include/gnunet_transport.h 2006-07-28 09:30:02 UTC (rev 3143)
@@ -263,6 +263,8 @@
* or the hello_message from connect)
* @param msg the message
* @param size the size of the message, <= mtu
+ * @param important YES if message is important (i.e. grow
+ * buffers to queue if needed)
* @return SYSERR on error, NO on temporary error (retry),
* YES/OK on success; after any persistent error,
* the caller must call "disconnect" and not continue
@@ -271,27 +273,10 @@
*/
int (*send)(TSession * tsession,
const void * msg,
- const unsigned int size);
+ const unsigned int size,
+ int important);
/**
- * Send a message to the specified remote node with
- * increased reliablility (whatever that means is
- * up to the transport).
- *
- * @param tsession an opaque session handle (e.g. a socket
- * or the hello_message from connect)
- * @param msg the message
- * @param size the size of the message, <= mtu
- * @return SYSERR on error, OK on success; after any error,
- * the caller must call "disconnect" and not continue
- * using the session afterwards (useful if the other
- * side closed the connection).
- */
- int (*sendReliable)(TSession * tsession,
- const void * msg,
- const unsigned int size);
-
- /**
* A (core) Session is to be associated with a transport session. The
* transport service may want to know in order to call back on the
* core if the connection is being closed. Associate can also be
@@ -340,12 +325,6 @@
int (*stopTransportServer)(void);
/**
- * Reload the configuration. Should never fail (keep old
- * configuration on error, syslog errors!)
- */
- void (*reloadConfiguration)(void);
-
- /**
* Convert transport address to human readable string.
*/
char * (*addressToString)(const P2P_hello_MESSAGE * helo);
Modified: GNUnet/src/include/gnunet_util_config.h
===================================================================
--- GNUnet/src/include/gnunet_util_config.h 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/include/gnunet_util_config.h 2006-07-28 09:30:02 UTC (rev 3143)
@@ -182,7 +182,11 @@
/**
* Attach a callback that is notified whenever a
- * configuration option changes.
+ * configuration option changes.<p>
+ *
+ * TODO: also call callback on existing configuration and confirm
+ * existing configuration is OK! If not, return error!
+ *
* @return 0 on success, -1 on error
*/
int GC_attach_change_listener(struct GC_Configuration * cfg,
Modified: GNUnet/src/transports/nat.c
===================================================================
--- GNUnet/src/transports/nat.c 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/transports/nat.c 2006-07-28 09:30:02 UTC (rev 3143)
@@ -148,7 +148,8 @@
*/
static int natSend(TSession * tsession,
const void * message,
- const unsigned int size) {
+ const unsigned int size,
+ int important) {
return SYSERR;
}
@@ -180,12 +181,6 @@
}
/**
- * Reload the configuration. Should never fail.
- */
-static void reloadConfiguration(void) {
-}
-
-/**
* Convert NAT address to a string.
*/
static char * addressToString(const P2P_hello_MESSAGE * helo) {
@@ -206,12 +201,10 @@
natAPI.createhello = &createhello;
natAPI.connect = &natConnect;
natAPI.send = &natSend;
- natAPI.sendReliable = &natSend; /* can't increase reliability */
natAPI.associate = &natAssociate;
natAPI.disconnect = &natDisconnect;
natAPI.startTransportServer = &startTransportServer;
natAPI.stopTransportServer = &stopTransportServer;
- natAPI.reloadConfiguration = &reloadConfiguration;
natAPI.addressToString = &addressToString;
return &natAPI;
Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/transports/tcp.c 2006-07-28 09:30:02 UTC (rev 3143)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
+ (C) 2002, 2003, 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -63,35 +63,11 @@
} HostAddress;
/**
- * TCP Message-Packet header.
- */
-typedef struct {
- /**
- * size of the message, in bytes, excluding this header;
- * max 65535; we do NOT want to make this field an int
- * because then a malicious peer could cause us to allocate
- * lots of memory -- this bounds it by 64k/peer.
- * Field is in network byte order.
- */
- unsigned short size;
-
- /**
- * For alignment, always 0.
- */
- unsigned short reserved;
-
- /**
- * This struct is followed by MESSAGE_PARTs - until size is reached
- * There is no "end of message".
- */
-} TCPP2P_PACKET;
-
-/**
* Initial handshake message. Note that the beginning
* must match the CS_MESSAGE_HEADER since we are using tcpio.
*/
typedef struct {
- TCPP2P_PACKET header;
+ MESSAGE_HEADER header;
/**
* Identity of the node connecting (TCP client)
@@ -104,21 +80,16 @@
*/
typedef struct {
/**
- * the tcp socket
+ * the tcp socket (used to identify this connection with selector)
*/
struct SocketHandle * sock;
/**
- * number of users of this session
+ * number of users of this session (reference count)
*/
int users;
/**
- * Last time this connection was used
- */
- cron_t lastUse;
-
- /**
* mutex for synchronized access to 'users'
*/
struct MUTEX * lock;
@@ -162,6 +133,8 @@
static struct GC_Configuration * cfg;
+static struct MUTEX * tcplock;
+
/* ******************** helper functions *********************** */
/**
@@ -178,62 +151,6 @@
}
/**
- * Disconnect from a remote node. May only be called
- * on sessions that were aquired by the caller first.
- * For the core, aquiration means to call associate or
- * connect. The number of disconnects must match the
- * number of calls to connect+associate.
- *
- * @param tsession the session that is closed
- * @return OK on success, SYSERR if the operation failed
- */
-static int tcpDisconnect(TSession * tsession) {
- if (tsession->internal != NULL) {
- TCPSession * tcpsession = tsession->internal;
-
- MUTEX_LOCK(tcpsession->lock);
- tcpsession->users--;
- if (tcpsession->users > 0) {
- MUTEX_UNLOCK(tcpsession->lock);
- return OK;
- }
- MUTEX_UNLOCK(tcpsession->lock);
- MUTEX_DESTROY(tcpsession->lock);
- FREE(tcpsession->rbuff);
- FREENONNULL(tcpsession->wbuff);
- tcpsession->wbuff = NULL;
- FREE(tcpsession);
- }
- FREE(tsession);
- return OK;
-}
-
-/**
- * Remove a session, either the other side closed the connection
- * or we have otherwise reason to believe that it should better
- * be killed. Destroy session closes the session as far as the
- * TCP layer is concerned, but since the core may still have
- * references to it, tcpDisconnect may not instantly free all
- * the associated resources. <p>
- *
- * destroySession may only be called if the tcplock is already
- * held.
- *
- * @param i index to the session handle
- */
-static void destroySession(int i) {
- TCPSession * tcpSession;
-
- tcpSession = tsessions[i]->internal;
- if (tcpSession->sock != NULL)
- socket_destroy(tcpSession->sock);
- tcpSession->sock = NULL;
- tcpDisconnect(tsessions[i]);
- tsessions[i] = tsessions[--tsessionCount];
- tsessions[tsessionCount] = NULL;
-}
-
-/**
* Get the GNUnet UDP port from the configuration,
* or from /etc/services if it is not specified in
* the config file.
@@ -258,6 +175,35 @@
}
/**
+ * Disconnect from a remote node. May only be called
+ * on sessions that were aquired by the caller first.
+ * For the core, aquiration means to call associate or
+ * connect. The number of disconnects must match the
+ * number of calls to connect+associate.
+ *
+ * @param tsession the session that is closed
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int tcpDisconnect(TSession * tsession) {
+ TCPSession * tcpsession = tsession->internal;
+
+ GE_ASSERT(ectx, tcpsession != NULL);
+ MUTEX_LOCK(tcpsession->lock);
+ tcpsession->users--;
+ if (tcpsession->users > 0) {
+ MUTEX_UNLOCK(tcpsession->lock);
+ return OK;
+ }
+ select_disconnect(selector,
+ tcpsession->sock);
+ MUTEX_UNLOCK(tcpsession->lock);
+ MUTEX_DESTROY(tcpsession->lock);
+ FREE(tcpsession);
+ FREE(tsession);
+ return OK;
+}
+
+/**
* A (core) Session is to be associated with a transport session. The
* transport service may want to know in order to call back on the
* core if the connection is being closed. Associate can also be
@@ -279,11 +225,8 @@
static int tcpAssociate(TSession * tsession) {
TCPSession * tcpSession;
- if (tsession == NULL) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- tcpSession = (TCPSession*) tsession->internal;
+ GE_ASSERT(ectx, tsession != NULL);
+ tcpSession = tsession->internal;
MUTEX_LOCK(tcpSession->lock);
tcpSession->users++;
MUTEX_UNLOCK(tcpSession->lock);
@@ -296,438 +239,146 @@
* This function may only be called if the tcplock is
* already held by the caller.
*/
-static int readAndProcess(int i) {
- TSession * tsession;
+static int select_message_handler(void * mh_cls,
+ struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ void * sock_ctx,
+ const MESSAGE_HEADER * msg) {
+ TSession * tsession = sock_ctx;
TCPSession * tcpSession;
unsigned int len;
- int ret;
- TCPP2P_PACKET * pack;
P2P_PACKET * mp;
- size_t recvd;
+ const TCPWelcome * welcome;
- tsession = tsessions[i];
if (SYSERR == tcpAssociate(tsession))
return SYSERR;
+ len = ntohs(msg->size);
tcpSession = tsession->internal;
- if (tcpSession->rsize == tcpSession->pos) {
- /* read buffer too small, grow */
- GROW(tcpSession->rbuff,
- tcpSession->rsize,
- tcpSession->rsize * 2);
- }
- ret = socket_recv(tcpSession->sock,
- NC_Blocking | NC_IgnoreInt,
- &tcpSession->rbuff[tcpSession->pos],
- tcpSession->rsize - tcpSession->pos,
- &recvd);
- tcpSession->lastUse = get_time();
- if (ret != OK) {
- tcpDisconnect(tsession);
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "READ on socket %d returned 0 bytes, closing connection\n",
- tcpSession->sock);
-#endif
- return SYSERR; /* other side closed connection */
- }
- tcpSession->pos += recvd;
-
- while (tcpSession->pos > 2) {
- len = ntohs(((TCPP2P_PACKET*)&tcpSession->rbuff[0])->size)
- + sizeof(TCPP2P_PACKET);
- if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
- GROW(tcpSession->rbuff,
- tcpSession->rsize,
- len);
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "Read %d bytes on socket %d, expecting %d for full message\n",
- tcpSession->pos,
- tcpSession->sock,
- len);
-#endif
- if (tcpSession->pos < len) {
+ if (YES == tcpSession->expectingWelcome) {
+ welcome = (const TCPWelcome*) msg;
+ if ( (ntohs(welcome->header.type) != 0) ||
+ (len != sizeof(TCPWelcome)) ) {
tcpDisconnect(tsession);
- return OK;
+ return SYSERR;
}
-
- /* complete message received, let's check what it is */
- if (YES == tcpSession->expectingWelcome) {
- TCPWelcome * welcome;
-#if DEBUG_TCP
- EncName enc;
-#endif
-
- welcome = (TCPWelcome*) &tcpSession->rbuff[0];
- if ( (ntohs(welcome->header.reserved) != 0) ||
- (ntohs(welcome->header.size)
- != sizeof(TCPWelcome) - sizeof(TCPP2P_PACKET)) ) {
- GE_LOG(ectx,
- GE_WARNING | GE_USER | GE_BULK,
- _("Expected welcome message on tcp connection, "
- "got garbage (%u, %u). Closing.\n"),
- ntohs(welcome->header.reserved),
- ntohs(welcome->header.size));
- tcpDisconnect(tsession);
- return SYSERR;
- }
- tcpSession->expectingWelcome = NO;
- tcpSession->sender = welcome->clientIdentity;
-#if DEBUG_TCP
- IF_GELOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- hash2enc(&tcpSession->sender.hashPubKey,
- &enc));
- GE_LOG(etcx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcp welcome message from `%s' received\n",
- &enc);
-#endif
- memmove(&tcpSession->rbuff[0],
- &tcpSession->rbuff[sizeof(TCPWelcome)],
- tcpSession->pos - sizeof(TCPWelcome));
- tcpSession->pos -= sizeof(TCPWelcome);
- len = ntohs(((TCPP2P_PACKET*)&tcpSession->rbuff[0])->size)
- + sizeof(TCPP2P_PACKET);
- }
- if ( (tcpSession->pos < 2) ||
- (tcpSession->pos < len) ) {
- tcpDisconnect(tsession);
- return OK;
- }
-
- pack = (TCPP2P_PACKET*)&tcpSession->rbuff[0];
+ tcpSession->expectingWelcome = NO;
+ tcpSession->sender = welcome->clientIdentity;
+ } else {
/* send msg to core! */
- if (len <= sizeof(TCPP2P_PACKET)) {
+ if (len <= sizeof(MESSAGE_HEADER)) {
GE_LOG(ectx,
GE_WARNING | GE_USER | GE_BULK,
- _("Received malformed message (size %u)"
- " from tcp-peer connection. Closing.\n"),
- len);
+ _("Received malformed message from tcp-peer connection. Closing.\n"));
tcpDisconnect(tsession);
return SYSERR;
}
mp = MALLOC(sizeof(P2P_PACKET));
- mp->msg = MALLOC(len - sizeof(TCPP2P_PACKET));
+ mp->msg = MALLOC(len - sizeof(MESSAGE_HEADER));
memcpy(mp->msg,
- &pack[1],
- len - sizeof(TCPP2P_PACKET));
+ &msg[1],
+ len - sizeof(MESSAGE_HEADER));
mp->sender = tcpSession->sender;
- mp->size = len - sizeof(TCPP2P_PACKET);
+ mp->size = len - sizeof(MESSAGE_HEADER);
mp->tsession = tsession;
-#if DEBUG_TCP
- {
- EncName enc;
-
- IF_GELOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- hash2enc(&mp->sender.hashPubKey,
- &enc);
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcp transport received %u bytes from %s (CRC %u), forwarding to core\n",
- mp->size,
- &enc,
- crc32N(tcpSession->rbuff,
- tcpSession->pos));
- }
-#endif
coreAPI->receive(mp);
- /* finally, shrink buffer adequately */
- memmove(&tcpSession->rbuff[0],
- &tcpSession->rbuff[len],
- tcpSession->pos - len);
- tcpSession->pos -= len;
- if ( (tcpSession->pos + 1024 < tcpSession->rsize) &&
- (tcpSession->rsize > 4 * 1024) ) {
- /* read buffer far too large, shrink! */
- GROW(tcpSession->rbuff,
- tcpSession->rsize,
- tcpSession->pos + 1024);
- }
}
tcpDisconnect(tsession);
return OK;
}
-/**
- * Add a new session to the array watched by the select thread. Grows
- * the array if needed. If the caller wants to do anything useful
- * with the return value, it must have the lock on tcplock before
- * calling. It is ok to call this function without holding tcplock if
- * the return value is ignored.
- */
-static unsigned int addTSession(TSession * tsession) {
- unsigned int i;
- MUTEX_LOCK(tcplock);
- if (tsessionCount == tsessionArrayLength)
- GROW(tsessions,
- tsessionArrayLength,
- tsessionArrayLength * 2);
- i = tsessionCount;
- tsessions[tsessionCount++] = tsession;
- MUTEX_UNLOCK(tcplock);
- return i;
-}
-
/**
* Create a new session for an inbound connection on the given
* socket. Adds the session to the array of sessions watched
* by the select thread.
*/
-static void createNewSession(int sock) {
+static void * select_accept_handler(void * ah_cls,
+ struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ const void * addr,
+ unsigned int addr_len) {
TSession * tsession;
TCPSession * tcpSession;
+ IPaddr ip;
+ if (addr_len != sizeof(IPaddr))
+ return NULL;
+ memcpy(&ip,
+ addr,
+ addr_len);
+ if (isBlacklisted(ip))
+ return NULL;
tcpSession = MALLOC(sizeof(TCPSession));
- tcpSession->pos = 0;
- tcpSession->rsize = 2 * 1024 + sizeof(TCPP2P_PACKET);
- tcpSession->rbuff = MALLOC(tcpSession->rsize);
- tcpSession->wpos = 0;
- tcpSession->wbuff = NULL;
- tcpSession->wsize = 0;
- tcpSession->sock = socket_create(ectx,
- load_monitor,
- sock);
+ tcpSession->sock = sock;
/* fill in placeholder identity to mark that we
are waiting for the welcome message */
tcpSession->sender = *(coreAPI->myIdentity);
tcpSession->expectingWelcome = YES;
tcpSession->lock = MUTEX_CREATE(YES);
tcpSession->users = 1; /* us only, core has not seen this tsession! */
- tcpSession->lastUse = get_time();
tsession = MALLOC(sizeof(TSession));
tsession->ttype = TCP_PROTOCOL_NUMBER;
tsession->internal = tcpSession;
- addTSession(tsession);
+
+ return tsession;
}
-/**
- * Send a message (already encapsulated if needed) via the
- * tcp socket (or enqueue if sending now would block).
- *
- * @param tcpSession the session to use for sending
- * @param mp the message to send
- * @param ssize the size of the message
- * @return OK if message send or queued, NO if queue is full and
- * message was dropped, SYSERR on error
- */
-static int tcpDirectSend(TCPSession * tcpSession,
- void * mp,
- unsigned int ssize) {
- size_t ret;
- int success;
-
-#if DEBUG_TCP
- {
- EncName enc;
-
- IF_GELOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- hash2enc(&tcpSession->sender.hashPubKey,
- &enc));
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcpDirectSend called to transmit %u bytes to %s (CRC %u).\n",
- ssize,
- &enc,
- crc32N(mp, ssize));
- }
-#endif
- if (tcp_shutdown == YES) {
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcpDirectSend called while TCP transport is shutdown.\n");
-#endif
- return SYSERR;
- }
- if (tcpSession->sock == NULL) {
-#if DEBUG_TCP
- LOG(LOG_INFO,
- "tcpDirectSend called, but socket is closed\n");
-#endif
- return SYSERR;
- }
- if (ssize == 0) {
- GE_BREAK(ectx, 0); /* size 0 not allowed */
- return SYSERR;
- }
- MUTEX_LOCK(tcplock);
- if (tcpSession->wpos > 0) {
- /* select already pending... */
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "write already pending, will not take additional message.\n");
-#endif
- if (stats != NULL)
- stats->change(stat_bytesDropped,
- ssize);
- MUTEX_UNLOCK(tcplock);
- return NO;
- }
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "TCP: trying to send %u bytes\n",
- ssize);
-#endif
- success = socket_send(tcpSession->sock,
- NC_Nonblocking,
- mp,
- ssize,
- &ret);
- if (success == SYSERR) {
-#if DEBUG_TCP
- LOG_STRERROR(LOG_INFO, "send");
-#endif
- MUTEX_UNLOCK(tcplock);
- return SYSERR;
- }
- if (success == NO)
- ret = 0;
- if (stats != NULL)
- stats->change(stat_bytesSent,
- ret);
-
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "TCP: transmitted %u bytes\n",
- ret);
-#endif
-
- if (ret < ssize) {/* partial send */
- if (tcpSession->wsize < ssize - ret) {
- GROW(tcpSession->wbuff,
- tcpSession->wsize,
- ssize - ret);
- }
- memcpy(tcpSession->wbuff,
- mp + ret,
- ssize - ret);
- tcpSession->wpos = ssize - ret;
- signalSelect(); /* select set changed! */
- }
- tcpSession->lastUse = get_time();
- MUTEX_UNLOCK(tcplock);
- return OK;
+static void select_close_handler(void * ch_cls,
+ struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ void * sock_ctx) {
+ TSession * tsession = sock_ctx;
+ tcpDisconnect(tsession);
}
/**
- * Send a message (already encapsulated if needed) via the
- * tcp socket. Block if required.
+ * Send a message to the specified remote node.
*
- * @param tcpSession the session to use for sending
- * @param mp the message to send
- * @param ssize the size of the message
- * @return OK if message send or queued, NO if queue is full and
- * message was dropped, SYSERR on error
- */
-static int tcpDirectSendReliable(TCPSession * tcpSession,
- void * mp,
- unsigned int ssize) {
- int ok;
-
-#if DEBUG_TCP
- {
- EncName enc;
-
- IF_GELOC(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- hash2enc(&tcpSession->sender.hashPubKey,
- &enc));
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcpDirectSendReliable called to transmit %u bytes to %s (CRC %u).\n",
- ssize,
- &enc,
- crc32N(mp, ssize));
- }
-#endif
- if (tcp_shutdown == YES) {
-#if DEBUG_TCP
- LOG(LOG_INFO,
- "tcpDirectSendReliable called, but TCP service is shutdown\n");
-#endif
- return SYSERR;
- }
- if (tcpSession->sock == NULL) {
-#if DEBUG_TCP
- LOG(LOG_INFO,
- "tcpDirectSendReliable called, but socket is closed\n");
-#endif
- return SYSERR;
- }
- if (ssize == 0) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- MUTEX_LOCK(tcplock);
- if (tcpSession->wpos > 0) {
- unsigned int old = tcpSession->wpos;
- GROW(tcpSession->wbuff,
- tcpSession->wsize,
- tcpSession->wpos + ssize);
- tcpSession->wpos += ssize;
- memcpy(&tcpSession->wbuff[old],
- mp,
- ssize);
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcpDirectSendReliable appended message to send buffer.\n");
-#endif
-
- ok = OK;
- } else {
- ok = tcpDirectSend(tcpSession,
- mp,
- ssize);
- }
- MUTEX_UNLOCK(tcplock);
- return ok;
-}
-
-/**
- * Send a message to the specified remote node with
- * increased reliability (i.e. grow TCP send buffer
- * above one frame if needed).
- *
- * @param tsession the P2P_hello_MESSAGE identifying the remote node
+ * @param tsession the handle identifying the remote node
* @param msg the message
* @param size the size of the message
- * @return SYSERR on error, OK on success, NO on temporary error
+ * @return SYSERR on error, OK on success
*/
-static int tcpSendReliable(TSession * tsession,
- const void * msg,
- const unsigned int size) {
- TCPP2P_PACKET * mp;
+static int tcpSend(TSession * tsession,
+ const void * msg,
+ const unsigned int size,
+ int important) {
+ TCPSession * tcpSession;
+ MESSAGE_HEADER * mp;
int ok;
- if (size >= MAX_BUFFER_SIZE)
+ tcpSession = tsession->internal;
+ if (size >= MAX_BUFFER_SIZE - sizeof(MESSAGE_HEADER)) {
+ GE_BREAK(ectx, 0);
+ return SYSERR; /* too big */
+ }
+ if (selector == NULL) {
+ if (stats != NULL)
+ stats->change(stat_bytesDropped,
+ size);
return SYSERR;
- if (tcp_shutdown == YES)
- return SYSERR;
+ }
if (size == 0) {
GE_BREAK(ectx, 0);
return SYSERR;
}
- if (((TCPSession*)tsession->internal)->sock == NULL)
+ if (tcpSession->sock == NULL) {
+ if (stats != NULL)
+ stats->change(stat_bytesDropped,
+ size);
return SYSERR; /* other side closed connection */
- mp = MALLOC(sizeof(TCPP2P_PACKET) + size);
+ }
+ mp = MALLOC(sizeof(MESSAGE_HEADER) + size);
+ mp->size = htons(size + sizeof(MESSAGE_HEADER));
+ mp->type = 0;
memcpy(&mp[1],
msg,
size);
- mp->size = htons(size);
- mp->reserved = 0;
- ok = tcpDirectSendReliable(tsession->internal,
- mp,
- size + sizeof(TCPP2P_PACKET));
+ ok = select_write(selector,
+ tcpSession->sock,
+ mp,
+ NO,
+ important);
FREE(mp);
return ok;
}
@@ -811,7 +462,6 @@
*/
static int tcpConnect(const P2P_hello_MESSAGE * helo,
TSession ** tsessionPtr) {
- int i;
HostAddress * haddr;
TCPWelcome welcome;
int sock;
@@ -819,8 +469,9 @@
TCPSession * tcpSession;
struct sockaddr_in soaddr;
struct SocketHandle * s;
+ int i;
- if (tcp_shutdown == YES)
+ if (selector == NULL)
return SYSERR;
haddr = (HostAddress*) &helo[1];
#if DEBUG_TCP
@@ -840,7 +491,7 @@
return SYSERR;
}
s = socket_create(ectx,
- load_monitor,
+ coreAPI->load_monitor,
sock);
if (-1 == socket_set_blocking(s, NO)) {
socket_destroy(s);
@@ -872,118 +523,40 @@
}
tcpSession = MALLOC(sizeof(TCPSession));
tcpSession->sock = s;
- tcpSession->wpos = 0;
- tcpSession->wbuff = NULL;
- tcpSession->wsize = 0;
- tcpSession->rsize = 2 * 1024 + sizeof(TCPP2P_PACKET);
- tcpSession->rbuff = MALLOC(tcpSession->rsize);
tsession = MALLOC(sizeof(TSession));
tsession->internal = tcpSession;
tsession->ttype = tcpAPI.protocolNumber;
tcpSession->lock = MUTEX_CREATE(YES);
tcpSession->users = 2; /* caller + us */
- tcpSession->pos = 0;
- tcpSession->lastUse = get_time();
tcpSession->sender = helo->senderIdentity;
tcpSession->expectingWelcome = NO;
MUTEX_LOCK(tcplock);
- i = addTSession(tsession);
+ select_connect(selector,
+ tcpSession->sock,
+ tsession);
/* send our node identity to the other side to fully establish the
connection! */
welcome.header.size
- = htons(sizeof(TCPWelcome) - sizeof(TCPP2P_PACKET));
- welcome.header.reserved
+ = htons(sizeof(TCPWelcome));
+ welcome.header.type
= htons(0);
welcome.clientIdentity
= *(coreAPI->myIdentity);
- if (SYSERR == tcpDirectSend(tcpSession,
- &welcome,
- sizeof(TCPWelcome))) {
- destroySession(i);
+ if (SYSERR == tcpSend(tsession,
+ &welcome.header,
+ sizeof(TCPWelcome),
+ YES)) {
tcpDisconnect(tsession);
MUTEX_UNLOCK(tcplock);
return SYSERR;
}
MUTEX_UNLOCK(tcplock);
- signalSelect();
-
*tsessionPtr = tsession;
return OK;
}
/**
- * Send a message to the specified remote node.
- *
- * @param tsession the P2P_hello_MESSAGE identifying the remote node
- * @param msg the message
- * @param size the size of the message
- * @return SYSERR on error, OK on success
- */
-static int tcpSend(TSession * tsession,
- const void * msg,
- const unsigned int size) {
- TCPP2P_PACKET * mp;
- int ok;
-
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcpSend called to transmit %u bytes.\n",
- size);
-#endif
- if (size >= MAX_BUFFER_SIZE) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
-
- if (tcp_shutdown == YES) {
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcpSend called while TCP is shutdown.\n");
-#endif
- if (stats != NULL)
- stats->change(stat_bytesDropped,
- size);
- return SYSERR;
- }
- if (size == 0) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- if (((TCPSession*)tsession->internal)->sock == NULL) {
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "tcpSend called after other side closed connection.\n");
-#endif
- if (stats != NULL)
- stats->change(stat_bytesDropped,
- size);
- return SYSERR; /* other side closed connection */
- }
- mp = MALLOC(sizeof(TCPP2P_PACKET) + size);
- memcpy(&mp[1],
- msg,
- size);
- mp->size = htons(size);
- mp->reserved = 0;
- /* if we would have less than TARGET_BUFFER_SIZE in buffers,
- do reliable send */
- if (((TCPSession*)tsession->internal)->wpos + size < TARGET_BUFFER_SIZE)
- ok = tcpDirectSendReliable(tsession->internal,
- mp,
- size + sizeof(TCPP2P_PACKET));
- else
- ok = tcpDirectSend(tsession->internal,
- mp,
- size + sizeof(TCPP2P_PACKET));
- FREE(mp);
- return ok;
-}
-
-/**
* Start the server process to receive inbound traffic.
* @return OK on success, SYSERR if the operation failed
*/
@@ -993,94 +566,66 @@
unsigned short port;
int s;
- if (serverSignal != NULL) {
+ if (selector != NULL) {
GE_BREAK(ectx, 0);
return SYSERR;
}
- serverSignal = SEMAPHORE_CREATE(0);
- tcp_shutdown = NO;
-
- if (0 != PIPE(tcp_pipe)) {
+ port = getGNUnetTCPPort();
+ if (port == 0) {
+ /* read-only TCP */
+ return OK;
+ }
+ s = SOCKET(PF_INET,
+ SOCK_STREAM,
+ 0);
+ if (s < 0) {
GE_LOG_STRERROR(ectx,
GE_ERROR | GE_ADMIN | GE_BULK,
- "pipe");
+ "socket");
return SYSERR;
}
- setBlocking(tcp_pipe[1], NO);
-
- port = getGNUnetTCPPort();
- if (port != 0) { /* if port == 0, this is a read-only
- business! */
- s = SOCKET(PF_INET,
- SOCK_STREAM,
- 0);
- if (s < 0) {
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_ADMIN | GE_BULK,
- "socket");
- if (0 != CLOSE(tcp_pipe[0]))
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
- "close");
- if (0 != CLOSE(tcp_pipe[1]))
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
- "close");
- SEMAPHORE_DESTROY(serverSignal);
- serverSignal = NULL;
- tcp_shutdown = YES;
- return SYSERR;
- }
- if (SETSOCKOPT(s,
- SOL_SOCKET,
- SO_REUSEADDR,
- &on,
- sizeof(on)) < 0 )
- GE_DIE_STRERROR(ectx,
- GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
- "setsockopt");
- memset((char *) &serverAddr,
- 0,
- sizeof(serverAddr));
- serverAddr.sin_family = AF_INET;
- serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serverAddr.sin_port = htons(getGNUnetTCPPort());
-#if DEBUG_TCP
+ if (SETSOCKOPT(s,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ &on,
+ sizeof(on)) < 0 )
+ GE_DIE_STRERROR(ectx,
+ GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
+ "setsockopt");
+ memset((char *) &serverAddr,
+ 0,
+ sizeof(serverAddr));
+ serverAddr.sin_family = AF_INET;
+ serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ serverAddr.sin_port = htons(getGNUnetTCPPort());
+ if (BIND(s,
+ (struct sockaddr *) &serverAddr,
+ sizeof(serverAddr)) < 0) {
+ GE_LOG_STRERROR(ectx,
+ GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
+ "bind");
GE_LOG(ectx,
- GE_INFO | GE_USER | GE_BULK,
- "starting %s peer server on port %d\n",
- "tcp",
- ntohs(serverAddr.sin_port));
-#endif
- if (BIND(s,
- (struct sockaddr *) &serverAddr,
- sizeof(serverAddr)) < 0) {
+ GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
+ _("Failed to start transport service on port %d.\n"),
+ getGNUnetTCPPort());
+ if (0 != CLOSE(s))
GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
- "bind");
- GE_LOG(ectx,
- GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
- _("Failed to start transport service on port %d.\n"),
- getGNUnetTCPPort());
- if (0 != CLOSE(s))
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
- "close");
- SEMAPHORE_DESTROY(serverSignal);
- serverSignal = NULL;
- return SYSERR;
- }
- if (0 != LISTEN(s, 5))
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_USER | GE_ADMIN | GE_IMMEDIATE,
- "listen");
- tcp_sock = socket_create(ectx,
- load_monitor,
- s);
- } else
- tcp_sock = NULL;
-
- /* FIXME: call network/select code! */
+ GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+ "close");
+ return SYSERR;
+ }
+ selector = select_create(ectx,
+ coreAPI->load_monitor,
+ s,
+ sizeof(IPaddr),
+ 0, /* timeout */
+ &select_message_handler,
+ NULL,
+ &select_accept_handler,
+ NULL,
+ &select_close_handler,
+ NULL,
+ 0 /* memory quota */ );
return OK;
}
@@ -1089,34 +634,10 @@
* traffic). Maybe restarted later!
*/
static int stopTransportServer() {
- void * unused;
- int haveThread;
-
- if (tcp_shutdown == YES)
- return OK;
- tcp_shutdown = YES;
- signalSelect();
- if (serverSignal != NULL) {
- haveThread = YES;
- SEMAPHORE_DOWN(serverSignal, YES);
- SEMAPHORE_DESTROY(serverSignal);
- } else
- haveThread = NO;
- serverSignal = NULL;
- if (0 != CLOSE(tcp_pipe[1]))
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
- "close");
- if (0 != CLOSE(tcp_pipe[0]))
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
- "close");
- if (tcp_sock != NULL) {
- socket_destroy(tcp_sock);
- tcp_sock = NULL;
+ if (selector != NULL) {
+ select_destroy(selector);
+ selector = NULL;
}
- if (haveThread == YES)
- PTHREAD_JOIN(listenThread, &unused);
return OK;
}
@@ -1124,9 +645,16 @@
* Reload the configuration. Should never fail (keep old
* configuration on error, syslog errors!)
*/
-static void reloadConfiguration() {
+static int reloadConfiguration(void * ctx,
+ struct GC_Configuration * cfg,
+ struct GE_Context * ectx,
+ const char * section,
+ const char * option) {
char * ch;
+ if (0 != strcmp(section, "TCP"))
+ return OK; /* fast path */
+
MUTEX_LOCK(tcplock);
FREENONNULL(filteredNetworks_);
if (0 != GC_get_configuration_value_string(cfg,
@@ -1142,6 +670,8 @@
FREE(ch);
}
MUTEX_UNLOCK(tcplock);
+ /* TODO: error handling! */
+ return OK;
}
/**
@@ -1173,17 +703,17 @@
TransportAPI * inittransport_tcp(CoreAPIForTransport * core) {
ectx = core->ectx;
cfg = core->cfg;
- load_monitor = core->load_monitor;
GE_ASSERT(ectx, sizeof(HostAddress) == 8);
- GE_ASSERT(ectx, sizeof(TCPP2P_PACKET) == 4);
+ GE_ASSERT(ectx, sizeof(MESSAGE_HEADER) == 4);
GE_ASSERT(ectx, sizeof(TCPWelcome) == 68);
tcplock = MUTEX_CREATE(YES);
- reloadConfiguration();
- tsessionCount = 0;
- tsessionArrayLength = 0;
- GROW(tsessions,
- tsessionArrayLength,
- 32);
+ if (0 != GC_attach_change_listener(cfg,
+ &reloadConfiguration,
+ NULL)) {
+ MUTEX_DESTROY(tcplock);
+ tcplock = NULL;
+ return NULL;
+ }
coreAPI = core;
stats = coreAPI->requestService("stats");
if (stats != NULL) {
@@ -1198,30 +728,24 @@
tcpAPI.mtu = 0;
tcpAPI.cost = 20000; /* about equal to udp */
tcpAPI.verifyHelo = &verifyHelo;
- tcpAPI.createhello = &createhello;
+ tcpAPI.createhello = &createhello;
tcpAPI.connect = &tcpConnect;
tcpAPI.associate = &tcpAssociate;
tcpAPI.send = &tcpSend;
- tcpAPI.sendReliable = &tcpSendReliable;
tcpAPI.disconnect = &tcpDisconnect;
tcpAPI.startTransportServer = &startTransportServer;
tcpAPI.stopTransportServer = &stopTransportServer;
- tcpAPI.reloadConfiguration = &reloadConfiguration;
tcpAPI.addressToString = &addressToString;
return &tcpAPI;
}
void donetransport_tcp() {
- int i;
-
+ GC_detach_change_listener(cfg,
+ &reloadConfiguration,
+ NULL);
coreAPI->releaseService(stats);
stats = NULL;
- for (i=tsessionCount-1;i>=0;i--)
- destroySession(i);
- GROW(tsessions,
- tsessionArrayLength,
- 0);
FREENONNULL(filteredNetworks_);
MUTEX_DESTROY(tcplock);
}
Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/transports/udp.c 2006-07-28 09:30:02 UTC (rev 3143)
@@ -530,7 +530,8 @@
*/
static int udpSend(TSession * tsession,
const void * message,
- const unsigned int size) {
+ const unsigned int size,
+ int important) {
char * msg;
UDPMessage mp;
P2P_hello_MESSAGE * helo;
@@ -785,12 +786,10 @@
udpAPI.createhello = &createhello;
udpAPI.connect = &udpConnect;
udpAPI.send = &udpSend;
- udpAPI.sendReliable = &udpSend; /* can't increase reliability */
udpAPI.associate = &udpAssociate;
udpAPI.disconnect = &udpDisconnect;
udpAPI.startTransportServer = &startTransportServer;
udpAPI.stopTransportServer = &stopTransportServer;
- udpAPI.reloadConfiguration = &reloadConfiguration;
udpAPI.addressToString = &addressToString;
return &udpAPI;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3143 - in GNUnet/src: include transports,
grothoff <=