gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] branch master updated (1b9fbb02 -> bcf54117)


From: gnunet
Subject: [libeufin] branch master updated (1b9fbb02 -> bcf54117)
Date: Wed, 12 Apr 2023 11:28:18 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a change to branch master
in repository libeufin.

    from 1b9fbb02 comment
     new 090bc0e5 Helpers.
     new 1b9e30d6 testing DB events
     new 3f9689f2 helper
     new 8ca8c368 comments, indentation.
     new 382c5fc8 DB events.
     new a35ce901 DB events at Access API.
     new bcf54117 comment

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 cli/tests/wire-transfer.sh                         |   3 +-
 nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt |   4 +-
 nexus/src/test/kotlin/DbEventTest.kt               |  37 +++++++
 nexus/src/test/kotlin/SandboxAccessApiTest.kt      |  63 ++++++++++-
 nexus/src/test/kotlin/TalerTest.kt                 |   2 +-
 .../kotlin/tech/libeufin/sandbox/CircuitApi.kt     |   1 +
 .../tech/libeufin/sandbox/ConversionService.kt     |  72 +++++++++++++
 .../main/kotlin/tech/libeufin/sandbox/Helpers.kt   |   1 -
 .../src/main/kotlin/tech/libeufin/sandbox/Main.kt  |  74 +++++++------
 .../kotlin/tech/libeufin/sandbox/bankAccount.kt    |  57 ++++++++++
 util/src/main/kotlin/DB.kt                         | 120 ++++++++++++++++++---
 util/src/main/kotlin/HTTP.kt                       |  11 ++
 util/src/test/kotlin/ibanTest.kt                   |  10 --
 13 files changed, 390 insertions(+), 65 deletions(-)
 create mode 100644 nexus/src/test/kotlin/DbEventTest.kt
 create mode 100644 sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
 delete mode 100644 util/src/test/kotlin/ibanTest.kt

diff --git a/cli/tests/wire-transfer.sh b/cli/tests/wire-transfer.sh
index afc6fec5..849ff44f 100755
--- a/cli/tests/wire-transfer.sh
+++ b/cli/tests/wire-transfer.sh
@@ -2,7 +2,8 @@
 
 set -eux
 
-# Pays the www Sandbox user, usually owned by the Exchange.
+# Pays the www Sandbox user, using one reserve pub
+# as the subject -- _in case_ Taler is being tested.
 RESERVE_PUB=$(gnunet-ecc -g1 /tmp/www &> /dev/null && gnunet-ecc -p /tmp/www)
 # Must match the one from launch_services.sh
 export LIBEUFIN_SANDBOX_DB_CONNECTION="jdbc:postgresql://localhost:5432/libeufincheck?user=$(whoami)"
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
index ba37d8be..8cdbae02 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
@@ -506,15 +506,17 @@ private suspend fun historyIncoming(call: 
ApplicationCall) {
 
     /**
      * NOTE: the LISTEN command MAY also go inside this transaction,
-     * but that uses a connection other than the one provided by the
+     * but LISTEN uses a connection other than the one provided by the
      * transaction block.  More facts on the consequences are needed.
      */
     var result: List<TalerIncomingPaymentEntity> = transaction {
         TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
     }
+    // The request was lucky, unlisten then.
     if (result.isNotEmpty() && listenHandle != null)
         listenHandle.postgresUnlisten()
 
+    // The request was NOT lucky, wait now.
     if (result.isEmpty() && listenHandle != null && longPollTimeout != null) {
         logger.debug("Waiting for NOTIFY on channel 
${listenHandle.channelName}," +
                 " with timeout: $longPollTimeoutPar ms")
diff --git a/nexus/src/test/kotlin/DbEventTest.kt 
b/nexus/src/test/kotlin/DbEventTest.kt
new file mode 100644
index 00000000..745959b8
--- /dev/null
+++ b/nexus/src/test/kotlin/DbEventTest.kt
@@ -0,0 +1,37 @@
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.jetbrains.exposed.sql.transactions.transaction
+import org.junit.Test
+import tech.libeufin.util.PostgresListenHandle
+import tech.libeufin.util.postgresNotify
+
+
+class DbEventTest {
+
+    /**
+     * LISTENs to one DB channel but only wakes up
+     * if the payload is the expected one.
+     */
+    @Test
+    fun payloadTest() {
+        withTestDatabase {
+            val listenHandle = PostgresListenHandle("X")
+            transaction { listenHandle.postgresListen() }
+            runBlocking {
+                launch {
+                    val isArrived = listenHandle.waitOnIoDispatchersForPayload(
+                        timeoutMs = 1000L,
+                        expectedPayload = "Y"
+                    )
+                    assert(isArrived)
+                }
+                launch {
+                    delay(500L); // Ensures the wait helper runs first.
+                    transaction { this.postgresNotify("X", "Y") }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/SandboxAccessApiTest.kt 
b/nexus/src/test/kotlin/SandboxAccessApiTest.kt
index ca4f2f08..175e479a 100644
--- a/nexus/src/test/kotlin/SandboxAccessApiTest.kt
+++ b/nexus/src/test/kotlin/SandboxAccessApiTest.kt
@@ -6,15 +6,24 @@ import io.ktor.client.statement.*
 import io.ktor.http.*
 import io.ktor.server.testing.*
 import io.ktor.util.*
+import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.time.delay
 import org.jetbrains.exposed.sql.and
 import org.jetbrains.exposed.sql.transactions.transaction
+import org.junit.Ignore
 import org.junit.Test
 import tech.libeufin.nexus.bankaccount.getBankAccount
 import tech.libeufin.sandbox.*
+import java.util.*
+import kotlin.concurrent.schedule
 
 class SandboxAccessApiTest {
     val mapper = ObjectMapper()
+    private fun getTxs(respJson: String): JsonNode {
+        val mapper = ObjectMapper()
+        return mapper.readTree(respJson).get("transactions")
+    }
 
     /**
      * Testing that ..access-api/withdrawals/{wopid} and
@@ -139,10 +148,6 @@ class SandboxAccessApiTest {
     // Tests the time range filter of Access API's GET /transactions
     @Test
     fun timeRangedTransactions() {
-        fun getTxs(respJson: String): JsonNode {
-            val mapper = ObjectMapper()
-            return mapper.readTree(respJson).get("transactions")
-        }
         withTestDatabase {
             prepSandboxDb()
             testApplication {
@@ -428,4 +433,54 @@ class SandboxAccessApiTest {
 
         }
     }
+
+    /**
+     * This test checks that the bank hangs before responding with the list
+     * of transactions, in case there is none to return.  The timing checks
+     * that the server hangs for as long as the unblocking payment takes place
+     * but NOT as long as the long_poll_ms parameter would suggest.  This last
+     * check ensures that the response can only contain the artificial unblocking
+     * payment (that happens after a certain timeout).
+     */
+    @Test
+    fun longPolledTransactions() {
+        val unblockingTxTimer = Timer()
+        val testStartTime = System.currentTimeMillis()
+        withTestDatabase {
+            prepSandboxDb()
+            testApplication {
+                application(sandboxApp)
+                runBlocking {
+                    launch {
+                        // long polls at most 50 seconds.
+                        val R = 
client.get("/demobanks/default/access-api/accounts/foo/transactions?long_poll_ms=50000")
 {
+                            expectSuccess = true
+                            basicAuth("foo", "foo")
+                        }
+                        assert(getTxs(R.bodyAsText()).size() == 1)
+                        val testEndTime = System.currentTimeMillis()
+                        val timeDiff = (testEndTime - testStartTime) / 1000L
+                        /**
+                         * Now checking that the server responded after the unblocking tx
+                         * took place and before the long poll timeout would occur.
+                         */
+                        println(timeDiff)
+                        assert(timeDiff in 4 .. 39)
+                    }
+                    unblockingTxTimer.schedule(
+                        delay = 4000L, // unblocks the server in (at least) 4 
seconds.
+                        action = {
+                            wireTransfer(
+                                "admin",
+                                "foo",
+                                "default",
+                                "#9",
+                                "TESTKUDOS:2"
+                            )
+                        }
+                    )
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/TalerTest.kt 
b/nexus/src/test/kotlin/TalerTest.kt
index 6565c197..ecaa7a0a 100644
--- a/nexus/src/test/kotlin/TalerTest.kt
+++ b/nexus/src/test/kotlin/TalerTest.kt
@@ -118,7 +118,6 @@ class TalerTest {
             prepNexusDb()
             testApplication {
                 application(nexusApp)
-                // This call blocks for 90 seconds
                 val currentTime = System.currentTimeMillis()
                 runBlocking {
                     launch {
@@ -133,6 +132,7 @@ class TalerTest {
                         expectSuccess = true
                     }
                     val latestTime = System.currentTimeMillis()
+                    // Checks that the call didn't hang for the whole 
long_poll_ms.
                     assert(R.status.value == HttpStatusCode.OK.value
                             && (latestTime - currentTime) < 2000
                     )
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt
index d63dac33..d3786968 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt
@@ -320,6 +320,7 @@ fun circuitApi(circuitRoute: Route) {
             )
             op.status = CashoutOperationStatus.CONFIRMED
             op.confirmationTime = getUTCnow().toInstant().toEpochMilli()
+            // TODO(signal this payment over LIBEUFIN_REGIO_INCOMING)
         }
         call.respond(HttpStatusCode.NoContent)
         return@post
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
new file mode 100644
index 00000000..1d256beb
--- /dev/null
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -0,0 +1,72 @@
+package tech.libeufin.sandbox
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+
+/**
+ * This file contains the logic for downloading/submitting incoming/outgoing
+ * fiat transactions to Nexus.  It needs the following values for operating.
+ *
+ * 1.  Nexus URL.
+ * 2.  Credentials to authenticate at Nexus JSON API.
+ * 3.  Long-polling interval.
+ * 4.  Frequency of the download loop.
+ *
+ * Notes:
+ *
+ * 1.  The account to credit on incoming transactions is ALWAYS "admin".
+ * 2.  The time to submit a new payment is as soon as "admin" receives one
+ *     incoming regional payment.
+ * 3.  At this time, Nexus does NOT offer long polling when it serves the
+ *     transactions via its JSON API.
+ * 4.  At this time, Nexus does NOT offer any filter when it serves the
+ *     transactions via its JSON API.
+ */
+
+// Temporarily hard-coded.  According to fiat times, these values could be WAY higher.
+val longPollMs = 30000L // 30s long-polling.
+val loopNewReqMs = 2000L // 2s for the next request.
+
+/**
+ * Executes the 'block' function every 'loopNewReqMs' milliseconds.
+ * Does not exit/fail the process upon exceptions - just logs them.
+ */
+fun downloadLoop(block: () -> Unit) {
+    // Needs "runBlocking {}" to call "delay()" and in case 'block'
+    // contains suspend functions.
+    runBlocking {
+        while(true) {
+            try { block() }
+            catch (e: Exception) {
+                /**
+                 * Not exiting to tolerate network issues, or optimistically
+                 * tolerate problems not caused by Sandbox itself.
+                 */
+                logger.error("Sandbox fiat-incoming monitor excepted: 
${e.message}")
+            }
+            delay(loopNewReqMs)
+        }
+    }
+}
+
+/**
+ * This function downloads the incoming fiat transactions from Nexus,
+ * stores them into the database and signals their arrival (LIBEUFIN_FIAT_INCOMING)
+ * to allow crediting the "admin" account.
+ */
+// fetchTransactions()
+
+/**
+ * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
+ * and credits the "admin" account as a reaction.  Lastly, the Nexus instance
+ * wired to Sandbox will pick the new payment and serve it via its TWG, but
+ * this is OUT of the Sandbox scope.
+ */
+// creditAdmin()
+
+/**
+ * This function listens for regio-incoming events (LIBEUFIN_REGIO_INCOMING)
+ * and submits the related cash-out payment to Nexus.  The fiat payment will
+ * then take place ENTIRELY on Nexus' responsibility.
+ */
+// issueCashout()
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
index 54519bbb..a2454ff4 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
@@ -460,5 +460,4 @@ fun prepareEbicsPayload(
     }
     val enc = CryptoUtil.encryptEbicsE002(compressedResponse, pub)
     return Pair(Base64.getEncoder().encodeToString(enc.encryptedData), enc)
-
 }
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
index 31856c09..f11d8cb3 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
@@ -47,6 +47,7 @@ import io.ktor.server.util.*
 import io.ktor.server.plugins.callloging.*
 import io.ktor.server.plugins.cors.routing.*
 import io.ktor.util.date.*
+import org.jetbrains.exposed.dao.flushCache
 import org.jetbrains.exposed.sql.*
 import org.jetbrains.exposed.sql.statements.api.ExposedBlob
 import org.jetbrains.exposed.sql.transactions.transaction
@@ -1530,37 +1531,48 @@ val sandboxApp: Application.() -> Unit = {
                     if (fromMs < 0) throw badRequest("'from_ms' param is less 
than 0")
                     val untilMs = 
expectLong(call.request.queryParameters["until_ms"] ?: 
Long.MAX_VALUE.toString())
                     if (untilMs < 0) throw badRequest("'until_ms' param is 
less than 0")
-                    val ret = mutableListOf<XLibeufinBankTransaction>()
-                    /**
-                     * Case where page number wasn't given,
-                     * therefore the results starts from the last transaction.
-                     */
-                    transaction {
-                        /**
-                         * Get a history page - from the calling bank account 
- having
-                         * 'firstElementId' as the latest transaction in it.  
*/
-                        fun getPage(firstElementId: Long): 
Iterable<BankAccountTransactionEntity> {
-                            logger.debug("Trying to build pageBuf from ID: 
$firstElementId," +
-                                    " including $size txs in the past."
-                            )
-                            return BankAccountTransactionEntity.find {
-                                (BankAccountTransactionsTable.id lessEq 
firstElementId) and
-                                        (BankAccountTransactionsTable.account 
eq bankAccount.id) and
-                                        
(BankAccountTransactionsTable.date.between(fromMs, untilMs))
-                            }.sortedByDescending { it.id.value }.take(size)
-                        }
-                        val lt: BankAccountTransactionEntity? = 
bankAccount.lastTransaction
-                        if (lt == null) return@transaction
-                        var nextPageIdUpperLimit: Long = lt.id.value
-                        // This loop fetches (and discards) pages until the 
desired one is found.
-                        for (i in 1..(page)) {
-                            val pageBuf = getPage(nextPageIdUpperLimit)
-                            logger.debug("pageBuf #$i follows.  Request wants 
#$page:")
-                            pageBuf.forEach { logger.debug("ID: ${it.id}, 
subject: ${it.subject}, amount: ${it.currency}:${it.amount}") }
-                            if (pageBuf.none()) return@transaction
-                            nextPageIdUpperLimit = pageBuf.last().id.value - 1
-                            if (i == page) pageBuf.forEach {
-                                
ret.add(getHistoryElementFromTransactionRow(it))
+                    val longPollMs: Long? = call.maybeLong("long_poll_ms")
+                    // LISTEN, if Postgres.
+                    val listenHandle = if (isPostgres() && longPollMs != null) 
{
+                        val channelName = buildChannelName(
+                            NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+                            call.expectUriComponent("account_name")
+                        )
+                        val listenHandle = PostgresListenHandle(channelName)
+                        // Can't LISTEN on the same DB TX that checks for 
data, as Exposed
+                        // closes that connection and the notification getter 
would fail.
+                        // Can't invoke the notification getter in the same DB 
TX either,
+                        // as it would block the DB.
+                        listenHandle.postgresListen()
+                        listenHandle
+                    } else null
+                    val historyParams = HistoryParams(
+                        pageNumber = page,
+                        pageSize = size,
+                        bankAccount = bankAccount,
+                        fromMs = fromMs,
+                        untilMs = untilMs
+                    )
+                    var ret: List<XLibeufinBankTransaction> = transaction {
+                        extractTxHistory(historyParams)
+                    }
+                    // Data was found already, UNLISTEN and respond.
+                    if (listenHandle != null && ret.isNotEmpty()) {
+                        listenHandle.postgresUnlisten()
+                        call.respond(object {val transactions = ret})
+                        return@get
+                    }
+                    // No data was found, sleep until the timeout or getting 
woken up.
+                    // Third condition only silences the compiler.
+                    if (listenHandle != null && ret.isEmpty() && longPollMs != 
null) {
+                        val notificationArrived = 
listenHandle.waitOnIODispatchers(longPollMs)
+                        // Only if the awaited event fired, query again the DB.
+                        if (notificationArrived)
+                        {
+                            ret = transaction {
+                                // Refreshing to update the index to the very 
last transaction.
+                                historyParams.bankAccount.refresh()
+                                extractTxHistory(historyParams)
                             }
                         }
                     }
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
index e6559897..748962d5 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
@@ -224,6 +224,63 @@ fun wireTransfer(
             this.demobank = demobank
             this.pmtInfId = pmtInfId
         }
+        // Signaling this wire transfer's event.
+        if (this.isPostgres()) {
+            val creditChannel = buildChannelName(
+                NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+                creditAccount.label
+            )
+            this.postgresNotify(creditChannel, "CRDT")
+            val debitChannel = buildChannelName(
+                NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+                debitAccount.label
+            )
+            this.postgresNotify(debitChannel, "DBIT")
+        }
     }
     return transactionRef
+}
+
+/**
+ * Helper that constructs a transactions history page
+ * according to the URI parameters passed to Access API's
+ * GET /transactions.
+ */
+data class HistoryParams(
+    val pageNumber: Int,
+    val pageSize: Int,
+    val fromMs: Long,
+    val untilMs: Long,
+    val bankAccount: BankAccountEntity
+)
+fun extractTxHistory(params: HistoryParams): List<XLibeufinBankTransaction> {
+    val ret = mutableListOf<XLibeufinBankTransaction>()
+    /**
+     * Helper that gets transactions earlier than the 'firstElementId'
+     * transaction AND that match the URI parameters.
+     */
+    fun getPage(firstElementId: Long): Iterable<BankAccountTransactionEntity> {
+        return BankAccountTransactionEntity.find {
+            (BankAccountTransactionsTable.id lessEq firstElementId) and
+                    (BankAccountTransactionsTable.account eq 
params.bankAccount.id) and
+                    (BankAccountTransactionsTable.date.between(params.fromMs, 
params.untilMs))
+        }.sortedByDescending { it.id.value }.take(params.pageSize)
+    }
+    // Gets a pointer to the last transaction of this bank account.
+    val lastTransaction: BankAccountTransactionEntity? = 
params.bankAccount.lastTransaction
+    if (lastTransaction == null) return ret
+    var nextPageIdUpperLimit: Long = lastTransaction.id.value
+
+    // This loop fetches (and discards) pages until the desired one is found.
+    for (i in 1..(params.pageNumber)) {
+        val pageBuf = getPage(nextPageIdUpperLimit)
+        logger.debug("pageBuf #$i follows.  Request wants 
#${params.pageNumber}:")
+        pageBuf.forEach { logger.debug("ID: ${it.id}, subject: ${it.subject}, 
amount: ${it.currency}:${it.amount}") }
+        if (pageBuf.none()) return ret
+        nextPageIdUpperLimit = pageBuf.last().id.value - 1
+        if (i == params.pageNumber) pageBuf.forEach {
+            ret.add(getHistoryElementFromTransactionRow(it))
+        }
+    }
+    return ret
 }
\ No newline at end of file
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index a4ccbda3..59737a21 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -18,18 +18,13 @@
  */
 
 package tech.libeufin.util
-import UtilError
-import io.ktor.http.*
+import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.coroutineScope
 import logger
 import net.taler.wallet.crypto.Base32Crockford
-import org.jetbrains.exposed.sql.Database
 import org.jetbrains.exposed.sql.Transaction
 import org.jetbrains.exposed.sql.transactions.TransactionManager
-import org.jetbrains.exposed.sql.transactions.transaction
-import org.jetbrains.exposed.sql.transactions.transactionManager
-import org.postgresql.PGNotification
 import org.postgresql.jdbc.PgConnection
 
 fun Transaction.isPostgres(): Boolean {
@@ -46,22 +41,51 @@ fun isPostgres(): Boolean {
 
 // Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
 enum class NotificationsChannelDomains(val value: Int) {
-    LIBEUFIN_TALER_INCOMING(3000)
+    // When payments with well-formed Taler subject arrive.
+    LIBEUFIN_TALER_INCOMING(3000),
+    // A transaction happened for a particular user.  The payload
+    // informs about the direction.
+    LIBEUFIN_REGIO_TX(3001),
+    // When an incoming fiat payment is downloaded from Nexus.
+    // Happens when a customer wants to withdraw Taler coins in the
+    // regional currency.
+    LIBEUFIN_SANDBOX_FIAT_INCOMING(3002),
+    // When Nexus discovers new transactions from the bank it
+    // is connected to.  This event may wake up a client who is waiting
+    // on Nexus' GET /transactions.
+    LIBEUFIN_NEXUS_FIAT_INCOMING(3003)
 }
 
-// Helper that builds a LISTEN-NOTIFY channel name.
+/**
+ * Helper that builds a LISTEN-NOTIFY channel name.
+ * 'salt' should be any value that would uniquely deliver the
+ * message to its receiver.  IBANs are ideal, but they cost DB queries.
+ */
+
 fun buildChannelName(
     domain: NotificationsChannelDomains,
-    iban: String,
+    salt: String,
     separator: String = "_"
 ): String {
-    val channelElements = "${domain.value}$separator$iban"
+    val channelElements = "${domain.value}$separator$salt"
     val ret = 
"X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
-    logger.debug("Defining db channel name for IBAN: $iban, domain: 
${domain.name}, resulting in: $ret")
+    logger.debug("Defining db channel name for salt: $salt, domain: 
${domain.name}, resulting in: $ret")
     return ret
 }
 
-fun Transaction.postgresNotify(channel: String) {
+fun Transaction.postgresNotify(
+    channel: String,
+    payload: String? = null
+    ) {
+    if (payload != null) {
+        val argEnc = Base32Crockford.encode(payload.toByteArray())
+        if (payload.toByteArray().size > 8000)
+            throw internalServerError(
+                "DB notification on channel $channel used >8000 bytes payload 
'$payload'"
+            )
+        this.exec("NOTIFY $channel, '$argEnc'")
+        return
+    }
     this.exec("NOTIFY $channel")
 }
 
@@ -85,6 +109,12 @@ class PostgresListenHandle(val channelName: String) {
         "Could not find the default database, won't get Postgres 
notifications."
     )
     private val conn = db.connector().connection as PgConnection
+    // Gets set to the NOTIFY's payload, in case one exists.
+    var receivedPayload: String? = null
+    // Signals whether the connection should be kept open,
+    // after one (and possibly not expected) event arrives.
+    // This gives more flexibility to the caller.
+    var keepConnection: Boolean = false
 
     fun postgresListen() {
         val stmt = conn.createStatement()
@@ -100,7 +130,16 @@ class PostgresListenHandle(val channelName: String) {
         conn.close()
     }
 
-    fun postgresGetNotifications(timeoutMs: Long): Boolean {
+    private fun likelyCloseConnection() {
+        if (this.keepConnection)
+            return
+        this.conn.close()
+    }
+
+    fun postgresGetNotifications(
+        timeoutMs: Long,
+        keepConnectionOpen: Boolean = false
+        ): Boolean {
         if (timeoutMs == 0L)
             logger.warn("Database notification checker has timeout == 0," +
                     " that waits FOREVER until a notification arrives."
@@ -110,17 +149,66 @@ class PostgresListenHandle(val channelName: String) {
         val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
         if (maybeNotifications == null || maybeNotifications.isEmpty()) {
             logger.debug("DB notification channel $channelName was found 
empty.")
-            conn.close()
+            this.likelyCloseConnection()
             return false
         }
         for (n in maybeNotifications) {
             if (n.name.lowercase() != channelName.lowercase()) {
-                conn.close()
+                conn.close() // always close on error, without the optional 
check.
                 throw internalServerError("Channel $channelName got notified 
from ${n.name}!")
             }
         }
         logger.debug("Found DB notifications on channel $channelName")
-        conn.close()
+        // Only ever used for singleton notifications.
+        assert(maybeNotifications.size == 1)
+        if(maybeNotifications[0].parameter.isNotEmpty())
+            this.receivedPayload = maybeNotifications[0].parameter
+        this.likelyCloseConnection()
         return true
     }
+
+    // Wrapper around the core method "postgresGetNotifications()" that
+    // sets up the coroutine environment to wait and release the execution.
+    suspend fun waitOnIODispatchers(timeoutMs: Long): Boolean =
+        coroutineScope {
+            async(Dispatchers.IO) {
+                postgresGetNotifications(timeoutMs)
+            }.await()
+        }
+
+    /**
+     * Waits at most 'timeoutMs' on 'this.channelName' for
+     * the one particular payload that's passed in the 'payload'
+     * argument.  FIXME: will be used along the fiat side of cash-outs.
+     */
+    suspend fun waitOnIoDispatchersForPayload(
+        timeoutMs: Long,
+        expectedPayload: String
+    ): Boolean {
+        var leftTime = timeoutMs
+        val expectedPayloadEnc = 
Base32Crockford.encode(expectedPayload.toByteArray())
+        /**
+         * This setting allows the loop to reuse the open connection,
+         * otherwise the internal loop would close it if one unexpected
+         * payload wakes it up.
+         */
+        this.keepConnection = true
+        while (leftTime > 0) {
+            val loopStart = System.currentTimeMillis()
+            // Ask for notifications.
+            val maybeNotification = waitOnIODispatchers(leftTime)
+            // One arrived, check the payload.
+            if (maybeNotification) {
+                if (this.receivedPayload != null && this.receivedPayload == 
expectedPayloadEnc) {
+                    conn.close()
+                    return true
+                }
+            }
+            val loopEnd = System.currentTimeMillis()
+            // Account the spent time.
+            leftTime -= loopEnd - loopStart
+        }
+        conn.close()
+        return false
+    }
 }
\ No newline at end of file
diff --git a/util/src/main/kotlin/HTTP.kt b/util/src/main/kotlin/HTTP.kt
index d9ff4491..30a15bd9 100644
--- a/util/src/main/kotlin/HTTP.kt
+++ b/util/src/main/kotlin/HTTP.kt
@@ -9,6 +9,7 @@ import io.ktor.server.util.*
 import io.ktor.util.*
 import logger
 import java.net.URLDecoder
+import kotlin.reflect.typeOf
 
 fun unauthorized(msg: String): UtilError {
     return UtilError(
@@ -208,4 +209,14 @@ fun expectLong(uriParam: String): Long {
         logger.error(e.message)
         throw badRequest("'$uriParam' is not Long")
     }
+}
+
+// Returns null, or tries to convert the parameter to Long.
+// Throws Bad Request, if the conversion could not be done.
+fun ApplicationCall.maybeLong(uriParamName: String): Long? {
+    val maybeParam = this.parameters[uriParamName] ?: return null
+    return try { maybeParam.toLong() }
+    catch (e: Exception) {
+        throw badRequest("Could not convert '$uriParamName' to Long")
+    }
 }
\ No newline at end of file
diff --git a/util/src/test/kotlin/ibanTest.kt b/util/src/test/kotlin/ibanTest.kt
deleted file mode 100644
index 964f822d..00000000
--- a/util/src/test/kotlin/ibanTest.kt
+++ /dev/null
@@ -1,10 +0,0 @@
-import org.junit.Test
-import tech.libeufin.util.getIban
-
-class IbanTest {
-
-    @Test
-    fun genIban() {
-        println(getIban())
-    }
-}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]