poke-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 2/2] ios: Introduce IO stream device for stdin, stdout, stderr.


From: Egeyar Bagcioglu
Subject: [PATCH 2/2] ios: Introduce IO stream device for stdin, stdout, stderr.
Date: Sun, 10 May 2020 18:27:25 +0200

The handlers for stdin, stdout and stderr are exactly these strings.
open("stdin") for example opens stdin.

Upon opening, these devices initialize a buffer to store the read or
to-be-written data. It was discussed during the first poke-conf that
such a buffer should not be kept as one piece, as this might easily
make copying the whole buffer very expensive during expanding and
flushing (i.e. discarding the early parts of the buffer). Therefore,
the buffer is implemented in chunks. These chunks are kept in a hash table
-if you may- where the hash function is simply chunk_no % IOB_BUCKET_COUNT.

It is true that this code requires better error handling, which partly
requires changes to the rest of the ios as well. Moreover, currently,
ios_dev_stream_flush still requires language level support. Otherwise
tested. Tested also with the following parameters:
IOB_CHUNK_SIZE          1
IOB_BUCKET_COUNT        2

2020-05-10  Egeyar Bagcioglu  <address@hidden>

        * libpoke/Makefile.am: Add ios-dev-buffer.h and ios-dev-stream.c.
        * libpoke/ios-dev-buffer.h: New file.
        * libpoke/ios-dev-stream.c: New file.
        * libpoke/ios.c: Extern ios_dev_stream.
        (ios_dev_ifs): Add ios_dev_stream.
---
 libpoke/Makefile.am      |   3 +-
 libpoke/ios-dev-buffer.h | 249 +++++++++++++++++++++++++++++++++++
 libpoke/ios-dev-stream.c | 275 +++++++++++++++++++++++++++++++++++++++
 libpoke/ios.c            |   2 +
 4 files changed, 528 insertions(+), 1 deletion(-)
 create mode 100644 libpoke/ios-dev-buffer.h
 create mode 100644 libpoke/ios-dev-stream.c

diff --git a/libpoke/Makefile.am b/libpoke/Makefile.am
index 4ff355c4..04d1ec01 100644
--- a/libpoke/Makefile.am
+++ b/libpoke/Makefile.am
@@ -54,7 +54,8 @@ libpoke_la_SOURCES = libpoke.h libpoke.c \
                      pvm-program.h pvm-program.c \
                      pvm.jitter \
                      ios.c ios.h ios-dev.h \
-                     ios-dev-file.c ios-dev-mem.c
+                     ios-dev-file.c ios-dev-mem.c \
+                     ios-dev-buffer.h ios-dev-stream.c
 
 libpoke_la_SOURCES += ../common/pk-utils.c ../common/pk-utils.h
 
diff --git a/libpoke/ios-dev-buffer.h b/libpoke/ios-dev-buffer.h
new file mode 100644
index 00000000..f41f4337
--- /dev/null
+++ b/libpoke/ios-dev-buffer.h
@@ -0,0 +1,249 @@
+/* ios-dev-buffer.h - The buffer for IO devices.  */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <config.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <malloc.h>
+#include <assert.h>
+
+#define IOB_CHUNK_SIZE         2048  /* Number of bytes stored in each chunk.  */
+#define IOB_BUCKET_COUNT       8  /* Number of buckets the chunks are spread over.  */
+
+#define IOB_CHUNK_OFFSET(offset)       \
+  ((offset) % IOB_CHUNK_SIZE)  /* Position of OFFSET inside its chunk.  */
+
+#define IOB_CHUNK_NO(offset)           \
+  ((offset) / IOB_CHUNK_SIZE)  /* Number of the chunk that contains OFFSET.  */
+
+#define IOB_BUCKET_NO(chunk_no)                \
+  ((chunk_no) % IOB_BUCKET_COUNT)  /* Bucket in which chunk CHUNK_NO is kept.  */
+
+typedef struct ios_dev_buffer_chunk  /* One fixed-size piece of a buffer.  */
+{
+  uint8_t bytes[IOB_CHUNK_SIZE];  /* Payload.  Must stay the first member: some callers copy through the chunk pointer itself.  */
+  int chunk_no;  /* Number of this chunk, as computed by IOB_CHUNK_NO.  */
+  struct ios_dev_buffer_chunk *next;  /* Next chunk in the same bucket, or NULL.  */
+} ios_dev_buffer_chunk;
+
+/* begin_offset is the first offset that's not yet flushed, initialized as 0.
+   end_offset of an instream is the next byte to read to.  end_offset of an
+   outstream is the successor of the greatest offset that is written to.  */
+
+typedef struct ios_dev_buffer
+{
+  ios_dev_buffer_chunk* chunks[IOB_BUCKET_COUNT];  /* Head of each bucket's chunk list, indexed by IOB_BUCKET_NO.  */
+  ios_dev_off begin_offset;
+  ios_dev_off end_offset;
+  int next_chunk_no;  /* Number that the next allocated chunk will get.  */
+} ios_dev_buffer;
+
+/* Allocate a new, empty buffer.  The memory comes zeroed from calloc, so
+   every bucket is empty, both offsets are 0 and the next chunk to be
+   allocated is chunk 0.  Return NULL if the allocation fails.  The result
+   is to be released with ios_dev_buffer_free.  */
+
+static ios_dev_buffer *
+ios_dev_buffer_init (void)
+{
+  return calloc (1, sizeof (ios_dev_buffer));
+}
+
+/* Release BUFFER together with every chunk still linked into any of its
+   buckets.  Always returns 1.  */
+
+static int
+ios_dev_buffer_free (ios_dev_buffer *buffer)
+{
+  int bucket;
+
+  for (bucket = 0; bucket < IOB_BUCKET_COUNT; bucket++)
+    {
+      ios_dev_buffer_chunk *cur = buffer->chunks[bucket];
+
+      while (cur != NULL)
+        {
+          /* Save the link before the node goes away.  */
+          ios_dev_buffer_chunk *following = cur->next;
+
+          free (cur);
+          cur = following;
+        }
+    }
+
+  free (buffer);
+  return 1;
+}
+
+/* Look up chunk number CHUNK_NO in BUFFER.  Return a pointer to it, or
+   NULL when no such chunk is currently held.  */
+
+static ios_dev_buffer_chunk*
+ios_dev_buffer_get_chunk(ios_dev_buffer *buffer, int chunk_no)
+{
+  ios_dev_buffer_chunk *cur = buffer->chunks[IOB_BUCKET_NO (chunk_no)];
+
+  /* Walk the bucket until the chunk is found or the list ends.  */
+  while (cur != NULL && cur->chunk_no != chunk_no)
+    cur = cur->next;
+
+  return cur;
+}
+
+/* Grow BUFFER by allocating every chunk from buffer->next_chunk_no up to
+   and including FINAL_CHUNK_NO, linking each one at the head of its
+   bucket.  On success store the last chunk allocated in *FINAL_CHUNK and
+   return 0.  Return IOD_ERROR as soon as an allocation fails; the chunks
+   allocated before the failure stay in the buffer.
+
+   end_offset follows the data written, so it is not updated here but in
+   ios_dev_buffer_pwrite.  */
+
+static int
+ios_dev_buffer_allocate_new_chunk(ios_dev_buffer *buffer, int final_chunk_no,
+                                 ios_dev_buffer_chunk **final_chunk)
+{
+  ios_dev_buffer_chunk *fresh;
+
+  assert (buffer->next_chunk_no <= final_chunk_no);
+
+  do
+    {
+      ios_dev_buffer_chunk **head;
+
+      fresh = calloc(1, sizeof(ios_dev_buffer_chunk));
+      if (fresh == NULL)
+        return IOD_ERROR;
+
+      /* Number the chunk and push it onto the front of its bucket.  */
+      fresh->chunk_no = buffer->next_chunk_no++;
+      head = &buffer->chunks[IOB_BUCKET_NO (fresh->chunk_no)];
+      fresh->next = *head;
+      *head = fresh;
+    }
+  while (buffer->next_chunk_no <= final_chunk_no);
+
+  *final_chunk = fresh;
+  return 0;
+}
+
+/* Copy COUNT bytes of BUFFER, starting at OFFSET, into BUF.
+
+   ios_dev_stream_pread already checks begin_offset and end_offset, so
+   this function does not: it assumes the requested range has not been
+   discarded.  A chunk of the range that does not exist yet is allocated
+   on the fly, so its bytes read back as zeroes.
+
+   Return 0 on success, or IOD_ERROR if a chunk cannot be allocated.  */
+
+static int
+ios_dev_buffer_pread (ios_dev_buffer *buffer, void *buf, size_t count,
+                      ios_dev_off offset)
+{
+  unsigned char *dest = buf;
+  int chunk_no = IOB_CHUNK_NO (offset);
+  size_t chunk_offset = IOB_CHUNK_OFFSET (offset);
+  size_t already_read_count = 0;
+
+  do
+    {
+      ios_dev_buffer_chunk *chunk;
+      size_t to_be_read_count;
+
+      chunk = ios_dev_buffer_get_chunk (buffer, chunk_no);
+      if (!chunk
+          && ios_dev_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+        return IOD_ERROR;
+
+      /* The amount read from this chunk is the minimum of what is still
+         requested and what is left of the chunk.  */
+      to_be_read_count = IOB_CHUNK_SIZE - chunk_offset;
+      if (to_be_read_count > count - already_read_count)
+        to_be_read_count = count - already_read_count;
+
+      memcpy (dest + already_read_count, chunk->bytes + chunk_offset,
+              to_be_read_count);
+      already_read_count += to_be_read_count;
+
+      /* Every chunk after the first one is read from its beginning.  */
+      chunk_no++;
+      chunk_offset = 0;
+    }
+  while (already_read_count < count);
+
+  return 0;
+}
+
+/* Copy COUNT bytes from BUF into BUFFER, starting at OFFSET.
+
+   ios_dev_stream_pwrite already checks begin_offset, so this function
+   does not: it assumes the given range has not been discarded.  Chunks
+   that do not exist yet are allocated as needed.  end_offset is raised to
+   OFFSET + COUNT when the write extends past it.
+
+   Return 0 on success, or IOD_ERROR if a chunk cannot be allocated; in
+   that case end_offset is left untouched.  */
+
+static int
+ios_dev_buffer_pwrite (ios_dev_buffer *buffer, const void *buf, size_t count,
+                       ios_dev_off offset)
+{
+  const unsigned char *src = buf;
+  int chunk_no = IOB_CHUNK_NO (offset);
+  size_t chunk_offset = IOB_CHUNK_OFFSET (offset);
+  size_t already_written_count = 0;
+
+  do
+    {
+      ios_dev_buffer_chunk *chunk;
+      size_t to_be_written_count;
+
+      chunk = ios_dev_buffer_get_chunk (buffer, chunk_no);
+      if (!chunk
+          && ios_dev_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+        return IOD_ERROR;
+
+      /* The amount written to this chunk is the minimum of what is still
+         to be written and what is left of the chunk.  */
+      to_be_written_count = IOB_CHUNK_SIZE - chunk_offset;
+      if (to_be_written_count > count - already_written_count)
+        to_be_written_count = count - already_written_count;
+
+      memcpy (chunk->bytes + chunk_offset, src + already_written_count,
+              to_be_written_count);
+      already_written_count += to_be_written_count;
+
+      /* Every chunk after the first one is written from its beginning.  */
+      chunk_no++;
+      chunk_offset = 0;
+    }
+  while (already_written_count < count);
+
+  /* Lastly, keep track of the greatest offset we wrote to in the buffer.
+     (In fact, end_offset is the least offset we have not written to yet.)  */
+  if (buffer->end_offset < offset + count)
+    buffer->end_offset = offset + count;
+
+  return 0;
+}
+
+/* Discard every chunk of BUFFER whose number is smaller than
+   IOB_CHUNK_NO (OFFSET), i.e. every chunk lying entirely before OFFSET.
+   The chunk containing OFFSET itself is kept.  begin_offset is raised to
+   the start of the first kept chunk unless it is already at or past it.
+   Always returns 0.  */
+
+static int
+ios_dev_buffer_forget_till(ios_dev_buffer *buffer, ios_dev_off offset)
+{
+  ios_dev_buffer_chunk *chunk, *chunk_next;
+  int chunk_no = IOB_CHUNK_NO (offset);
+  /* Widen before multiplying: with a 32-bit int the product
+     chunk_no * IOB_CHUNK_SIZE overflows once OFFSET reaches 2 GiB.  */
+  ios_dev_off kept_from = (ios_dev_off) chunk_no * IOB_CHUNK_SIZE;
+
+  for (int i = 0; i < IOB_BUCKET_COUNT; i++)
+    {
+      /* Detach the bucket and rebuild it from the chunks that survive.  */
+      chunk = buffer->chunks[i];
+      buffer->chunks[i] = NULL;
+      while (chunk)
+        {
+          chunk_next = chunk->next;
+          if (chunk->chunk_no >= chunk_no)
+            {
+              chunk->next = buffer->chunks[i];
+              buffer->chunks[i] = chunk;
+            }
+          else
+            free (chunk);
+          chunk = chunk_next;
+        }
+    }
+
+  /* If this is a write stream, we must have written out some data to get
+     here.  In this case, begin_offset is equal to OFFSET by now.  */
+  if (buffer->begin_offset < kept_from)
+    {
+      buffer->begin_offset = kept_from;
+      assert (buffer->end_offset >= buffer->begin_offset);
+    }
+  else
+    assert (buffer->begin_offset <= offset);
+  return 0;
+}
diff --git a/libpoke/ios-dev-stream.c b/libpoke/ios-dev-stream.c
new file mode 100644
index 00000000..1b8c725d
--- /dev/null
+++ b/libpoke/ios-dev-stream.c
@@ -0,0 +1,275 @@
+/* ios-dev-stream.c - Streaming IO devices.  */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <config.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <stdio.h>
+#include <malloc.h>
+#include <string.h>
+#include <assert.h>
+
+#include "ios.h"
+#include "ios-dev.h"
+#include "ios-dev-buffer.h"
+
+/* State associated with a stream device.  */
+
+struct ios_dev_stream
+{
+  FILE *file;  /* stdin, stdout or stderr, as selected by the handler.  */
+  uint64_t flags;  /* IOS_F_READ for stdin; IOS_F_WRITE for stdout and stderr.  */
+  ios_dev_buffer *buffer;  /* Chunked buffer holding the data read or to be written.  */
+};
+
+/* Return a newly allocated copy of HANDLER if it names one of the streams
+   served by this device ("stdin", "stdout" or "stderr").  Return NULL for
+   any other handler.  */
+
+static char *
+ios_dev_stream_handler_normalize (const char *handler)
+{
+  static const char *const known[] = { "stdin", "stdout", "stderr" };
+  size_t i;
+
+  for (i = 0; i < sizeof known / sizeof known[0]; i++)
+    if (strcmp (handler, known[i]) == 0)
+      return strdup (handler);
+
+  return NULL;
+}
+
+/* Open the stream named by HANDLER, which must be "stdin", "stdout" or
+   "stderr".  FLAGS is not consulted: the device flags are fixed by the
+   stream, IOS_F_READ for stdin and IOS_F_WRITE for stdout and stderr.
+
+   Return the new device state.  On an unknown handler or when memory
+   runs out, store IOD_ERROR in *ERROR and return NULL.  */
+
+static void *
+ios_dev_stream_open (const char *handler, uint64_t flags, int *error)
+{
+  struct ios_dev_stream *sio;
+  FILE *stream;
+  uint64_t stream_flags;
+
+  /* Map the handler to its stdio stream and access mode.  */
+  if (strcmp (handler, "stdin") == 0)
+    {
+      stream = stdin;
+      stream_flags = IOS_F_READ;
+    }
+  else if (strcmp (handler, "stdout") == 0)
+    {
+      stream = stdout;
+      stream_flags = IOS_F_WRITE;
+    }
+  else if (strcmp (handler, "stderr") == 0)
+    {
+      stream = stderr;
+      stream_flags = IOS_F_WRITE;
+    }
+  else
+    {
+      *error = IOD_ERROR;
+      return NULL;
+    }
+
+  sio = malloc (sizeof (struct ios_dev_stream));
+  if (sio != NULL)
+    {
+      sio->file = stream;
+      sio->flags = stream_flags;
+      sio->buffer = ios_dev_buffer_init();
+      if (sio->buffer != NULL)
+        return sio;
+
+      /* No buffer: give the half-built state back.  */
+      free (sio);
+    }
+
+  *error = IOD_ERROR;
+  return NULL;
+}
+
+/* Close the stream device IOD: release its buffer and then the device
+   state itself.  The stdio stream behind it is not closed.  Always
+   returns 1.  */
+
+static int
+ios_dev_stream_close (void *iod)
+{
+  struct ios_dev_stream *stream = iod;
+  ios_dev_buffer *held = stream->buffer;
+
+  ios_dev_buffer_free(held);
+  free (stream);
+
+  return 1;
+}
+
+/* Return the flags recorded for the stream device IOD when it was
+   opened.  */
+
+static uint64_t
+ios_dev_stream_get_flags (void *iod)
+{
+  return ((struct ios_dev_stream *) iod)->flags;
+}
+
+/* Read COUNT bytes at OFFSET from the stream device IOD into BUF.
+
+   Data already held in the buffer is served from there.  For an input
+   stream, the bytes between the end of the buffer and OFFSET + COUNT are
+   first read from the underlying stream and appended to the buffer, so
+   that they can be read again later.  For a device that is not an input
+   stream, the request is served from the buffer alone and missing bytes
+   read as zeroes.
+
+   Return IOD_EOF if OFFSET lies in the part of the buffer that has been
+   discarded, and IOD_ERROR if memory runs out or the stream delivers
+   fewer bytes than needed (whatever was delivered is still appended to
+   the buffer).  */
+
+static int
+ios_dev_stream_pread (void *iod, void *buf, size_t count, ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+  ios_dev_buffer *buffer = sio->buffer;
+  size_t read_count, total_read_count = 0;
+
+  /* If the beginning of the buffer is discarded, return EOF. */
+  if (buffer->begin_offset > offset)
+    return IOD_EOF;
+
+  /* If the requested range is in the buffer, return it.  Or if this is not
+     an in-stream, initialize the missing buffer as 0 and return it.  */
+  if ((buffer->end_offset >= offset + count)
+      || !(sio->flags & IOS_F_READ))
+    return ios_dev_buffer_pread (buffer, buf, count, offset);
+
+  /* What was last read into the buffer may be before or after the
+     offset that this function is provided with.  */
+  if (buffer->end_offset == offset)
+    {
+      /* The request starts exactly where the buffer ends: read straight
+         into BUF.  fread is asked for single bytes so that its return
+         value is a byte count, and only for what is still missing so that
+         BUF is never overrun.  */
+      do
+        {
+          read_count = fread ((char *) buf + total_read_count, 1,
+                              count - total_read_count, sio->file);
+          total_read_count += read_count;
+        }
+      while (total_read_count < count && read_count);
+
+      if (ios_dev_buffer_pwrite (buffer, buf, total_read_count, offset)
+          || total_read_count < count)
+        return IOD_ERROR;
+
+      return IOS_OK;
+    }
+  else
+    {
+      /* Part of the range is buffered already, or there is a gap between
+         the end of the buffer and OFFSET.  Either way, extend the buffer
+         up to OFFSET + COUNT and then serve the request from it.  */
+      size_t to_be_read = (offset + count) - buffer->end_offset;
+      char *temp = malloc (to_be_read);
+      int write_failed;
+
+      if (!temp)
+        return IOD_ERROR;
+
+      do
+        {
+          read_count = fread (temp + total_read_count, 1,
+                              to_be_read - total_read_count, sio->file);
+          total_read_count += read_count;
+        }
+      while (total_read_count < to_be_read && read_count);
+
+      write_failed = ios_dev_buffer_pwrite (buffer, temp, total_read_count,
+                                            buffer->end_offset);
+      free (temp);
+
+      if (write_failed || total_read_count < to_be_read)
+        return IOD_ERROR;
+
+      return ios_dev_buffer_pread (buffer, buf, count, offset);
+    }
+}
+
+/* Store COUNT bytes from BUF at OFFSET in the buffer of the stream device
+   IOD.  Nothing reaches the underlying stream here.
+
+   Return IOD_EOF when OFFSET has already been flushed, or when the device
+   is not a write stream and the range is not entirely inside the buffer.
+   Otherwise return the result of ios_dev_buffer_pwrite.  */
+
+static int
+ios_dev_stream_pwrite (void *iod, const void *buf, size_t count,
+                      ios_dev_off offset)
+{
+  const struct ios_dev_stream *stream = iod;
+  ios_dev_buffer *store = stream->buffer;
+  int writable = (stream->flags & IOS_F_WRITE) != 0;
+
+  /* The target was flushed away already.  */
+  if (offset < store->begin_offset)
+    return IOD_EOF;
+
+  /* A stream that is not writable may only overwrite what it holds.  */
+  if (!writable && offset + count > store->end_offset)
+    return IOD_EOF;
+
+  return ios_dev_buffer_pwrite (store, buf, count, offset);
+}
+
+/* Return the size of the stream device IOD, which is the end_offset of
+   its buffer.  */
+
+static ios_dev_off
+ios_dev_stream_size (void *iod)
+{
+  return ((struct ios_dev_stream *) iod)->buffer->end_offset;
+}
+
+/* Write the buffered bytes of SIO from begin_offset up to, but not
+   including, OFFSET to the underlying stream, one chunk at a time.
+   begin_offset is advanced by the number of bytes actually written.
+
+   Return IOS_OK when everything was written.  Return IOD_ERROR if a chunk
+   of the range is missing or the stream accepts fewer bytes than
+   requested; begin_offset then marks how far the write-out got.  */
+
+static int
+ios_dev_stream_write_out_till(struct ios_dev_stream *sio, ios_dev_off offset)
+{
+  ios_dev_buffer *buffer = sio->buffer;
+
+  while (buffer->begin_offset < offset)
+    {
+      ios_dev_buffer_chunk *chunk
+        = ios_dev_buffer_get_chunk (buffer,
+                                    IOB_CHUNK_NO (buffer->begin_offset));
+      size_t chunk_offset = IOB_CHUNK_OFFSET (buffer->begin_offset);
+      size_t count = IOB_CHUNK_SIZE - chunk_offset;
+      size_t written;
+
+      if (!chunk)
+        return IOD_ERROR;
+
+      /* Stop at OFFSET when it falls inside this chunk.  */
+      if (offset - buffer->begin_offset < count)
+        count = offset - buffer->begin_offset;
+
+      /* Ask fwrite for single bytes so that its return value is a byte
+         count that can be compared with COUNT.  */
+      written = fwrite (chunk->bytes + chunk_offset, 1, count, sio->file);
+      buffer->begin_offset += written;
+      if (written != count)
+        return IOD_ERROR;
+    }
+
+  return IOS_OK;
+}
+
+/* Flush the stream device IOD up to OFFSET.  For a write stream the bytes
+   before OFFSET are first written to the underlying stream; then, for any
+   stream, the chunks lying entirely before OFFSET are discarded.
+
+   Return IOD_ERROR if OFFSET is not beyond begin_offset or is beyond
+   end_offset.  If writing out fails, return that error without discarding
+   anything, so that no unwritten data is lost.  Otherwise return the
+   result of ios_dev_buffer_forget_till.  */
+
+static int
+ios_dev_stream_flush (void *iod, ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+  ios_dev_buffer *buffer = sio->buffer;
+
+  if (buffer->begin_offset >= offset
+      || buffer->end_offset < offset)
+    return IOD_ERROR;
+
+  if (sio->flags & IOS_F_WRITE)
+    {
+      int ret = ios_dev_stream_write_out_till (sio, offset);
+
+      if (ret != IOS_OK)
+        return ret;
+    }
+  return ios_dev_buffer_forget_till (buffer, offset);
+}
+
+struct ios_dev_if ios_dev_stream  /* Device interface for the stdin, stdout and stderr streams.  */
+  __attribute__ ((visibility ("hidden"))) =
+  {
+   .handler_normalize = ios_dev_stream_handler_normalize,
+   .open = ios_dev_stream_open,
+   .close = ios_dev_stream_close,
+   .pread = ios_dev_stream_pread,
+   .pwrite = ios_dev_stream_pwrite,  /* Only stores into the buffer.  */
+   .get_flags = ios_dev_stream_get_flags,
+   .size = ios_dev_stream_size,
+   .flush = ios_dev_stream_flush  /* Writes buffered data out for write streams.  */
+  };
diff --git a/libpoke/ios.c b/libpoke/ios.c
index 6f8bcd6b..9cb1cb56 100644
--- a/libpoke/ios.c
+++ b/libpoke/ios.c
@@ -86,6 +86,7 @@ static struct ios *cur_io;
 
 extern struct ios_dev_if ios_dev_mem; /* ios-dev-mem.c */
 extern struct ios_dev_if ios_dev_file; /* ios-dev-file.c */
+extern struct ios_dev_if ios_dev_stream; /* ios-dev-stream.c */
 #ifdef HAVE_LIBNBD
 extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
 #endif
@@ -93,6 +94,7 @@ extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
 static struct ios_dev_if *ios_dev_ifs[] =
   {
    &ios_dev_mem,
+   &ios_dev_stream,
 #ifdef HAVE_LIBNBD
    &ios_dev_nbd,
 #endif
-- 
2.25.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]