[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r20051 - gnunet/src/stream
From: |
gnunet |
Subject: |
[GNUnet-SVN] r20051 - gnunet/src/stream |
Date: |
Sun, 26 Feb 2012 23:32:23 +0100 |
Author: harsha
Date: 2012-02-26 23:32:23 +0100 (Sun, 26 Feb 2012)
New Revision: 20051
Modified:
gnunet/src/stream/stream_api.c
Log:
-copy buffer and STREAM_read(incomplete)
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-02-26 22:31:40 UTC (rev 20050)
+++ gnunet/src/stream/stream_api.c 2012-02-26 22:32:23 UTC (rev 20051)
@@ -219,12 +219,12 @@
/**
* The write IO_handle associated with this socket
*/
- struct GNUNET_STREAM_IOHandle *write_handle;
+ struct GNUNET_STREAM_IOWriteHandle *write_handle;
/**
* The read IO_handle associated with this socket
*/
- struct GNUNET_STREAM_IOHandle *read_handle;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
/**
* Buffer for storing received messages
@@ -232,6 +232,11 @@
void *receive_buffer;
/**
+ * Copy buffer pointer; Used during read operations
+ */
+ void *copy_buffer;
+
+ /**
* The state of the protocol associated with this socket
*/
enum State state;
@@ -269,6 +274,11 @@
uint32_t receive_buffer_size;
/**
+ * The receiver buffer boundaries
+ */
+ uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
+
+ /**
* receiver's available buffer after the last acknowledged packet
*/
uint32_t receive_window_available;
@@ -282,6 +292,16 @@
* The offset after which we are expecting data
*/
uint32_t read_offset;
+
+ /**
+ * The size of the copy buffer
+ */
+ uint32_t copy_buffer_size;
+
+ /**
+ * The read offset of copy buffer
+ */
+ uint32_t copy_buffer_read_offset;
};
@@ -314,9 +334,9 @@
/**
- * The IO Handle
+ * The IO Write Handle
*/
-struct GNUNET_STREAM_IOHandle
+struct GNUNET_STREAM_IOWriteHandle
{
/**
* The packet_buffers associated with this Handle
@@ -330,11 +350,6 @@
GNUNET_STREAM_AckBitmap ack_bitmap;
/**
- * receiver's available buffer
- */
- uint32_t receive_window_available;
-
- /**
* Number of packets sent before waiting for an ack
*
* FIXME: Do we need this?
@@ -344,6 +359,23 @@
/**
+ * The IO Read Handle
+ */
+struct GNUNET_STREAM_IOReadHandle
+{
+ /**
+ * Callback for the read processor
+ */
+ GNUNET_STREAM_DataProcessor proc;
+
+ /**
+ * The closure pointer for the read processor callback
+ */
+ void *proc_cls;
+};
+
+
+/**
* Default value in seconds for various timeouts
*/
static unsigned int default_timeout = 300;
@@ -511,7 +543,8 @@
ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
- ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
+ ack_msg->receive_window_remaining =
+ htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
/* Request MESH for sending ACK */
GNUNET_MESH_notify_transmit_ready (socket->tunnel,
@@ -574,7 +607,7 @@
write_data_finish_cb (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
- struct GNUNET_STREAM_IOHandle *io_handle = cls;
+ struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
io_handle->sent_packets++;
}
@@ -589,7 +622,7 @@
static void
write_data (struct GNUNET_STREAM_Socket *socket)
{
- struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
+ struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
unsigned int packet;
int ack_packet;
@@ -618,9 +651,9 @@
packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
while ( (NULL != io_handle->messages[packet]) &&
- (io_handle->receive_window_available >= ntohs
(io_handle->messages[packet]->header.header.size)) )
+ (socket->receive_window_available >= ntohs
(io_handle->messages[packet]->header.header.size)) )
{
- io_handle->receive_window_available -= ntohs
(io_handle->messages[packet]->header.header.size);
+ socket->receive_window_available -= ntohs
(io_handle->messages[packet]->header.header.size);
queue_message (socket,
&io_handle->messages[packet]->header,
&write_data_finish_cb,
@@ -651,6 +684,7 @@
const void *payload;
uint32_t bytes_needed;
uint32_t relative_offset;
+ uint32_t relative_sequence_number;
uint16_t size;
size = htons (msg->header.header.size);
@@ -666,9 +700,11 @@
case STATE_TRANSMIT_CLOSED:
case STATE_TRANSMIT_CLOSE_WAIT:
- /* check if the message's sequence number is greater than the one we are
+ /* check if the message's sequence number is in the range we are
expecting */
- if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64)
+ relative_sequence_number =
+ ntohl (msg->sequence_number) - socket->read_sequence_number;
+ if ( relative_sequence_number > 64)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring received message with sequence number %d",
@@ -688,8 +724,6 @@
socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
bytes_needed);
socket->receive_buffer_size = bytes_needed;
- socket->receive_window_available =
- RECEIVE_BUFFER_SIZE - socket->receive_buffer_size;
}
else
{
@@ -700,16 +734,18 @@
}
}
- /* Copy Data to buffer and send acknowledgement for this packet */
+ /* Copy Data to buffer */
payload = &msg[1];
+ GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
memcpy (socket->receive_buffer + relative_offset,
payload,
size);
+ socket->receive_buffer_boundaries[relative_sequence_number] =
+ relative_offset + size;
/* Modify the ACK bitmap */
ackbitmap_modify_bit (&socket->ack_bitmap,
- ntohl (msg->sequence_number) -
- socket->read_sequence_number,
+ relative_sequence_number,
GNUNET_YES);
/* Start ACK sending task if one is not already present */
@@ -1427,7 +1463,7 @@
}
socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
- socket->write_handle->receive_window_available =
+ socket->receive_window_available =
ntohl (ack->receive_window_remaining);
write_data (socket);
break;
@@ -1617,6 +1653,53 @@
}
+/**
+ * Task for calling the read processor
+ *
+ * @param cls the socket
+ */
+static void
+call_read_processor_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ size_t read_size;
+ size_t valid_read_size;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
+
+ GNUNET_assert (NULL != socket->read_handle);
+ GNUNET_assert (NULL != socket->read_handle->proc);
+ GNUNET_assert (NULL != socket->copy_buffer);
+ GNUNET_assert (0 != socket->copy_buffer_size);
+
+ valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
+ GNUNET_assert (0 != valid_read_size);
+
+ read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->copy_buffer
+ + socket->copy_buffer_read_offset,
+ valid_read_size);
+
+ GNUNET_assert (read_size <= valid_read_size);
+ socket->copy_buffer_read_offset += read_size;
+
+ /* Free the copy buffer once it has been read entirely */
+ if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
+ {
+ GNUNET_free (socket->copy_buffer);
+ socket->copy_buffer = NULL;
+ socket->copy_buffer_size = 0;
+ socket->copy_buffer_read_offset = 0;
+ }
+
+ /* Free the read handle */
+ GNUNET_free (socket->read_handle);
+ socket->read_handle = NULL;
+}
+
+
/*****************/
/* API functions */
/*****************/
@@ -1878,7 +1961,7 @@
* @param write_cont_cls the closure
* @return handle to cancel the operation
*/
-struct GNUNET_STREAM_IOHandle *
+struct GNUNET_STREAM_IOWriteHandle *
GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
const void *data,
size_t size,
@@ -1888,7 +1971,7 @@
{
unsigned int num_needed_packets;
unsigned int packet;
- struct GNUNET_STREAM_IOHandle *io_handle;
+ struct GNUNET_STREAM_IOWriteHandle *io_handle;
uint32_t packet_size;
uint32_t payload_size;
struct GNUNET_STREAM_DataMessage *data_msg;
@@ -1912,8 +1995,7 @@
if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
- io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
- io_handle->receive_window_available = socket->receive_window_available;
+ io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
sweep = data;
/* Divide the given buffer into packets for sending */
for (packet=0; packet < num_needed_packets; packet++)
@@ -1966,22 +2048,59 @@
* @param proc_cls the closure for proc
* @return handle to cancel the operation
*/
-struct GNUNET_STREAM_IOHandle *
-GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket,
+struct GNUNET_STREAM_IOReadHandle *
+GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_TIME_Relative timeout,
GNUNET_STREAM_DataProcessor proc,
void *proc_cls)
{
+ unsigned int packet;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+
+ /* Return NULL if there is already a read handle; the user has to cancel that
+ first before continuing or has to wait until it is completed */
+ if (NULL != socket->read_handle) return NULL;
+
+ read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
+ read_handle->proc = proc;
+ socket->read_handle = read_handle;
+
+ /* if previous copy buffer is still not read call the data processor on it */
+ if (NULL != socket->copy_buffer)
+ {
+ GNUNET_SCHEDULER_add_now (&call_read_processor_task,
+ socket);
+ }
+
/* Check the bitmap for any holes */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
- /* Deem the data from the starting of the bitmap upto a hole as available
- data */
+ if (0 == packet) /* The first packet is still missing */
+ {
+ /* We can't do anything until it arrives */
+ }
+ else
+ {
+ /* Copy data to copy buffer */
+ socket->copy_buffer =
+ GNUNET_malloc (socket->receive_buffer_boundaries[packet-1]);
+
+ /* Shift the bitmap */
+ socket->ack_bitmap << packet;
- /* Create an IO handle */
+ /* Set read_sequence_number */
+ socket->read_sequence_number += packet;
- /* Call the Data processor with this available data */
-
- /* Update the read_sequence_number to the first hole in the bitmap */
+ /* Set read_offset */
+ socket->read_offset += packet;
- /* Shift the bitmap so that the first hole is now at the start */
+ /* FIXME: Fix relative calucations in receive buffer management */
+ }
+
+ return read_handle;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r20051 - gnunet/src/stream,
gnunet <=