gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 05/07: DB events.


From: gnunet
Subject: [libeufin] 05/07: DB events.
Date: Wed, 12 Apr 2023 11:28:23 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit 382c5fc8c0008160f7ed4e85c49053164f2c6915
Author: MS <ms@taler.net>
AuthorDate: Wed Apr 12 11:11:09 2023 +0200

    DB events.
    
    Implementing a poller that only succeeds if the channel
    name and its payload are expected.
---
 util/src/main/kotlin/DB.kt | 120 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 104 insertions(+), 16 deletions(-)

diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index a4ccbda3..59737a21 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -18,18 +18,13 @@
  */
 
 package tech.libeufin.util
-import UtilError
-import io.ktor.http.*
+import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.coroutineScope
 import logger
 import net.taler.wallet.crypto.Base32Crockford
-import org.jetbrains.exposed.sql.Database
 import org.jetbrains.exposed.sql.Transaction
 import org.jetbrains.exposed.sql.transactions.TransactionManager
-import org.jetbrains.exposed.sql.transactions.transaction
-import org.jetbrains.exposed.sql.transactions.transactionManager
-import org.postgresql.PGNotification
 import org.postgresql.jdbc.PgConnection
 
 fun Transaction.isPostgres(): Boolean {
@@ -46,22 +41,51 @@ fun isPostgres(): Boolean {
 
 // Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
 enum class NotificationsChannelDomains(val value: Int) {
-    LIBEUFIN_TALER_INCOMING(3000)
+    // When payments with well-formed Taler subject arrive.
+    LIBEUFIN_TALER_INCOMING(3000),
+    // A transaction happened for a particular user.  The payload
+    // informs about the direction.
+    LIBEUFIN_REGIO_TX(3001),
+    // When an incoming fiat payment is downloaded from Nexus.
+    // Happens when a customer wants to withdraw Taler coins in the
+    // regional currency.
+    LIBEUFIN_SANDBOX_FIAT_INCOMING(3002),
+    // When Nexus discovers new transactions from the bank it
+    // is connected to.  This event may wake up a client who is waiting
+    // on Nexus' GET /transactions.
+    LIBEUFIN_NEXUS_FIAT_INCOMING(3003)
 }
 
-// Helper that builds a LISTEN-NOTIFY channel name.
+/**
+ * Helper that builds a LISTEN-NOTIFY channel name.
+ * 'salt' should be any value that would uniquely deliver the
+ * message to its receiver.  IBANs are ideal, but they cost DB queries.
+ */
+
 fun buildChannelName(
     domain: NotificationsChannelDomains,
-    iban: String,
+    salt: String,
     separator: String = "_"
 ): String {
-    val channelElements = "${domain.value}$separator$iban"
+    val channelElements = "${domain.value}$separator$salt"
     val ret = 
"X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
-    logger.debug("Defining db channel name for IBAN: $iban, domain: 
${domain.name}, resulting in: $ret")
+    logger.debug("Defining db channel name for salt: $salt, domain: 
${domain.name}, resulting in: $ret")
     return ret
 }
 
-fun Transaction.postgresNotify(channel: String) {
+fun Transaction.postgresNotify(
+    channel: String,
+    payload: String? = null
+    ) {
+    if (payload != null) {
+        val argEnc = Base32Crockford.encode(payload.toByteArray())
+        if (payload.toByteArray().size > 8000)
+            throw internalServerError(
+                "DB notification on channel $channel used >8000 bytes payload 
'$payload'"
+            )
+        this.exec("NOTIFY $channel, '$argEnc'")
+        return
+    }
     this.exec("NOTIFY $channel")
 }
 
@@ -85,6 +109,12 @@ class PostgresListenHandle(val channelName: String) {
         "Could not find the default database, won't get Postgres 
notifications."
     )
     private val conn = db.connector().connection as PgConnection
+    // Gets set to the NOTIFY's payload, in case one exists.
+    var receivedPayload: String? = null
+    // Signals whether the connection should be kept open,
+    // after one (and possibly not expected) event arrives.
+    // This gives more flexibility to the caller.
+    var keepConnection: Boolean = false
 
     fun postgresListen() {
         val stmt = conn.createStatement()
@@ -100,7 +130,16 @@ class PostgresListenHandle(val channelName: String) {
         conn.close()
     }
 
-    fun postgresGetNotifications(timeoutMs: Long): Boolean {
+    private fun likelyCloseConnection() {
+        if (this.keepConnection)
+            return
+        this.conn.close()
+    }
+
+    fun postgresGetNotifications(
+        timeoutMs: Long,
+        keepConnectionOpen: Boolean = false
+        ): Boolean {
         if (timeoutMs == 0L)
             logger.warn("Database notification checker has timeout == 0," +
                     " that waits FOREVER until a notification arrives."
@@ -110,17 +149,66 @@ class PostgresListenHandle(val channelName: String) {
         val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
         if (maybeNotifications == null || maybeNotifications.isEmpty()) {
             logger.debug("DB notification channel $channelName was found 
empty.")
-            conn.close()
+            this.likelyCloseConnection()
             return false
         }
         for (n in maybeNotifications) {
             if (n.name.lowercase() != channelName.lowercase()) {
-                conn.close()
+                conn.close() // always close on error, without the optional 
check.
                 throw internalServerError("Channel $channelName got notified 
from ${n.name}!")
             }
         }
         logger.debug("Found DB notifications on channel $channelName")
-        conn.close()
+        // Only ever used for singleton notifications.
+        assert(maybeNotifications.size == 1)
+        if(maybeNotifications[0].parameter.isNotEmpty())
+            this.receivedPayload = maybeNotifications[0].parameter
+        this.likelyCloseConnection()
         return true
     }
+
+    // Wrapper around the core method "postgresGetNotifications()" that
+    // sets up the coroutine environment to wait and release the execution.
+    suspend fun waitOnIODispatchers(timeoutMs: Long): Boolean =
+        coroutineScope {
+            async(Dispatchers.IO) {
+                postgresGetNotifications(timeoutMs)
+            }.await()
+        }
+
+    /**
+     * Waits at most 'timeoutMs' on 'this.channelName' for
+     * the one particular payload that's passed in the 'expectedPayload'
+     * argument.  FIXME: will be used along the fiat side of cash-outs.
+     */
+    suspend fun waitOnIoDispatchersForPayload(
+        timeoutMs: Long,
+        expectedPayload: String
+    ): Boolean {
+        var leftTime = timeoutMs
+        val expectedPayloadEnc = 
Base32Crockford.encode(expectedPayload.toByteArray())
+        /**
+         * This setting allows the loop to reuse the open connection,
+         * otherwise the internal loop would close it if one unexpected
+         * payload wakes it up.
+         */
+        this.keepConnection = true
+        while (leftTime > 0) {
+            val loopStart = System.currentTimeMillis()
+            // Ask for notifications.
+            val maybeNotification = waitOnIODispatchers(leftTime)
+            // One arrived, check the payload.
+            if (maybeNotification) {
+                if (this.receivedPayload != null && this.receivedPayload == 
expectedPayloadEnc) {
+                    conn.close()
+                    return true
+                }
+            }
+            val loopEnd = System.currentTimeMillis()
+            // Account the spent time.
+            leftTime -= loopEnd - loopStart
+        }
+        conn.close()
+        return false
+    }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]