[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] branch master updated (1b9fbb02 -> bcf54117)
From: |
gnunet |
Subject: |
[libeufin] branch master updated (1b9fbb02 -> bcf54117) |
Date: |
Wed, 12 Apr 2023 11:28:18 +0200 |
This is an automated email from the git hooks/post-receive script.
ms pushed a change to branch master
in repository libeufin.
from 1b9fbb02 comment
new 090bc0e5 Helpers.
new 1b9e30d6 testing DB events
new 3f9689f2 helper
new 8ca8c368 comments, indentation.
new 382c5fc8 DB events.
new a35ce901 DB events at Access API.
new bcf54117 comment
The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
cli/tests/wire-transfer.sh | 3 +-
nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 4 +-
nexus/src/test/kotlin/DbEventTest.kt | 37 +++++++
nexus/src/test/kotlin/SandboxAccessApiTest.kt | 63 ++++++++++-
nexus/src/test/kotlin/TalerTest.kt | 2 +-
.../kotlin/tech/libeufin/sandbox/CircuitApi.kt | 1 +
.../tech/libeufin/sandbox/ConversionService.kt | 72 +++++++++++++
.../main/kotlin/tech/libeufin/sandbox/Helpers.kt | 1 -
.../src/main/kotlin/tech/libeufin/sandbox/Main.kt | 74 +++++++------
.../kotlin/tech/libeufin/sandbox/bankAccount.kt | 57 ++++++++++
util/src/main/kotlin/DB.kt | 120 ++++++++++++++++++---
util/src/main/kotlin/HTTP.kt | 11 ++
util/src/test/kotlin/ibanTest.kt | 10 --
13 files changed, 390 insertions(+), 65 deletions(-)
create mode 100644 nexus/src/test/kotlin/DbEventTest.kt
create mode 100644
sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
delete mode 100644 util/src/test/kotlin/ibanTest.kt
diff --git a/cli/tests/wire-transfer.sh b/cli/tests/wire-transfer.sh
index afc6fec5..849ff44f 100755
--- a/cli/tests/wire-transfer.sh
+++ b/cli/tests/wire-transfer.sh
@@ -2,7 +2,8 @@
set -eux
-# Pays the www Sandbox user, usually owned by the Exchange.
+# Pays the www Sandbox user, using one reserve pub
+# as the subject -- _in case_ Taler is being tested.
RESERVE_PUB=$(gnunet-ecc -g1 /tmp/www &> /dev/null && gnunet-ecc -p /tmp/www)
# Must match the one from launch_services.sh
export
LIBEUFIN_SANDBOX_DB_CONNECTION="jdbc:postgresql://localhost:5432/libeufincheck?user=$(whoami)"
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
index ba37d8be..8cdbae02 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
@@ -506,15 +506,17 @@ private suspend fun historyIncoming(call:
ApplicationCall) {
/**
* NOTE: the LISTEN command MAY also go inside this transaction,
- * but that uses a connection other than the one provided by the
+ * but LISTEN uses a connection other than the one provided by the
* transaction block. More facts on the consequences are needed.
*/
var result: List<TalerIncomingPaymentEntity> = transaction {
TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
}
+ // The request was lucky, unlisten then.
if (result.isNotEmpty() && listenHandle != null)
listenHandle.postgresUnlisten()
+ // The request was NOT lucky, wait now.
if (result.isEmpty() && listenHandle != null && longPollTimeout != null) {
logger.debug("Waiting for NOTIFY on channel
${listenHandle.channelName}," +
" with timeout: $longPollTimeoutPar ms")
diff --git a/nexus/src/test/kotlin/DbEventTest.kt
b/nexus/src/test/kotlin/DbEventTest.kt
new file mode 100644
index 00000000..745959b8
--- /dev/null
+++ b/nexus/src/test/kotlin/DbEventTest.kt
@@ -0,0 +1,37 @@
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.jetbrains.exposed.sql.transactions.transaction
+import org.junit.Test
+import tech.libeufin.util.PostgresListenHandle
+import tech.libeufin.util.postgresNotify
+
+
+class DbEventTest {
+
+ /**
+ * LISTENs to one DB channel but only wakes up
+ * if the payload is the expected one.
+ */
+ @Test
+ fun payloadTest() {
+ withTestDatabase {
+ val listenHandle = PostgresListenHandle("X")
+ transaction { listenHandle.postgresListen() }
+ runBlocking {
+ launch {
+ val isArrived = listenHandle.waitOnIoDispatchersForPayload(
+ timeoutMs = 1000L,
+ expectedPayload = "Y"
+ )
+ assert(isArrived)
+ }
+ launch {
+ delay(500L); // Ensures the wait helper runs first.
+ transaction { this.postgresNotify("X", "Y") }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/SandboxAccessApiTest.kt
b/nexus/src/test/kotlin/SandboxAccessApiTest.kt
index ca4f2f08..175e479a 100644
--- a/nexus/src/test/kotlin/SandboxAccessApiTest.kt
+++ b/nexus/src/test/kotlin/SandboxAccessApiTest.kt
@@ -6,15 +6,24 @@ import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.server.testing.*
import io.ktor.util.*
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.time.delay
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
+import org.junit.Ignore
import org.junit.Test
import tech.libeufin.nexus.bankaccount.getBankAccount
import tech.libeufin.sandbox.*
+import java.util.*
+import kotlin.concurrent.schedule
class SandboxAccessApiTest {
val mapper = ObjectMapper()
+ private fun getTxs(respJson: String): JsonNode {
+ val mapper = ObjectMapper()
+ return mapper.readTree(respJson).get("transactions")
+ }
/**
* Testing that ..access-api/withdrawals/{wopid} and
@@ -139,10 +148,6 @@ class SandboxAccessApiTest {
// Tests the time range filter of Access API's GET /transactions
@Test
fun timeRangedTransactions() {
- fun getTxs(respJson: String): JsonNode {
- val mapper = ObjectMapper()
- return mapper.readTree(respJson).get("transactions")
- }
withTestDatabase {
prepSandboxDb()
testApplication {
@@ -428,4 +433,54 @@ class SandboxAccessApiTest {
}
}
+
+ /**
+ * This test checks that the bank hangs before responding with the list
+ * of transactions, in case there is none to return. The timing checks
+ * that the server hangs for as long as the unblocking payment takes place
+ * but NOT as long as the long_poll_ms parameter would suggest. This last
+ * check ensures that the response can only contain the artificial
unblocking
+ * payment (that happens after a certain timeout).
+ */
+ @Test
+ fun longPolledTransactions() {
+ val unblockingTxTimer = Timer()
+ val testStartTime = System.currentTimeMillis()
+ withTestDatabase {
+ prepSandboxDb()
+ testApplication {
+ application(sandboxApp)
+ runBlocking {
+ launch {
+ // long polls at most 50 seconds.
+ val R =
client.get("/demobanks/default/access-api/accounts/foo/transactions?long_poll_ms=50000")
{
+ expectSuccess = true
+ basicAuth("foo", "foo")
+ }
+ assert(getTxs(R.bodyAsText()).size() == 1)
+ val testEndTime = System.currentTimeMillis()
+ val timeDiff = (testEndTime - testStartTime) / 1000L
+ /**
+ * Now checking that the server responded after the
unblocking tx
+ * took place and before the long poll timeout would
occur.
+ */
+ println(timeDiff)
+ assert(timeDiff in 4 .. 39)
+ }
+ unblockingTxTimer.schedule(
+ delay = 4000L, // unblocks the server in (at least) 4
seconds.
+ action = {
+ wireTransfer(
+ "admin",
+ "foo",
+ "default",
+ "#9",
+ "TESTKUDOS:2"
+ )
+ }
+ )
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/TalerTest.kt
b/nexus/src/test/kotlin/TalerTest.kt
index 6565c197..ecaa7a0a 100644
--- a/nexus/src/test/kotlin/TalerTest.kt
+++ b/nexus/src/test/kotlin/TalerTest.kt
@@ -118,7 +118,6 @@ class TalerTest {
prepNexusDb()
testApplication {
application(nexusApp)
- // This call blocks for 90 seconds
val currentTime = System.currentTimeMillis()
runBlocking {
launch {
@@ -133,6 +132,7 @@ class TalerTest {
expectSuccess = true
}
val latestTime = System.currentTimeMillis()
+ // Checks that the call didn't hang for the whole
long_poll_ms.
assert(R.status.value == HttpStatusCode.OK.value
&& (latestTime - currentTime) < 2000
)
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt
index d63dac33..d3786968 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/CircuitApi.kt
@@ -320,6 +320,7 @@ fun circuitApi(circuitRoute: Route) {
)
op.status = CashoutOperationStatus.CONFIRMED
op.confirmationTime = getUTCnow().toInstant().toEpochMilli()
+ // TODO(signal this payment over LIBEUFIN_REGIO_INCOMING)
}
call.respond(HttpStatusCode.NoContent)
return@post
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
new file mode 100644
index 00000000..1d256beb
--- /dev/null
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -0,0 +1,72 @@
+package tech.libeufin.sandbox
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+
+/**
+ * This file contains the logic for downloading/submitting incoming/outgoing
+ * fiat transactions to Nexus. It needs the following values for operating.
+ *
+ * 1. Nexus URL.
+ * 2. Credentials to authenticate at Nexus JSON API.
+ * 3. Long-polling interval.
+ * 4. Frequency of the download loop.
+ *
+ * Notes:
+ *
+ * 1. The account to credit on incoming transactions is ALWAYS "admin".
+ * 2. The time to submit a new payment is as soon as "admin" receives one
+ * incoming regional payment.
+ * 3. At this time, Nexus does NOT offer long polling when it serves the
+ * transactions via its JSON API.
+ * 4. At this time, Nexus does NOT offer any filter when it serves the
+ * transactions via its JSON API.
+ */
+
+// Temporarily hard-coded. According to fiat times, these values could be WAY
higher.
+val longPollMs = 30000L // 30s long-polling.
+val loopNewReqMs = 2000L // 2s for the next request.
+
+/**
+ * Executes the 'block' function every 'loopNewReqMs' milliseconds.
+ * Does not exit/fail the process upon exceptions - just logs them.
+ */
+fun downloadLoop(block: () -> Unit) {
+ // Needs "runBlocking {}" to call "delay()" and in case 'block'
+ // contains suspend functions.
+ runBlocking {
+ while(true) {
+ try { block() }
+ catch (e: Exception) {
+ /**
+ * Not exiting to tolerate network issues, or optimistically
+ * tolerate problems not caused by Sandbox itself.
+ */
+ logger.error("Sandbox fiat-incoming monitor excepted:
${e.message}")
+ }
+ delay(loopNewReqMs)
+ }
+ }
+}
+
+/**
+ * This function downloads the incoming fiat transactions from Nexus,
+ * stores them into the database and signals their arrival
(LIBEUFIN_FIAT_INCOMING)
+ * to allow crediting the "admin" account.
+ */
+// fetchTransactions()
+
+/**
+ * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
+ * and credits the "admin" account as a reaction. Lastly, the Nexus instance
+ * wired to Sandbox will pick the new payment and serve it via its TWG, but
+ * this is OUT of the Sandbox scope.
+ */
+// creditAdmin()
+
+/**
+ * This function listens for regio-incoming events (LIBEUFIN_REGIO_INCOMING)
+ * and submits the related cash-out payment to Nexus. The fiat payment will
+ * then take place ENTIRELY on Nexus' responsibility.
+ */
+// issueCashout()
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
index 54519bbb..a2454ff4 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
@@ -460,5 +460,4 @@ fun prepareEbicsPayload(
}
val enc = CryptoUtil.encryptEbicsE002(compressedResponse, pub)
return Pair(Base64.getEncoder().encodeToString(enc.encryptedData), enc)
-
}
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
index 31856c09..f11d8cb3 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
@@ -47,6 +47,7 @@ import io.ktor.server.util.*
import io.ktor.server.plugins.callloging.*
import io.ktor.server.plugins.cors.routing.*
import io.ktor.util.date.*
+import org.jetbrains.exposed.dao.flushCache
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.statements.api.ExposedBlob
import org.jetbrains.exposed.sql.transactions.transaction
@@ -1530,37 +1531,48 @@ val sandboxApp: Application.() -> Unit = {
if (fromMs < 0) throw badRequest("'from_ms' param is less
than 0")
val untilMs =
expectLong(call.request.queryParameters["until_ms"] ?:
Long.MAX_VALUE.toString())
if (untilMs < 0) throw badRequest("'until_ms' param is
less than 0")
- val ret = mutableListOf<XLibeufinBankTransaction>()
- /**
- * Case where page number wasn't given,
- * therefore the results starts from the last transaction.
- */
- transaction {
- /**
- * Get a history page - from the calling bank account
- having
- * 'firstElementId' as the latest transaction in it.
*/
- fun getPage(firstElementId: Long):
Iterable<BankAccountTransactionEntity> {
- logger.debug("Trying to build pageBuf from ID:
$firstElementId," +
- " including $size txs in the past."
- )
- return BankAccountTransactionEntity.find {
- (BankAccountTransactionsTable.id lessEq
firstElementId) and
- (BankAccountTransactionsTable.account
eq bankAccount.id) and
-
(BankAccountTransactionsTable.date.between(fromMs, untilMs))
- }.sortedByDescending { it.id.value }.take(size)
- }
- val lt: BankAccountTransactionEntity? =
bankAccount.lastTransaction
- if (lt == null) return@transaction
- var nextPageIdUpperLimit: Long = lt.id.value
- // This loop fetches (and discards) pages until the
desired one is found.
- for (i in 1..(page)) {
- val pageBuf = getPage(nextPageIdUpperLimit)
- logger.debug("pageBuf #$i follows. Request wants
#$page:")
- pageBuf.forEach { logger.debug("ID: ${it.id},
subject: ${it.subject}, amount: ${it.currency}:${it.amount}") }
- if (pageBuf.none()) return@transaction
- nextPageIdUpperLimit = pageBuf.last().id.value - 1
- if (i == page) pageBuf.forEach {
-
ret.add(getHistoryElementFromTransactionRow(it))
+ val longPollMs: Long? = call.maybeLong("long_poll_ms")
+ // LISTEN, if Postgres.
+ val listenHandle = if (isPostgres() && longPollMs != null)
{
+ val channelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ call.expectUriComponent("account_name")
+ )
+ val listenHandle = PostgresListenHandle(channelName)
+ // Can't LISTEN on the same DB TX that checks for
data, as Exposed
+ // closes that connection and the notification getter
would fail.
+ // Can't invoke the notification getter in the same DB
TX either,
+ // as it would block the DB.
+ listenHandle.postgresListen()
+ listenHandle
+ } else null
+ val historyParams = HistoryParams(
+ pageNumber = page,
+ pageSize = size,
+ bankAccount = bankAccount,
+ fromMs = fromMs,
+ untilMs = untilMs
+ )
+ var ret: List<XLibeufinBankTransaction> = transaction {
+ extractTxHistory(historyParams)
+ }
+ // Data was found already, UNLISTEN and respond.
+ if (listenHandle != null && ret.isNotEmpty()) {
+ listenHandle.postgresUnlisten()
+ call.respond(object {val transactions = ret})
+ return@get
+ }
+ // No data was found, sleep until the timeout or getting
woken up.
+ // Third condition only silences the compiler.
+ if (listenHandle != null && ret.isEmpty() && longPollMs !=
null) {
+ val notificationArrived =
listenHandle.waitOnIODispatchers(longPollMs)
+ // Only if the awaited event fired, query again the DB.
+ if (notificationArrived)
+ {
+ ret = transaction {
+ // Refreshing to update the index to the very
last transaction.
+ historyParams.bankAccount.refresh()
+ extractTxHistory(historyParams)
}
}
}
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
index e6559897..748962d5 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
@@ -224,6 +224,63 @@ fun wireTransfer(
this.demobank = demobank
this.pmtInfId = pmtInfId
}
+ // Signaling this wire transfer's event.
+ if (this.isPostgres()) {
+ val creditChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ creditAccount.label
+ )
+ this.postgresNotify(creditChannel, "CRDT")
+ val debitChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ debitAccount.label
+ )
+ this.postgresNotify(debitChannel, "DBIT")
+ }
}
return transactionRef
+}
+
+/**
+ * Helper that constructs a transactions history page
+ * according to the URI parameters passed to Access API's
+ * GET /transactions.
+ */
+data class HistoryParams(
+ val pageNumber: Int,
+ val pageSize: Int,
+ val fromMs: Long,
+ val untilMs: Long,
+ val bankAccount: BankAccountEntity
+)
+fun extractTxHistory(params: HistoryParams): List<XLibeufinBankTransaction> {
+ val ret = mutableListOf<XLibeufinBankTransaction>()
+ /**
+ * Helper that gets transactions up to and including the 'firstElementId'
+ * transaction AND that match the URI parameters.
+ */
+ fun getPage(firstElementId: Long): Iterable<BankAccountTransactionEntity> {
+ return BankAccountTransactionEntity.find {
+ (BankAccountTransactionsTable.id lessEq firstElementId) and
+ (BankAccountTransactionsTable.account eq
params.bankAccount.id) and
+ (BankAccountTransactionsTable.date.between(params.fromMs,
params.untilMs))
+ }.sortedByDescending { it.id.value }.take(params.pageSize)
+ }
+ // Gets a pointer to the last transaction of this bank account.
+ val lastTransaction: BankAccountTransactionEntity? =
params.bankAccount.lastTransaction
+ if (lastTransaction == null) return ret
+ var nextPageIdUpperLimit: Long = lastTransaction.id.value
+
+ // This loop fetches (and discards) pages until the desired one is found.
+ for (i in 1..(params.pageNumber)) {
+ val pageBuf = getPage(nextPageIdUpperLimit)
+ logger.debug("pageBuf #$i follows. Request wants
#${params.pageNumber}:")
+ pageBuf.forEach { logger.debug("ID: ${it.id}, subject: ${it.subject},
amount: ${it.currency}:${it.amount}") }
+ if (pageBuf.none()) return ret
+ nextPageIdUpperLimit = pageBuf.last().id.value - 1
+ if (i == params.pageNumber) pageBuf.forEach {
+ ret.add(getHistoryElementFromTransactionRow(it))
+ }
+ }
+ return ret
}
\ No newline at end of file
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index a4ccbda3..59737a21 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -18,18 +18,13 @@
*/
package tech.libeufin.util
-import UtilError
-import io.ktor.http.*
+import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import logger
import net.taler.wallet.crypto.Base32Crockford
-import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
-import org.jetbrains.exposed.sql.transactions.transaction
-import org.jetbrains.exposed.sql.transactions.transactionManager
-import org.postgresql.PGNotification
import org.postgresql.jdbc.PgConnection
fun Transaction.isPostgres(): Boolean {
@@ -46,22 +41,51 @@ fun isPostgres(): Boolean {
// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
enum class NotificationsChannelDomains(val value: Int) {
- LIBEUFIN_TALER_INCOMING(3000)
+ // When payments with well-formed Taler subject arrive.
+ LIBEUFIN_TALER_INCOMING(3000),
+ // A transaction happened for a particular user. The payload
+ // informs about the direction.
+ LIBEUFIN_REGIO_TX(3001),
+ // When an incoming fiat payment is downloaded from Nexus.
+ // Happens when a customer wants to withdraw Taler coins in the
+ // regional currency.
+ LIBEUFIN_SANDBOX_FIAT_INCOMING(3002),
+ // When Nexus discovers a new transaction from the bank it
+ // is connected to. This event may wake up a client who is waiting
+ // on Nexus' GET /transactions.
+ LIBEUFIN_NEXUS_FIAT_INCOMING(3003)
}
-// Helper that builds a LISTEN-NOTIFY channel name.
+/**
+ * Helper that builds a LISTEN-NOTIFY channel name.
+ * 'salt' should be any value that would uniquely deliver the
+ * message to its receiver. IBANs are ideal, but they cost DB queries.
+ */
+
fun buildChannelName(
domain: NotificationsChannelDomains,
- iban: String,
+ salt: String,
separator: String = "_"
): String {
- val channelElements = "${domain.value}$separator$iban"
+ val channelElements = "${domain.value}$separator$salt"
val ret =
"X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
- logger.debug("Defining db channel name for IBAN: $iban, domain:
${domain.name}, resulting in: $ret")
+ logger.debug("Defining db channel name for salt: $salt, domain:
${domain.name}, resulting in: $ret")
return ret
}
-fun Transaction.postgresNotify(channel: String) {
+fun Transaction.postgresNotify(
+ channel: String,
+ payload: String? = null
+ ) {
+ if (payload != null) {
+ val argEnc = Base32Crockford.encode(payload.toByteArray())
+ if (payload.toByteArray().size > 8000)
+ throw internalServerError(
+ "DB notification on channel $channel used >8000 bytes payload
'$payload'"
+ )
+ this.exec("NOTIFY $channel, '$argEnc'")
+ return
+ }
this.exec("NOTIFY $channel")
}
@@ -85,6 +109,12 @@ class PostgresListenHandle(val channelName: String) {
"Could not find the default database, won't get Postgres
notifications."
)
private val conn = db.connector().connection as PgConnection
+ // Gets set to the NOTIFY's payload, in case one exists.
+ var receivedPayload: String? = null
+ // Signals whether the connection should be kept open,
+ // after one (and possibly not expected) event arrives.
+ // This gives more flexibility to the caller.
+ var keepConnection: Boolean = false
fun postgresListen() {
val stmt = conn.createStatement()
@@ -100,7 +130,16 @@ class PostgresListenHandle(val channelName: String) {
conn.close()
}
- fun postgresGetNotifications(timeoutMs: Long): Boolean {
+ private fun likelyCloseConnection() {
+ if (this.keepConnection)
+ return
+ this.conn.close()
+ }
+
+ fun postgresGetNotifications(
+ timeoutMs: Long,
+ keepConnectionOpen: Boolean = false
+ ): Boolean {
if (timeoutMs == 0L)
logger.warn("Database notification checker has timeout == 0," +
" that waits FOREVER until a notification arrives."
@@ -110,17 +149,66 @@ class PostgresListenHandle(val channelName: String) {
val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
if (maybeNotifications == null || maybeNotifications.isEmpty()) {
logger.debug("DB notification channel $channelName was found
empty.")
- conn.close()
+ this.likelyCloseConnection()
return false
}
for (n in maybeNotifications) {
if (n.name.lowercase() != channelName.lowercase()) {
- conn.close()
+ conn.close() // always close on error, without the optional
check.
throw internalServerError("Channel $channelName got notified
from ${n.name}!")
}
}
logger.debug("Found DB notifications on channel $channelName")
- conn.close()
+ // Only ever used for singleton notifications.
+ assert(maybeNotifications.size == 1)
+ if(maybeNotifications[0].parameter.isNotEmpty())
+ this.receivedPayload = maybeNotifications[0].parameter
+ this.likelyCloseConnection()
return true
}
+
+ // Wrapper around the core method "postgresGetNotifications()" that
+ // sets up the coroutine environment to wait and release the execution.
+ suspend fun waitOnIODispatchers(timeoutMs: Long): Boolean =
+ coroutineScope {
+ async(Dispatchers.IO) {
+ postgresGetNotifications(timeoutMs)
+ }.await()
+ }
+
+ /**
+ * Waits at most 'timeoutMs' on 'this.channelName' for
+ * the one particular payload that's passed in the 'expectedPayload'
+ * argument. FIXME: will be used along the fiat side of cash-outs.
+ */
+ suspend fun waitOnIoDispatchersForPayload(
+ timeoutMs: Long,
+ expectedPayload: String
+ ): Boolean {
+ var leftTime = timeoutMs
+ val expectedPayloadEnc =
Base32Crockford.encode(expectedPayload.toByteArray())
+ /**
+ * This setting allows the loop to reuse the open connection,
+ * otherwise the internal loop would close it if one unexpected
+ * payload wakes it up.
+ */
+ this.keepConnection = true
+ while (leftTime > 0) {
+ val loopStart = System.currentTimeMillis()
+ // Ask for notifications.
+ val maybeNotification = waitOnIODispatchers(leftTime)
+ // One arrived, check the payload.
+ if (maybeNotification) {
+ if (this.receivedPayload != null && this.receivedPayload ==
expectedPayloadEnc) {
+ conn.close()
+ return true
+ }
+ }
+ val loopEnd = System.currentTimeMillis()
+ // Account the spent time.
+ leftTime -= loopEnd - loopStart
+ }
+ conn.close()
+ return false
+ }
}
\ No newline at end of file
diff --git a/util/src/main/kotlin/HTTP.kt b/util/src/main/kotlin/HTTP.kt
index d9ff4491..30a15bd9 100644
--- a/util/src/main/kotlin/HTTP.kt
+++ b/util/src/main/kotlin/HTTP.kt
@@ -9,6 +9,7 @@ import io.ktor.server.util.*
import io.ktor.util.*
import logger
import java.net.URLDecoder
+import kotlin.reflect.typeOf
fun unauthorized(msg: String): UtilError {
return UtilError(
@@ -208,4 +209,14 @@ fun expectLong(uriParam: String): Long {
logger.error(e.message)
throw badRequest("'$uriParam' is not Long")
}
+}
+
+// Returns null if the parameter is missing, otherwise tries to convert it to Long.
+// Throws Bad Request, if the conversion could not be done.
+fun ApplicationCall.maybeLong(uriParamName: String): Long? {
+ val maybeParam = this.parameters[uriParamName] ?: return null
+ return try { maybeParam.toLong() }
+ catch (e: Exception) {
+ throw badRequest("Could not convert '$uriParamName' to Long")
+ }
}
\ No newline at end of file
diff --git a/util/src/test/kotlin/ibanTest.kt b/util/src/test/kotlin/ibanTest.kt
deleted file mode 100644
index 964f822d..00000000
--- a/util/src/test/kotlin/ibanTest.kt
+++ /dev/null
@@ -1,10 +0,0 @@
-import org.junit.Test
-import tech.libeufin.util.getIban
-
-class IbanTest {
-
- @Test
- fun genIban() {
- println(getIban())
- }
-}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [libeufin] branch master updated (1b9fbb02 -> bcf54117),
gnunet <=
- [libeufin] 02/07: testing DB events, gnunet, 2023/04/12
- [libeufin] 01/07: Helpers., gnunet, 2023/04/12
- [libeufin] 04/07: comments, indentation., gnunet, 2023/04/12
- [libeufin] 07/07: comment, gnunet, 2023/04/12
- [libeufin] 05/07: DB events., gnunet, 2023/04/12
- [libeufin] 03/07: helper, gnunet, 2023/04/12
- [libeufin] 06/07: DB events at Access API., gnunet, 2023/04/12