[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] 05/07: DB events.
From: |
gnunet |
Subject: |
[libeufin] 05/07: DB events. |
Date: |
Wed, 12 Apr 2023 11:28:23 +0200 |
This is an automated email from the git hooks/post-receive script.
ms pushed a commit to branch master
in repository libeufin.
commit 382c5fc8c0008160f7ed4e85c49053164f2c6915
Author: MS <ms@taler.net>
AuthorDate: Wed Apr 12 11:11:09 2023 +0200
DB events.
Implementing a poller that only succeeds if the channel
name and its payload are expected.
---
util/src/main/kotlin/DB.kt | 120 +++++++++++++++++++++++++++++++++++++++------
1 file changed, 104 insertions(+), 16 deletions(-)
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index a4ccbda3..59737a21 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -18,18 +18,13 @@
*/
package tech.libeufin.util
-import UtilError
-import io.ktor.http.*
+import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import logger
import net.taler.wallet.crypto.Base32Crockford
-import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
-import org.jetbrains.exposed.sql.transactions.transaction
-import org.jetbrains.exposed.sql.transactions.transactionManager
-import org.postgresql.PGNotification
import org.postgresql.jdbc.PgConnection
fun Transaction.isPostgres(): Boolean {
@@ -46,22 +41,51 @@ fun isPostgres(): Boolean {
// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
enum class NotificationsChannelDomains(val value: Int) {
- LIBEUFIN_TALER_INCOMING(3000)
+ // When payments with well-formed Taler subject arrive.
+ LIBEUFIN_TALER_INCOMING(3000),
+ // A transaction happened for a particular user. The payload
+ // informs about the direction.
+ LIBEUFIN_REGIO_TX(3001),
+ // When an incoming fiat payment is downloaded from Nexus.
+ // Happens when a customer wants to withdraw Taler coins in the
+ // regional currency.
+ LIBEUFIN_SANDBOX_FIAT_INCOMING(3002),
+ // When Nexus discovers a new transactions from the bank it
+ // is connected to. This even may wake up a client who is waiting
+ // on Nexus' GET /transactions.
+ LIBEUFIN_NEXUS_FIAT_INCOMING(3003)
}
-// Helper that builds a LISTEN-NOTIFY channel name.
+/**
+ * Helper that builds a LISTEN-NOTIFY channel name.
+ * 'salt' should be any value that would uniquely deliver the
+ * message to its receiver. IBANs are ideal, but they cost DB queries.
+ */
+
fun buildChannelName(
domain: NotificationsChannelDomains,
- iban: String,
+ salt: String,
separator: String = "_"
): String {
- val channelElements = "${domain.value}$separator$iban"
+ val channelElements = "${domain.value}$separator$salt"
val ret =
"X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
- logger.debug("Defining db channel name for IBAN: $iban, domain:
${domain.name}, resulting in: $ret")
+ logger.debug("Defining db channel name for salt: $salt, domain:
${domain.name}, resulting in: $ret")
return ret
}
-fun Transaction.postgresNotify(channel: String) {
+fun Transaction.postgresNotify(
+ channel: String,
+ payload: String? = null
+ ) {
+ if (payload != null) {
+ val argEnc = Base32Crockford.encode(payload.toByteArray())
+ if (payload.toByteArray().size > 8000)
+ throw internalServerError(
+ "DB notification on channel $channel used >8000 bytes payload
'$payload'"
+ )
+ this.exec("NOTIFY $channel, '$argEnc'")
+ return
+ }
this.exec("NOTIFY $channel")
}
@@ -85,6 +109,12 @@ class PostgresListenHandle(val channelName: String) {
"Could not find the default database, won't get Postgres
notifications."
)
private val conn = db.connector().connection as PgConnection
+ // Gets set to the NOTIFY's payload, in case one exists.
+ var receivedPayload: String? = null
+ // Signals whether the connection should be kept open,
+ // after one (and possibly not expected) event arrives.
+ // This gives more flexibility to the caller.
+ var keepConnection: Boolean = false
fun postgresListen() {
val stmt = conn.createStatement()
@@ -100,7 +130,16 @@ class PostgresListenHandle(val channelName: String) {
conn.close()
}
- fun postgresGetNotifications(timeoutMs: Long): Boolean {
+ private fun likelyCloseConnection() {
+ if (this.keepConnection)
+ return
+ this.conn.close()
+ }
+
+ fun postgresGetNotifications(
+ timeoutMs: Long,
+ keepConnectionOpen: Boolean = false
+ ): Boolean {
if (timeoutMs == 0L)
logger.warn("Database notification checker has timeout == 0," +
" that waits FOREVER until a notification arrives."
@@ -110,17 +149,66 @@ class PostgresListenHandle(val channelName: String) {
val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
if (maybeNotifications == null || maybeNotifications.isEmpty()) {
logger.debug("DB notification channel $channelName was found
empty.")
- conn.close()
+ this.likelyCloseConnection()
return false
}
for (n in maybeNotifications) {
if (n.name.lowercase() != channelName.lowercase()) {
- conn.close()
+ conn.close() // always close on error, without the optional
check.
throw internalServerError("Channel $channelName got notified
from ${n.name}!")
}
}
logger.debug("Found DB notifications on channel $channelName")
- conn.close()
+ // Only ever used for singleton notifications.
+ assert(maybeNotifications.size == 1)
+ if(maybeNotifications[0].parameter.isNotEmpty())
+ this.receivedPayload = maybeNotifications[0].parameter
+ this.likelyCloseConnection()
return true
}
+
+ // Wrapper around the core method "postgresGetNotifications()" that
+ // sets up the coroutine environment to wait and release the execution.
+ suspend fun waitOnIODispatchers(timeoutMs: Long): Boolean =
+ coroutineScope {
+ async(Dispatchers.IO) {
+ postgresGetNotifications(timeoutMs)
+ }.await()
+ }
+
+ /**
+ * Waits at most 'timeoutMs' on 'this.channelName' for
+ * the one particular payload that's passed in the 'payload'
+ * argument. FIXME: will be used along the fiat side of cash-outs.
+ */
+ suspend fun waitOnIoDispatchersForPayload(
+ timeoutMs: Long,
+ expectedPayload: String
+ ): Boolean {
+ var leftTime = timeoutMs
+ val expectedPayloadEnc =
Base32Crockford.encode(expectedPayload.toByteArray())
+ /**
+ * This setting allows the loop to reuse the open connection,
+ * otherwise the internal loop would close it if one unexpected
+ * payload wakes it up.
+ */
+ this.keepConnection = true
+ while (leftTime > 0) {
+ val loopStart = System.currentTimeMillis()
+ // Ask for notifications.
+ val maybeNotification = waitOnIODispatchers(leftTime)
+ // One arrived, check the payload.
+ if (maybeNotification) {
+ if (this.receivedPayload != null && this.receivedPayload ==
expectedPayloadEnc) {
+ conn.close()
+ return true
+ }
+ }
+ val loopEnd = System.currentTimeMillis()
+ // Account the spent time.
+ leftTime -= loopEnd - loopStart
+ }
+ conn.close()
+ return false
+ }
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [libeufin] branch master updated (1b9fbb02 -> bcf54117), gnunet, 2023/04/12
- [libeufin] 02/07: testing DB events, gnunet, 2023/04/12
- [libeufin] 01/07: Helpers., gnunet, 2023/04/12
- [libeufin] 04/07: comments, indentation., gnunet, 2023/04/12
- [libeufin] 07/07: comment, gnunet, 2023/04/12
- [libeufin] 05/07: DB events.,
gnunet <=
- [libeufin] 03/07: helper, gnunet, 2023/04/12
- [libeufin] 06/07: DB events at Access API., gnunet, 2023/04/12