[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] 06/07: DB events at Access API.
From: |
gnunet |
Subject: |
[libeufin] 06/07: DB events at Access API. |
Date: |
Wed, 12 Apr 2023 11:28:24 +0200 |
This is an automated email from the git hooks/post-receive script.
ms pushed a commit to branch master
in repository libeufin.
commit a35ce9012f2cd6a11f850da49c5ca6a92ee4a5fc
Author: MS <ms@taler.net>
AuthorDate: Wed Apr 12 11:21:52 2023 +0200
DB events at Access API.
Offering long polling for transactions download.
---
.../tech/libeufin/sandbox/ConversionService.kt | 72 ++++++++++++++++++++++
.../src/main/kotlin/tech/libeufin/sandbox/Main.kt | 53 +++++++++++++---
.../kotlin/tech/libeufin/sandbox/bankAccount.kt | 13 ++++
3 files changed, 129 insertions(+), 9 deletions(-)
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
new file mode 100644
index 00000000..1d256beb
--- /dev/null
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -0,0 +1,72 @@
+package tech.libeufin.sandbox
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+
+/**
+ * This file contains the logic for downloading/submitting incoming/outgoing
+ * fiat transactions to Nexus. It needs the following values for operating.
+ *
+ * 1. Nexus URL.
+ * 2. Credentials to authenticate at Nexus JSON API.
+ * 3. Long-polling interval.
+ * 4. Frequency of the download loop.
+ *
+ * Notes:
+ *
+ * 1. The account to credit on incoming transactions is ALWAYS "admin".
+ * 2. The time to submit a new payment is as soon as "admin" receives one
+ * incoming regional payment.
+ * 3. At this time, Nexus does NOT offer long polling when it serves the
+ * transactions via its JSON API.
+ * 4. At this time, Nexus does NOT offer any filter when it serves the
+ * transactions via its JSON API.
+ */
+
+// Temporarily hard-coded. According to fiat times, these values could be WAY higher.
+val longPollMs = 30000L // Long-polling interval, in milliseconds (30s).
+val loopNewReqMs = 2000L // Pause before the download loop issues its next request, in milliseconds (2s).
+
+/**
+ * Runs 'block' forever, sleeping 'loopNewReqMs' ms after each run; never returns.
+ * Exceptions from 'block' are only logged, so the process does not exit/fail.
+ */
+fun downloadLoop(block: () -> Unit) {
+    // "runBlocking {}" provides the coroutine context that the
+    // suspending "delay()" call below needs.
+    runBlocking {
+        while(true) {
+            try { block() }
+            catch (e: Exception) {
+                /**
+                 * Not exiting to tolerate network issues, or optimistically
+                 * tolerate problems not caused by Sandbox itself.
+                 */
+                logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}")
+            }
+            delay(loopNewReqMs)
+        }
+    }
+}
+
+/**
+ * This function downloads the incoming fiat transactions from Nexus,
+ * stores them into the database and signals their arrival (LIBEUFIN_FIAT_INCOMING)
+ * to allow crediting the "admin" account.
+ */
+// fetchTransactions()
+
+/**
+ * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
+ * and credits the "admin" account as a reaction. Lastly, the Nexus instance
+ * wired to Sandbox will pick the new payment and serve it via its TWG, but
+ * this is OUT of the Sandbox scope.
+ */
+// creditAdmin()
+
+/**
+ * This function listens for regio-incoming events (LIBEUFIN_REGIO_INCOMING)
+ * and submits the related cash-out payment to Nexus. The fiat payment will
+ * then take place ENTIRELY on Nexus' responsibility.
+ */
+// issueCashout()
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
index 856cbbeb..f11d8cb3 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
@@ -47,6 +47,7 @@ import io.ktor.server.util.*
import io.ktor.server.plugins.callloging.*
import io.ktor.server.plugins.cors.routing.*
import io.ktor.util.date.*
+import org.jetbrains.exposed.dao.flushCache
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.statements.api.ExposedBlob
import org.jetbrains.exposed.sql.transactions.transaction
@@ -1530,16 +1531,50 @@ val sandboxApp: Application.() -> Unit = {
if (fromMs < 0) throw badRequest("'from_ms' param is less
than 0")
val untilMs =
expectLong(call.request.queryParameters["until_ms"] ?:
Long.MAX_VALUE.toString())
if (untilMs < 0) throw badRequest("'until_ms' param is
less than 0")
- val ret: List<XLibeufinBankTransaction> = transaction {
- extractTxHistory(
- HistoryParams(
- pageNumber = page,
- pageSize = size,
- bankAccount = bankAccount,
- fromMs = fromMs,
- untilMs = untilMs
- )
+ val longPollMs: Long? = call.maybeLong("long_poll_ms")
+ // LISTEN, if Postgres.
+ val listenHandle = if (isPostgres() && longPollMs != null)
{
+ val channelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ call.expectUriComponent("account_name")
)
+ val listenHandle = PostgresListenHandle(channelName)
+ // Can't LISTEN on the same DB TX that checks for data, as Exposed
+ // closes that connection and the notification getter would fail.
+ // Can't invoke the notification getter in the same DB TX either,
+ // as it would block the DB.
+ listenHandle.postgresListen()
+ listenHandle
+ } else null
+ val historyParams = HistoryParams(
+ pageNumber = page,
+ pageSize = size,
+ bankAccount = bankAccount,
+ fromMs = fromMs,
+ untilMs = untilMs
+ )
+ var ret: List<XLibeufinBankTransaction> = transaction {
+ extractTxHistory(historyParams)
+ }
+ // Data was found already, UNLISTEN and respond.
+ if (listenHandle != null && ret.isNotEmpty()) {
+ listenHandle.postgresUnlisten()
+ call.respond(object {val transactions = ret})
+ return@get
+ }
+ // No data was found, sleep until the timeout or getting woken up.
+ // Third condition only silences the compiler.
+ if (listenHandle != null && ret.isEmpty() && longPollMs !=
null) {
+ val notificationArrived =
listenHandle.waitOnIODispatchers(longPollMs)
+ // Only if the awaited event fired, query again the DB.
+ if (notificationArrived)
+ {
+ ret = transaction {
+ // Refreshing to update the index to the very last transaction.
+ historyParams.bankAccount.refresh()
+ extractTxHistory(historyParams)
+ }
+ }
}
call.respond(object {val transactions = ret})
return@get
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
index b9344405..748962d5 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
@@ -224,6 +224,19 @@ fun wireTransfer(
this.demobank = demobank
this.pmtInfId = pmtInfId
}
+ // Signaling this wire transfer's event.
+ if (this.isPostgres()) {
+ val creditChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ creditAccount.label
+ )
+ this.postgresNotify(creditChannel, "CRDT")
+ val debitChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ debitAccount.label
+ )
+ this.postgresNotify(debitChannel, "DBIT")
+ }
}
return transactionRef
}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [libeufin] branch master updated (1b9fbb02 -> bcf54117), gnunet, 2023/04/12
- [libeufin] 02/07: testing DB events, gnunet, 2023/04/12
- [libeufin] 01/07: Helpers., gnunet, 2023/04/12
- [libeufin] 04/07: comments, indentation., gnunet, 2023/04/12
- [libeufin] 07/07: comment, gnunet, 2023/04/12
- [libeufin] 05/07: DB events., gnunet, 2023/04/12
- [libeufin] 03/07: helper, gnunet, 2023/04/12
- [libeufin] 06/07: DB events at Access API.,
gnunet <=