gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-merchant] branch master updated: -rate limiting


From: gnunet
Subject: [taler-merchant] branch master updated: -rate limiting
Date: Fri, 28 Apr 2023 15:04:30 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository merchant.

The following commit(s) were added to refs/heads/master by this push:
     new 5d59b885 -rate limiting
5d59b885 is described below

commit 5d59b885fd6683c248d89df55891009f0b460bf5
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Fri Apr 28 15:03:59 2023 +0200

    -rate limiting
---
 src/backend/taler-merchant-exchange.c | 197 +++++++++++++++++++++++++---------
 1 file changed, 147 insertions(+), 50 deletions(-)

diff --git a/src/backend/taler-merchant-exchange.c 
b/src/backend/taler-merchant-exchange.c
index ab4bc03f..66893f03 100644
--- a/src/backend/taler-merchant-exchange.c
+++ b/src/backend/taler-merchant-exchange.c
@@ -35,6 +35,15 @@
     GNUNET_TIME_UNIT_MINUTES, \
     30)
 
+/**
+ * How many inquiries do we process concurrently at most.
+ */
+#define OPEN_INQUIRY_LIMIT 1024
+
+/**
+ * How many inquiries do we process concurrently per exchange at most.
+ */
+#define EXCHANGE_INQUIRY_LIMIT 16
 
 /**
  * Information about an exchange.
@@ -61,6 +70,16 @@ struct Exchange
    */
   struct TALER_EXCHANGE_Handle *conn;
 
+  /**
+   * Task where we retry fetching /keys from the exchange.
+   */
+  struct GNUNET_SCHEDULER_Task *retry_task;
+
+  /**
+   * How many active inquiries do we have right now with this exchange.
+   */
+  unsigned int exchange_inquiries;
+
   /**
    * How soon can may we, at the earliest, re-download /keys?
    */
@@ -71,11 +90,6 @@ struct Exchange
    */
   struct GNUNET_TIME_Relative retry_delay;
 
-  /**
-   * Task where we retry fetching /keys from the exchange.
-   */
-  struct GNUNET_SCHEDULER_Task *retry_task;
-
   /**
    * False to indicate that there is an ongoing
    * /keys transfer we are waiting for;
@@ -107,45 +121,45 @@ struct Inquiry
   struct Exchange *exchange;
 
   /**
-   * When did the transfer happen?
+   * For which merchant instance is this tracking request?
    */
-  struct GNUNET_TIME_Absolute execution_time;
+  char *instance_id;
 
   /**
-   * Argument for the /wire/transfers request.
+   * payto:// URI used for the transfer.
    */
-  struct TALER_WireTransferIdentifierRawP wtid;
+  char *payto_uri;
 
   /**
-   * Amount of the wire transfer.
+   * Handle for the /wire/transfers request.
    */
-  struct TALER_Amount total;
+  struct TALER_EXCHANGE_TransfersGetHandle *wdh;
 
   /**
-   * For which merchant instance is this tracking request?
+   * Pointer to the detail that we are currently
+   * checking in #check_transfer().
    */
-  char *instance_id;
+  const struct TALER_TrackTransferDetails *current_detail;
 
   /**
-   * payto:// URI used for the transfer.
+   * When did the transfer happen?
    */
-  char *payto_uri;
+  struct GNUNET_TIME_Absolute execution_time;
 
   /**
-   * Row of the wire transfer in our database.
+   * Argument for the /wire/transfers request.
    */
-  uint64_t rowid;
+  struct TALER_WireTransferIdentifierRawP wtid;
 
   /**
-   * Handle for the /wire/transfers request.
+   * Amount of the wire transfer.
    */
-  struct TALER_EXCHANGE_TransfersGetHandle *wdh;
+  struct TALER_Amount total;
 
   /**
-   * Pointer to the detail that we are currently
-   * checking in #check_transfer().
+   * Row of the wire transfer in our database.
    */
-  const struct TALER_TrackTransferDetails *current_detail;
+  uint64_t rowid;
 
   /**
    * Which transaction detail are we currently looking at?
@@ -219,6 +233,11 @@ static struct GNUNET_SCHEDULER_Task *task;
  */
 static struct GNUNET_DB_EventHandler *eh;
 
+/**
+ * How many active inquiries do we have right now.
+ */
+static unsigned int active_inquiries;
+
 /**
  * Value to return from main(). 0 on success, non-zero on errors.
  */
@@ -244,6 +263,38 @@ static void
 exchange_request (void *cls);
 
 
+/**
+ * The exchange @a e is ready to handle more inquiries,
+ * prepare to launch them.
+ *
+ * @param e exchange to potentially launch inquiries on
+ */
+static void
+launch_inquiries_at_exchange (const struct Exchange *e)
+{
+  /* Note: this is an O(n) that should be optimized to an O(1) by tracking
+     inquiries per exchange at the exchange... */
+  for (struct Inquiry w = w_head;
+       NULL != w;
+       w = w->next)
+  {
+    if (w->exchange_done)
+      continue;
+    if (w->exchange != e)
+      continue;
+    if ( (NULL == w->task) &&
+         (NULL == w->wdh) )
+    {
+      /* Note: we should additionally count the number of exchange
+         requests launched here to not then run into the per-exchange
+         limit at exchange_request */
+      w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
+                                          w);
+    }
+  }
+}
+
+
 /**
  * Function called with information about who is auditing
  * a particular exchange and what keys the exchange is using.
@@ -267,21 +318,7 @@ cert_cb (
   {
   case MHD_HTTP_OK:
     e->ready = true;
-    for (struct Inquiry w = w_head;
-         NULL != w;
-         w = w->next)
-    {
-      if (w->exchange_done)
-        continue;
-      if (w->exchange != e)
-        continue;
-      if ( (NULL == w->task) &&
-           (NULL == w->wdh) )
-      {
-        w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
-                                            w);
-      }
-    }
+    launch_inquiries_at_exchange (e);
     // FIXME: schedule retry?
     break;
   default:
@@ -291,6 +328,26 @@ cert_cb (
 }
 
 
+/**
+ * Updates the transaction status for inquiry @a w to the given values.
+ *
+ * @param w inquiry to update status for
+ * @param next_attempt when should we retry @a w (if ever)
+ * @param ec error code to use (if any)
+ * @param failed failure status (if ultimately failed)
+ * @param verified success status (if ultimately successful)
+ */
+static void
+update_transaction_status (const struct Inquiry *w,
+                           struct GNUNET_TIME_Absolute next_attempt,
+                           enum TALER_ErrorCode ec,
+                           bool failed,
+                           bool verified)
+{
+  // FIXME: DB update here. Log result, on failure shutdown.
+}
+
+
 /**
  * Lookup our internal data structure for the given
  * @a exchange_url or create one if we do not yet have
@@ -330,6 +387,8 @@ find_exchange (const char *exchange_url)
 static void
 end_inquiry (struct Inquiry *w)
 {
+  GNUNET_assert (active_inquiries > 0);
+  active_inquiries--;
   if (NULL != w->wdh)
   {
     TALER_EXCHANGE_transfers_get_cancel (w->wdh);
@@ -479,8 +538,12 @@ wire_transfer_cb (void *cls,
                   const struct TALER_EXCHANGE_TransferData *td)
 {
   struct Inquiry *w = cls;
+  struct Exchange *e = w->exchange;
   enum GNUNET_DB_QueryStatus qs;
 
+  e->exchange_inquiries--;
+  if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries)
+    launch_inquiries_at_exchange (e);
   w->wdh = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Got response code %u from exchange for GET /transfers/$WTID\n",
@@ -490,10 +553,18 @@ wire_transfer_cb (void *cls,
   case MHD_HTTP_OK:
     break;
   case MHD_HTTP_NOT_FOUND:
-    // FIXME: record permanent failure in DB!
+    update_transaction_status (w,
+                               GNUNET_TIME_UNIT_FOREVER_ABS,
+                               TALER_EC_MERCHANT_XXX,
+                               true,
+                               false);
     return;
   default:
-    // FIXME: record transient failure in DB!
+    update_transaction_status (w,
+                               GNUNET_TIME_relative_to_absolute (delay),
+                               TALER_EC_MERCHANT_XXX,
+                               false,
+                               false);
     return;
   }
   TMH_db->preflight (TMH_db->cls);
@@ -508,7 +579,8 @@ wire_transfer_cb (void *cls,
   {
     /* Always report on DB error as well to enable diagnostics */
     GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs);
-    // FIXME: shutdown?
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
     return;
   }
   if (0 == qs)
@@ -521,10 +593,20 @@ wire_transfer_cb (void *cls,
       TALER_amount_cmp (&td->total_amount,
                         &w->total))
   {
-    // FIXME: record inconsistency in DB!
+    /* record inconsistency in DB! (TODO: report to auditor!?) */
+    update_transaction_status (w,
+                               GNUNET_TIME_UNIT_FOREVER_ABS,
+                               TALER_EC_MERCHANT_XXX,
+                               true,
+                               false);
     return;
   }
-  // FIXME: record success in DB!
+  /* set transaction to successful */
+  update_transaction_status (w,
+                             GNUNET_TIME_UNIT_FOREVER_ABS,
+                             TALER_EC_NONE,
+                             false,
+                             true);
 }
 
 
@@ -537,21 +619,31 @@ static void
 exchange_request (void *cls)
 {
   struct Inquiry *w = cls;
+  struct Exchange *e = w->exchange;
 
-  GNUNET_assert (w->exchange->ready);
-  // FIXME: rate-limit number of parallel requests per
-  // exchange!
-  w->wdh = TALER_EXCHANGE_transfers_get (w->exchange->conn,
+  GNUNET_assert (e->ready);
+  if (EXCHANGE_INQUIRY_LIMIT <= e->exchange_inquiries)
+    return; /* blocked by exchange rate limit */
+  w->wdh = TALER_EXCHANGE_transfers_get (e->conn,
                                          &w->wtid,
                                          &wire_transfer_cb,
                                          w);
   if (NULL == w->wdh)
   {
     GNUNET_break (0);
-    /* FIXME: update DB: status: failed on exchange! */
+    update_transaction_status (w,
+                               GNUNET_TIME_relative_to_absolute (delay),
+                               TALER_EC_MERCHANT_XXX,
+                               false,
+                               false);
     return;
   }
-  /* FIXME: update DB: status: waiting on exchange /transfers */
+  e->exchange_inquiries++;
+  update_transaction_status (w,
+                             GNUNET_TIME_relative_to_absolute (delay),
+                             
TALER_EC_MERCHANT_EXCHANGE_GET_TRANSFER_IN_PROGRESS,
+                             false,
+                             false);
 }
 
 
@@ -582,6 +674,7 @@ start_inquiry (
   struct Inquiry *w;
 
   (void) cls;
+  active_inquiries++;
   w = GNUNET_new (struct Inquiry);
   w->payto_uri = GNUNET_strdup (payto_uri);
   w->instance_id = GNUNET_strdup (instance_id);
@@ -595,7 +688,11 @@ start_inquiry (
   if (w->exchange->ready)
     w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
                                         w);
-  /* FIXME: update DB: status: waiting on exchange /keys */
+  update_transaction_status (w,
+                             GNUNET_TIME_relative_to_absolute (delay),
+                             TALER_EC_MERCHANT_EXCHANGE_KEYS_IN_PROGRESS,
+                             false,
+                             false);
 }
 
 
@@ -614,7 +711,7 @@ find_work (void *cls)
   // NOTE: SELECT WHERE confirmed AND NOT verified AND NOT failed?;
   // FIXME: use LIMIT clause! => When do we try again if LIMIT applied?
   qs = db_plugin->select_open_transfers (db_plugin->cls,
-                                         LIMIT - active,
+                                         OPEN_INQUIRY_LIMIT - active_inquiries,
                                          &start_inquiry,
                                          NULL);
   if (qs < 0)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]