gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnurl] 161/220: ngtcp2: accept upload via callback


From: gnunet
Subject: [GNUnet-SVN] [gnurl] 161/220: ngtcp2: accept upload via callback
Date: Thu, 12 Sep 2019 17:28:41 +0200

This is an automated email from the git hooks/post-receive script.

ng0 pushed a commit to branch master
in repository gnurl.

commit 0a5d28fa2ec872de55c8d3f3b62675f17ca9cd45
Author: Daniel Stenberg <address@hidden>
AuthorDate: Thu Aug 22 14:08:18 2019 +0200

    ngtcp2: accept upload via callback
    
    Closes #4256
---
 lib/http.h         |   8 +++
 lib/quic.h         |   5 +-
 lib/transfer.c     |   2 +
 lib/vquic/ngtcp2.c | 139 +++++++++++++++++++++++++++++++++++++++++++++++------
 lib/vquic/quiche.c |  18 ++++++-
 5 files changed, 153 insertions(+), 19 deletions(-)

diff --git a/lib/http.h b/lib/http.h
index 945aceb56..6232bbc3a 100644
--- a/lib/http.h
+++ b/lib/http.h
@@ -126,6 +126,10 @@ CURLcode Curl_http_auth_act(struct connectdata *conn);
 
 #endif /* CURL_DISABLE_HTTP */
 
+#ifdef USE_NGHTTP3
+struct h3out; /* see ngtcp2 */
+#endif
+
 /****************************************************************************
  * HTTP unique setup
  ***************************************************************************/
@@ -196,6 +200,10 @@ struct HTTP {
   int64_t stream3_id; /* stream we are interested in */
   bool firstbody;  /* FALSE until body arrives */
   bool h3req;    /* FALSE until request is issued */
+  bool upload_done;
+#endif
+#ifdef USE_NGHTTP3
+  struct h3out *h3out; /* per-stream buffers for upload */
 #endif
 };
 
diff --git a/lib/quic.h b/lib/quic.h
index d73ba0c36..6c132a324 100644
--- a/lib/quic.h
+++ b/lib/quic.h
@@ -44,7 +44,10 @@ CURLcode Curl_quic_is_connected(struct connectdata *conn,
                                 curl_socket_t sockfd,
                                 bool *connected);
 int Curl_quic_ver(char *p, size_t len);
+CURLcode Curl_quic_done_sending(struct connectdata *conn);
 
-#endif
+#else /* ENABLE_QUIC */
+#define Curl_quic_done_sending(x)
+#endif /* !ENABLE_QUIC */
 
 #endif /* HEADER_CURL_QUIC_H */
diff --git a/lib/transfer.c b/lib/transfer.c
index ab662fbc0..7e57fbe03 100644
--- a/lib/transfer.c
+++ b/lib/transfer.c
@@ -942,7 +942,9 @@ CURLcode Curl_done_sending(struct connectdata *conn,
 {
   k->keepon &= ~KEEP_SEND; /* we're done writing */
 
+  /* These functions should be moved into the handler struct! */
   Curl_http2_done_sending(conn);
+  Curl_quic_done_sending(conn);
 
   if(conn->bits.rewindaftersend) {
     CURLcode result = Curl_readrewind(conn);
diff --git a/lib/vquic/ngtcp2.c b/lib/vquic/ngtcp2.c
index f9de76960..008a75cfd 100644
--- a/lib/vquic/ngtcp2.c
+++ b/lib/vquic/ngtcp2.c
@@ -50,6 +50,20 @@
 #define H3BUGF(x) do { } WHILE_FALSE
 #endif
 
+/*
+ * This holds outgoing HTTP/3 stream data that is used by nghttp3 until acked.
+ * It is used as a circular buffer. Add new bytes at the end until it reaches
+ * the far end, then start over at index 0 again.
+ */
+
+#define H3_SEND_SIZE (20*1024)
+struct h3out {
+  uint8_t buf[H3_SEND_SIZE];
+  size_t used;   /* number of bytes used in the buffer */
+  size_t windex; /* index in the buffer where to start writing the next
+                    data block */
+};
+
 #define QUIC_MAX_STREAMS (256*1024)
 #define QUIC_MAX_DATA (1*1024*1024)
 #define QUIC_IDLE_TIMEOUT 60000 /* milliseconds */
@@ -63,6 +77,9 @@ static CURLcode ng_process_ingress(struct connectdata *conn,
                                    struct quicsocket *qs);
 static CURLcode ng_flush_egress(struct connectdata *conn, int sockfd,
                                 struct quicsocket *qs);
+static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
+                                   size_t datalen, void *user_data,
+                                   void *stream_user_data);
 
 static ngtcp2_tstamp timestamp(void)
 {
@@ -1194,7 +1211,7 @@ static unsigned int ng_conncheck(struct connectdata *conn,
   return CONNRESULT_NONE;
 }
 
-static const struct Curl_handler Curl_handler_h3_quiche = {
+static const struct Curl_handler Curl_handler_http3 = {
   "HTTPS",                              /* scheme */
   ZERO_NULL,                            /* setup_connection */
   Curl_http,                            /* do_it */
@@ -1370,7 +1387,7 @@ static int cb_h3_send_stop_sending(nghttp3_conn *conn, 
int64_t stream_id,
 }
 
 static nghttp3_conn_callbacks ngh3_callbacks = {
-  NULL, /* acked_stream_data */
+  cb_h3_acked_stream_data, /* acked_stream_data */
   cb_h3_stream_close,
   cb_h3_recv_data,
   cb_h3_deferred_consume,
@@ -1386,6 +1403,7 @@ static nghttp3_conn_callbacks ngh3_callbacks = {
   NULL, /* http_cancel_push */
   cb_h3_send_stop_sending,
   NULL, /* push_stream */
+  NULL, /* end_stream */
 };
 
 static int init_ngh3_conn(struct quicsocket *qs)
@@ -1451,6 +1469,7 @@ static int init_ngh3_conn(struct quicsocket *qs)
 static Curl_recv ngh3_stream_recv;
 static Curl_send ngh3_stream_send;
 
+/* incoming data frames on the h3 stream */
 static ssize_t ngh3_stream_recv(struct connectdata *conn,
                                 int sockindex,
                                 char *buf,
@@ -1497,18 +1516,38 @@ static ssize_t ngh3_stream_recv(struct connectdata 
*conn,
   return -1;
 }
 
+/* this amount of data has now been acked on this stream */
+static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
+                                   size_t datalen, void *user_data,
+                                   void *stream_user_data)
+{
+  struct Curl_easy *data = stream_user_data;
+  struct HTTP *stream = data->req.protop;
+  (void)conn;
+  (void)stream_id;
+  (void)user_data;
+
+  if(!data->set.postfields) {
+    stream->h3out->used -= datalen;
+    fprintf(stderr, "cb_h3_acked_stream_data, %zd bytes, %zd left unacked\n",
+            datalen, stream->h3out->used);
+    DEBUGASSERT(stream->h3out->used < H3_SEND_SIZE);
+  }
+  return 0;
+}
+
 static int cb_h3_readfunction(nghttp3_conn *conn, int64_t stream_id,
                               const uint8_t **pdata,
                               size_t *pdatalen, uint32_t *pflags,
                               void *user_data, void *stream_user_data)
 {
   struct Curl_easy *data = stream_user_data;
+  size_t nread;
+  struct HTTP *stream = data->req.protop;
   (void)conn;
   (void)stream_id;
   (void)user_data;
 
-  fprintf(stderr, "called cb_h3_readfunction\n");
-
   if(data->set.postfields) {
     *pdata = data->set.postfields;
     *pdatalen = data->state.infilesize;
@@ -1516,6 +1555,48 @@ static int cb_h3_readfunction(nghttp3_conn *conn, 
int64_t stream_id,
     return 0;
   }
 
+  nread = CURLMIN(stream->upload_len, H3_SEND_SIZE - stream->h3out->used);
+  if(nread > 0) {
+    /* nghttp3 wants us to hold on to the data until it tells us it is okay to
+       delete it. Append the data at the end of the h3out buffer. Since we can
+       only return consecutive data, copy the amount that fits and the next
+       part comes in next invoke. */
+    struct h3out *out = stream->h3out;
+    if(nread + out->windex > H3_SEND_SIZE)
+      nread = H3_SEND_SIZE - out->windex;
+
+    memcpy(&out->buf[out->windex], stream->upload_mem, nread);
+    out->windex += nread;
+    out->used += nread;
+
+    /* that's the chunk we return to nghttp3 */
+    *pdata = &out->buf[out->windex];
+    *pdatalen = nread;
+
+    if(out->windex == H3_SEND_SIZE)
+      out->windex = 0; /* wrap */
+    stream->upload_mem += nread;
+    stream->upload_len -= nread;
+    if(data->state.infilesize != -1) {
+      stream->upload_left -= nread;
+      if(!stream->upload_left)
+        *pflags = NGHTTP3_DATA_FLAG_EOF;
+    }
+    fprintf(stderr, "cb_h3_readfunction %zd bytes%s (at %zd unacked)\n",
+            nread, *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"",
+            out->used);
+  }
+  if(stream->upload_done && !stream->upload_len &&
+     (stream->upload_left <= 0)) {
+    fprintf(stderr, "!!!!!!!!! cb_h3_readfunction sets EOF\n");
+    *pdata = NULL;
+    *pdatalen = 0;
+    *pflags = NGHTTP3_DATA_FLAG_EOF;
+  }
+  else if(!nread) {
+    *pdatalen = 0;
+    return NGHTTP3_ERR_WOULDBLOCK;
+  }
   return 0;
 }
 
@@ -1538,6 +1619,7 @@ static CURLcode http_request(struct connectdata *conn, 
const void *mem,
   nghttp3_nv *nva = NULL;
   int64_t stream3_id;
   int rc;
+  struct h3out *h3out = NULL;
 
   rc = ngtcp2_conn_open_bidi_stream(qs->qconn, &stream3_id, NULL);
   if(rc) {
@@ -1722,6 +1804,13 @@ static CURLcode http_request(struct connectdata *conn, 
const void *mem,
 
     data_reader.read_data = cb_h3_readfunction;
 
+    h3out = calloc(sizeof(struct h3out), 1);
+    if(!h3out) {
+      result = CURLE_OUT_OF_MEMORY;
+      goto fail;
+    }
+    stream->h3out = h3out;
+
     rc = nghttp3_conn_submit_request(qs->h3conn, stream->stream3_id,
                                      nva, nheader, &data_reader,
                                      conn->data);
@@ -1746,15 +1835,6 @@ static CURLcode http_request(struct connectdata *conn, 
const void *mem,
 
   Curl_safefree(nva);
 
-  if(!stream->upload_left) {
-    /* done with this stream, FIN it */
-    rc = nghttp3_conn_end_stream(qs->h3conn, stream->stream3_id);
-    if(rc) {
-      result = CURLE_SEND_ERROR;
-      goto fail;
-    }
-  }
-
   infof(data, "Using HTTP/3 Stream ID: %x (easy handle %p)\n",
         stream3_id, (void *)data);
 
@@ -1784,8 +1864,17 @@ static ssize_t ngh3_stream_send(struct connectdata *conn,
     sent = len;
   }
   else {
-    (void)qs;
-    /* TODO */
+    fprintf(stderr, "ngh3_stream_send() wants to send %zd bytes\n", len);
+    if(!stream->upload_len) {
+      stream->upload_mem = mem;
+      stream->upload_len = len;
+      (void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id);
+      sent = len;
+    }
+    else {
+      *curlcode = CURLE_AGAIN;
+      return -1;
+    }
   }
 
   if(ng_flush_egress(conn, sockfd, qs)) {
@@ -1801,7 +1890,7 @@ static void ng_has_connected(struct connectdata *conn, 
int tempindex)
 {
   conn->recv[FIRSTSOCKET] = ngh3_stream_recv;
   conn->send[FIRSTSOCKET] = ngh3_stream_send;
-  conn->handler = &Curl_handler_h3_quiche;
+  conn->handler = &Curl_handler_http3;
   conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
   conn->httpversion = 30;
   conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@@ -2022,4 +2111,22 @@ static CURLcode ng_flush_egress(struct connectdata 
*conn, int sockfd,
 
   return CURLE_OK;
 }
+
+/*
+ * Called from transfer.c:done_sending when we stop HTTP/3 uploading.
+ */
+CURLcode Curl_quic_done_sending(struct connectdata *conn)
+{
+  if(conn->handler == &Curl_handler_http3) {
+    /* only for HTTP/3 transfers */
+    struct HTTP *stream = conn->data->req.protop;
+    struct quicsocket *qs = conn->quic;
+    fprintf(stderr, "!!! Curl_quic_done_sending stream %zu\n",
+            stream->stream3_id);
+    stream->upload_done = TRUE;
+    (void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id);
+  }
+
+  return CURLE_OK;
+}
 #endif
diff --git a/lib/vquic/quiche.c b/lib/vquic/quiche.c
index b84cc4779..43723a5f9 100644
--- a/lib/vquic/quiche.c
+++ b/lib/vquic/quiche.c
@@ -116,7 +116,7 @@ static CURLcode quiche_do(struct connectdata *conn, bool 
*done)
   return Curl_http(conn, done);
 }
 
-static const struct Curl_handler Curl_handler_h3_quiche = {
+static const struct Curl_handler Curl_handler_http3 = {
   "HTTPS",                              /* scheme */
   ZERO_NULL,                            /* setup_connection */
   quiche_do,                            /* do_it */
@@ -232,7 +232,7 @@ static CURLcode quiche_has_connected(struct connectdata 
*conn,
 
   conn->recv[sockindex] = h3_stream_recv;
   conn->send[sockindex] = h3_stream_send;
-  conn->handler = &Curl_handler_h3_quiche;
+  conn->handler = &Curl_handler_http3;
   conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
   conn->httpversion = 30;
   conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@@ -750,5 +750,19 @@ fail:
   return result;
 }
 
+/*
+ * Called from transfer.c:done_sending when we stop HTTP/3 uploading.
+ */
+CURLcode Curl_quic_done_sending(struct connectdata *conn)
+{
+  if(conn->handler == &Curl_handler_http3) {
+    /* only for HTTP/3 transfers */
+    struct HTTP *stream = conn->data->req.protop;
+    fprintf(stderr, "!!! Curl_quic_done_sending\n");
+    stream->upload_done = TRUE;
+  }
+
+  return CURLE_OK;
+}
 
 #endif

-- 
To stop receiving notification emails like this one, please contact
address@hidden.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]