[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r2674 - GNUnet/src/server
From: |
grothoff |
Subject: |
[GNUnet-SVN] r2674 - GNUnet/src/server |
Date: |
Thu, 27 Apr 2006 22:23:19 -0700 (PDT) |
Author: grothoff
Date: 2006-04-27 22:23:16 -0700 (Thu, 27 Apr 2006)
New Revision: 2674
Modified:
GNUnet/src/server/connection.c
Log:
fixing bug #1049
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2006-04-27 21:51:25 UTC (rev 2673)
+++ GNUnet/src/server/connection.c 2006-04-28 05:23:16 UTC (rev 2674)
@@ -867,10 +867,10 @@
totalMessageSize = 0;
(*priority) = 0;
- for(i = be->sendBufferSize - 1; i >= 0; i--)
+ for (i = be->sendBufferSize - 1; i >= 0; i--)
be->sendBuffer[i]->knapsackSolution = NO;
- if(be->session.mtu == 0) {
+ if (be->session.mtu == 0) {
totalMessageSize = sizeof(P2P_PACKET_HEADER);
i = 0;
/* assumes entries are sorted by priority! */
@@ -928,8 +928,7 @@
a small message if there is nothing else to do! */
return 0;
}
- }
- else { /* if (be->session.mtu == 0) */
+ } else { /* if (be->session.mtu == 0) */
/* solve knapsack problem, compute accumulated priority */
approxProb = getCPULoad();
if(approxProb > 50) {
@@ -970,17 +969,20 @@
if(j == 0) {
LOG(LOG_ERROR,
_("`%s' selected %d out of %d messages (MTU: %d).\n"),
- "solveKnapsack",
- j, be->sendBufferSize, be->session.mtu - sizeof(P2P_PACKET_HEADER));
+ __FUNCTION__,
+ j, be->sendBufferSize,
+ be->session.mtu - sizeof(P2P_PACKET_HEADER));
for(j = 0; j < be->sendBufferSize; j++)
LOG(LOG_ERROR,
_("Message details: %u: length %d, priority: %d\n"),
- j, be->sendBuffer[j]->len, be->sendBuffer[j]->pri);
+ j,
+ be->sendBuffer[j]->len,
+ be->sendBuffer[j]->pri);
return 0;
}
- if(be->available_send_window < be->session.mtu) {
+ if (be->available_send_window < be->session.mtu) {
/* if we have a very high priority, we may
want to ignore bandwidth availability (e.g. for HANGUP,
which has EXTREME_PRIORITY) */
@@ -1188,6 +1190,55 @@
}
/**
+ * The MTU has changed. We may have messages larger than the
+ * MTU in the buffer. Check if this is the case, and if so,
+ * fragment those messages.
+ */
+static void fragmentIfNecessary(BufferEntry * be) {
+ SendEntry ** entries;
+ SendEntry * entry;
+ unsigned int i;
+ unsigned int ret;
+ unsigned int j;
+ int changed;
+
+ if (be->session.mtu == 0)
+ return; /* clearly not necessary */
+
+ /* MTU change may require new fragmentation! */
+ changed = YES;
+ while (changed) {
+ changed = NO;
+ entries = be->sendBuffer;
+ ret = be->sendBufferSize;
+ for (i=0;i<ret;i++) {
+ entry = entries[i];
+ if (entry->len <= be->session.mtu - sizeof(P2P_PACKET_HEADER))
+ continue;
+ ret--;
+ for (j = i; j < ret; j++)
+ entries[j] = entries[j + 1]; /* preserve ordering */
+ GROW(be->sendBuffer,
+ be->sendBufferSize,
+ ret);
+ /* calling fragment will change be->sendBuffer;
+ thus we need to restart from the beginning afterwards... */
+ fragmentation->fragment(&be->session.sender,
+ be->session.mtu - sizeof(P2P_PACKET_HEADER),
+ entry->pri,
+ entry->transmissionTime,
+ entry->len,
+ entry->callback,
+ entry->closure);
+ FREE(entry);
+ changed = YES;
+ break; /* "entries" changed as side-effect of fragment call */
+ }
+ } /* while changed */
+ return OK;
+}
+
+/**
* Try to make sure that the transport service for the given buffer is
* connected. If the transport service changes, this function also
* ensures that the pending messages are properly fragmented (if
@@ -1196,55 +1247,16 @@
* @return OK on success, NO on error
*/
static int ensureTransportConnected(BufferEntry * be) {
- SendEntry **entries;
- SendEntry *entry;
- int i;
- int ret;
- int j;
- int changed;
-
- if(be->session.tsession == NULL) {
- be->session.tsession = transport->connectFreely(&be->session.sender, YES);
- if(be->session.tsession == NULL)
- return NO;
- be->session.mtu = transport->getMTU(be->session.tsession->ttype);
- if(be->session.mtu > 0) {
- /* MTU change may require new fragmentation! */
- changed = YES;
- while(changed) {
- changed = NO;
- entries = be->sendBuffer;
- i = 0;
- ret = be->sendBufferSize;
- while(i < ret) {
- entry = entries[i];
- if(entry->len > be->session.mtu - sizeof(P2P_PACKET_HEADER)) {
- ret--;
- for(j = i; j < ret; j++)
- entries[j] = entries[j + 1]; /* preserve ordering */
- GROW(be->sendBuffer, be->sendBufferSize, ret);
- /* calling fragment will change be->sendBuffer;
- thus we need to restart from the beginning afterwards... */
- fragmentation->fragment(&be->session.sender,
- be->session.mtu -
- sizeof(P2P_PACKET_HEADER), entry->pri,
- entry->transmissionTime, entry->len,
- entry->callback, entry->closure);
- FREE(entry);
- changed = YES;
- break;
- }
- else {
- i++;
- }
- } /* for all i (until change) */
- } /* while changed */
- } /* if MTU changed */
- } /* if need to reconnect */
+ if (be->session.tsession != NULL)
+ return OK;
+ be->session.tsession = transport->connectFreely(&be->session.sender, YES);
+ if (be->session.tsession == NULL)
+ return NO;
+ be->session.mtu = transport->getMTU(be->session.tsession->ttype);
+ fragmentIfNecessary(be);
return OK;
}
-
/**
* Send a buffer; assumes that access is already synchronized. This
* message solves the knapsack problem, assembles the message
@@ -1295,7 +1307,7 @@
#endif
totalMessageSize = selectMessagesToSend(be, &priority);
- if(totalMessageSize == 0) {
+ if (totalMessageSize == 0) {
expireSendBufferEntries(be);
be->inSendBuffer = NO;
return; /* deferr further */
@@ -1347,7 +1359,8 @@
while(pos != NULL) {
if(pos->minimumPadding + p <= totalMessageSize) {
p += pos->callback(&be->session.sender,
- &plaintextMsg[p], be->session.mtu - p);
+ &plaintextMsg[p],
+ be->session.mtu - p);
}
pos = pos->next;
}
@@ -1443,19 +1456,22 @@
unsigned long long queueSize;
ENTRY();
- if((se == NULL) || (se->len == 0)) {
+ if ( (se == NULL) ||
+ (se->len == 0) ) {
BREAK();
FREENONNULL(se);
return;
}
- if((be->session.mtu != 0) &&
- (se->len > be->session.mtu - sizeof(P2P_PACKET_HEADER))) {
+ if ( (be->session.mtu != 0) &&
+ (se->len > be->session.mtu - sizeof(P2P_PACKET_HEADER)) ) {
/* this message is so big that it must be fragmented! */
fragmentation->fragment(&be->session.sender,
be->session.mtu - sizeof(P2P_PACKET_HEADER),
se->pri,
se->transmissionTime,
- se->len, se->callback, se->closure);
+ se->len,
+ se->callback,
+ se->closure);
FREE(se);
return;
}
@@ -1463,13 +1479,17 @@
#if DEBUG_CONNECTION
IFLOG(LOG_DEBUG, hash2enc(&be->session.sender.hashPubKey, &enc));
LOG(LOG_DEBUG,
- "adding message of size %d to buffer of host %s.\n", se->len, &enc);
+ "adding message of size %d to buffer of host `%s'\n",
+ se->len,
+ &enc);
#endif
if((be->sendBufferSize > 0) && (be->status != STAT_UP)) {
/* as long as we do not have a confirmed
connection, do NOT queue messages! */
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG, "not connected to %s, message dropped\n", &enc);
+ LOG(LOG_DEBUG,
+ "not connected to `%s', message dropped\n",
+ &enc);
#endif
FREE(se->closure);
FREE(se);
@@ -1494,7 +1514,8 @@
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"queueSize (%llu) >= %d, refusing to queue message.\n",
- queueSize, MAX_SEND_BUFFER_SIZE);
+ queueSize,
+ MAX_SEND_BUFFER_SIZE);
#endif
FREE(se->closure);
FREE(se);
@@ -2500,7 +2521,8 @@
* @param tsession the transport session that is for grabs
* @param sender the identity of the other node
*/
-void considerTakeover(const PeerIdentity * sender, TSession * tsession) {
+void considerTakeover(const PeerIdentity * sender,
+ TSession * tsession) {
BufferEntry *be;
ENTRY();
@@ -2530,9 +2552,10 @@
transport->disconnect(be->session.tsession);
be->session.tsession = tsession;
be->session.mtu = transport->getMTU(tsession->ttype);
+ fragmentIfNecessary(be);
}
- } /* end if cheaper AND possible */
- } /* end if connected */
+ } /* end if cheaper AND possible */
+ } /* end if connected */
}
MUTEX_UNLOCK(&lock);
transport->disconnect(tsession);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r2674 - GNUnet/src/server,
grothoff <=