gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 06/07: DB events at Access API.


From: gnunet
Subject: [libeufin] 06/07: DB events at Access API.
Date: Wed, 12 Apr 2023 11:28:24 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit a35ce9012f2cd6a11f850da49c5ca6a92ee4a5fc
Author: MS <ms@taler.net>
AuthorDate: Wed Apr 12 11:21:52 2023 +0200

    DB events at Access API.
    
    Offering long polling for transactions download.
---
 .../tech/libeufin/sandbox/ConversionService.kt     | 72 ++++++++++++++++++++++
 .../src/main/kotlin/tech/libeufin/sandbox/Main.kt  | 53 +++++++++++++---
 .../kotlin/tech/libeufin/sandbox/bankAccount.kt    | 13 ++++
 3 files changed, 129 insertions(+), 9 deletions(-)

diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
new file mode 100644
index 00000000..1d256beb
--- /dev/null
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -0,0 +1,72 @@
+package tech.libeufin.sandbox
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+
+/**
+ * This file contains the logic for downloading/submitting incoming/outgoing
+ * fiat transactions to Nexus.  It needs the following values for operating.
+ *
+ * 1.  Nexus URL.
+ * 2.  Credentials to authenticate at Nexus JSON API.
+ * 3.  Long-polling interval.
+ * 4.  Frequency of the download loop.
+ *
+ * Notes:
+ *
+ * 1.  The account to credit on incoming transactions is ALWAYS "admin".
+ * 2.  The time to submit a new payment is as soon as "admin" receives one
+ *     incoming regional payment.
+ * 3.  At this time, Nexus does NOT offer long polling when it serves the
+ *     transactions via its JSON API.
+ * 4.  At this time, Nexus does NOT offer any filter when it serves the
+ *     transactions via its JSON API.
+ */
+
+// Temporarily hard-coded.  According to fiat times, these values could be WAY higher.
+val longPollMs = 30000L // 30s long-polling.
+val loopNewReqMs = 2000L // 2s for the next request.
+
+/**
+ * Executes the 'block' function every 'loopNewReqMs' milliseconds.
+ * Does not exit/fail the process upon exceptions - just logs them.
+ */
+fun downloadLoop(block: () -> Unit) {
+    // Needs "runBlocking {}" to call "delay()" and in case 'block'
+    // contains suspend functions.
+    runBlocking {
+        while(true) {
+            try { block() }
+            catch (e: Exception) {
+                /**
+                 * Not exiting to tolerate network issues, or optimistically
+                 * tolerate problems not caused by Sandbox itself.
+                 */
+                logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}")
+            }
+            delay(loopNewReqMs)
+        }
+    }
+}
+
+/**
+ * This function downloads the incoming fiat transactions from Nexus,
+ * stores them into the database and signals their arrival (LIBEUFIN_FIAT_INCOMING)
+ * to allow crediting the "admin" account.
+ */
+// fetchTransactions()
+
+/**
+ * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
+ * and credits the "admin" account as a reaction.  Lastly, the Nexus instance
+ * wired to Sandbox will pick the new payment and serve it via its TWG, but
+ * this is OUT of the Sandbox scope.
+ */
+// creditAdmin()
+
+/**
+ * This function listens for regio-incoming events (LIBEUFIN_REGIO_INCOMING)
+ * and submits the related cash-out payment to Nexus.  The fiat payment will
+ * then take place ENTIRELY on Nexus' responsibility.
+ */
+// issueCashout()
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
index 856cbbeb..f11d8cb3 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
@@ -47,6 +47,7 @@ import io.ktor.server.util.*
 import io.ktor.server.plugins.callloging.*
 import io.ktor.server.plugins.cors.routing.*
 import io.ktor.util.date.*
+import org.jetbrains.exposed.dao.flushCache
 import org.jetbrains.exposed.sql.*
 import org.jetbrains.exposed.sql.statements.api.ExposedBlob
 import org.jetbrains.exposed.sql.transactions.transaction
@@ -1530,16 +1531,50 @@ val sandboxApp: Application.() -> Unit = {
                     if (fromMs < 0) throw badRequest("'from_ms' param is less than 0")
                     val untilMs = expectLong(call.request.queryParameters["until_ms"] ?: Long.MAX_VALUE.toString())
                     if (untilMs < 0) throw badRequest("'until_ms' param is less than 0")
-                    val ret: List<XLibeufinBankTransaction> = transaction {
-                        extractTxHistory(
-                            HistoryParams(
-                                pageNumber = page,
-                                pageSize = size,
-                                bankAccount = bankAccount,
-                                fromMs = fromMs,
-                                untilMs = untilMs
-                            )
+                    val longPollMs: Long? = call.maybeLong("long_poll_ms")
+                    // LISTEN, if Postgres.
+                    val listenHandle = if (isPostgres() && longPollMs != null) {
+                        val channelName = buildChannelName(
+                            NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+                            call.expectUriComponent("account_name")
                         )
+                        val listenHandle = PostgresListenHandle(channelName)
+                        // Can't LISTEN on the same DB TX that checks for data, as Exposed
+                        // closes that connection and the notification getter would fail.
+                        // Can't invoke the notification getter in the same DB TX either,
+                        // as it would block the DB.
+                        listenHandle.postgresListen()
+                        listenHandle
+                    } else null
+                    val historyParams = HistoryParams(
+                        pageNumber = page,
+                        pageSize = size,
+                        bankAccount = bankAccount,
+                        fromMs = fromMs,
+                        untilMs = untilMs
+                    )
+                    var ret: List<XLibeufinBankTransaction> = transaction {
+                        extractTxHistory(historyParams)
+                    }
+                    // Data was found already, UNLISTEN and respond.
+                    if (listenHandle != null && ret.isNotEmpty()) {
+                        listenHandle.postgresUnlisten()
+                        call.respond(object {val transactions = ret})
+                        return@get
+                    }
+                    // No data was found, sleep until the timeout or getting woken up.
+                    // Third condition only silences the compiler.
+                    if (listenHandle != null && ret.isEmpty() && longPollMs != null) {
+                        val notificationArrived = listenHandle.waitOnIODispatchers(longPollMs)
+                        // Only if the awaited event fired, query again the DB.
+                        if (notificationArrived)
+                        {
+                            ret = transaction {
+                                // Refreshing to update the index to the very last transaction.
+                                historyParams.bankAccount.refresh()
+                                extractTxHistory(historyParams)
+                            }
+                        }
                     }
                     call.respond(object {val transactions = ret})
                     return@get
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
index b9344405..748962d5 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
@@ -224,6 +224,19 @@ fun wireTransfer(
             this.demobank = demobank
             this.pmtInfId = pmtInfId
         }
+        // Signaling this wire transfer's event.
+        if (this.isPostgres()) {
+            val creditChannel = buildChannelName(
+                NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+                creditAccount.label
+            )
+            this.postgresNotify(creditChannel, "CRDT")
+            val debitChannel = buildChannelName(
+                NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+                debitAccount.label
+            )
+            this.postgresNotify(debitChannel, "DBIT")
+        }
     }
     return transactionRef
 }

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]