gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] branch master updated: Performance.


From: gnunet
Subject: [libeufin] branch master updated: Performance.
Date: Wed, 12 Apr 2023 22:49:32 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

The following commit(s) were added to refs/heads/master by this push:
     new 6ffd6c5f Performance.
6ffd6c5f is described below

commit 6ffd6c5f2b2dbb659b08ec6503a6d4a71ff47386
Author: MS <ms@taler.net>
AuthorDate: Wed Apr 12 22:46:49 2023 +0200

    Performance.
    
    Implementing long polling to get transactions via
    the Nexus native API.
---
 nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt    | 17 +++++--
 .../tech/libeufin/nexus/iso20022/Iso20022.kt       |  1 -
 .../kotlin/tech/libeufin/nexus/server/Helpers.kt   | 39 ++++++++++++++--
 .../tech/libeufin/nexus/server/NexusServer.kt      | 49 ++++++++++++++------
 nexus/src/test/kotlin/DbEventTest.kt               | 35 ++++++++++++++
 nexus/src/test/kotlin/NexusApiTest.kt              | 53 +++++++++++++++++++++-
 util/src/main/kotlin/DB.kt                         | 17 +++++--
 7 files changed, 185 insertions(+), 26 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index 5679b8ca..6b152858 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -27,7 +27,7 @@ import org.jetbrains.exposed.dao.id.LongIdTable
 import org.jetbrains.exposed.sql.*
 import org.jetbrains.exposed.sql.transactions.TransactionManager
 import org.jetbrains.exposed.sql.transactions.transaction
-import tech.libeufin.util.EbicsInitState
+import tech.libeufin.util.*
 import java.sql.Connection
 import kotlin.reflect.typeOf
 
@@ -210,8 +210,19 @@ object NexusBankTransactionsTable : LongIdTable() {
 }
 
 class NexusBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) {
-    companion object : 
LongEntityClass<NexusBankTransactionEntity>(NexusBankTransactionsTable)
-
+    companion object : 
LongEntityClass<NexusBankTransactionEntity>(NexusBankTransactionsTable) {
+        override fun new(init: NexusBankTransactionEntity.() -> Unit): 
NexusBankTransactionEntity {
+            val ret = super.new(init)
+            if (isPostgres()) {
+                val channelName = buildChannelName(
+                    NotificationsChannelDomains.LIBEUFIN_NEXUS_TX,
+                    ret.bankAccount.bankAccountName
+                )
+                TransactionManager.current().postgresNotify(channelName, 
ret.creditDebitIndicator)
+            }
+            return ret
+        }
+    }
     var currency by NexusBankTransactionsTable.currency
     var amount by NexusBankTransactionsTable.amount
     var status by NexusBankTransactionsTable.status
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
index a543d22a..a8509323 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
@@ -273,7 +273,6 @@ data class Batch(
 @JsonInclude(JsonInclude.Include.NON_NULL)
 data class CamtBankAccountEntry(
     val amount: CurrencyAmount,
-
     /**
      * Is this entry debiting or crediting the account
      * it is reported for?
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
index 08eb2b1d..aecc58eb 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
@@ -1,13 +1,46 @@
 package tech.libeufin.nexus.server
 
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import io.ktor.http.*
+import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
+import org.jetbrains.exposed.sql.SqlExpressionBuilder.greaterEq
+import org.jetbrains.exposed.sql.and
 import org.jetbrains.exposed.sql.transactions.transaction
-import tech.libeufin.nexus.NexusBankConnectionEntity
-import tech.libeufin.nexus.NexusBankConnectionsTable
-import tech.libeufin.nexus.NexusError
+import tech.libeufin.nexus.*
+import tech.libeufin.nexus.bankaccount.getBankAccount
+import tech.libeufin.nexus.iso20022.CamtBankAccountEntry
 import tech.libeufin.util.internalServerError
 import tech.libeufin.util.notFound
 
+// Type holding parameters of GET /transactions.
+data class GetTransactionsParams(
+    val bankAccountId: String,
+    val startIndex: Long,
+    val resultSize: Long
+)
+
+/**
+ * Queries the database according to the GET /transactions
+ * parameters.
+ */
+fun getIngestedTransactions(params: GetTransactionsParams): List<JsonNode> =
+    transaction {
+        val bankAccount = getBankAccount(params.bankAccountId)
+        val maybeResult = NexusBankTransactionEntity.find {
+            NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and 
(
+                    NexusBankTransactionsTable.id greaterEq params.startIndex
+                    )
+        }.sortedBy { it.id.value }.take(params.resultSize.toInt()) // Smallest 
index (= earliest transaction) first
+        // Converting the result to the HTTP response type.
+        maybeResult.map {
+            val element: ObjectNode = 
jacksonObjectMapper().readTree(it.transactionJson) as ObjectNode
+            element.put("index", it.id.value.toString())
+            return@map element
+        }
+    }
+
 fun unknownBankAccount(bankAccountLabel: String): NexusError {
     return NexusError(
         HttpStatusCode.NotFound,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index 03f12cf6..269d6a32 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -762,21 +762,42 @@ val nexusApp: Application.() -> Unit = {
         // Asks list of transactions ALREADY downloaded from the bank.
         get("/bank-accounts/{accountid}/transactions") {
             requireSuperuser(call.request)
-            val bankAccountId = expectNonNull(call.parameters["accountid"])
-            val ret = Transactions()
-            transaction {
-                val bankAccount = 
NexusBankAccountEntity.findByName(bankAccountId)
-                if (bankAccount == null) {
-                    throw unknownBankAccount(bankAccountId)
-                }
-                NexusBankTransactionEntity.find { 
NexusBankTransactionsTable.bankAccount eq bankAccount.id }.map {
-                    val tx = jacksonObjectMapper().readValue(
-                        it.transactionJson, CamtBankAccountEntry::class.java
-                    )
-                    ret.transactions.add(tx)
-                }
+            val accountLabel = expectNonNull(call.parameters["accountid"])
+            // Getting the URI parameters.
+            val maybeStart = call.maybeLong("start") // Earliest TX in the 
result.
+            val maybeSize = call.maybeLong("size") // How many TXs at most.
+            val maybeLongPoll = call.maybeLong("long_poll_ms")
+
+            // Ask for a DB event (before the actual query),
+            // in case the DB is Postgres and the client wants.
+            val listenHandle = if (isPostgres() && maybeLongPoll != null) {
+                val channelName = buildChannelName(
+                    NotificationsChannelDomains.LIBEUFIN_NEXUS_TX,
+                    accountLabel
+                )
+                val listenHandle = PostgresListenHandle(channelName)
+                listenHandle.postgresListen()
+                listenHandle
+            } else null
+
+            // Try getting results, and UNLISTEN in case they exist.
+            val queryParam = GetTransactionsParams(
+                bankAccountId = accountLabel,
+                resultSize = maybeSize ?: 5,
+                startIndex = maybeStart ?: 1
+            )
+            var ret = getIngestedTransactions(queryParam)
+            if (ret.isNotEmpty() && listenHandle != null)
+                listenHandle.postgresUnlisten() // closes the PG connection 
too.
+
+            // No results and a DB event is pending: wait.
+            if (ret.isEmpty() && listenHandle != null && maybeLongPoll != 
null) {
+                val isNotificationArrived = 
listenHandle.waitOnIODispatchers(maybeLongPoll)
+                // The event happened, query again.
+                if (isNotificationArrived)
+                    ret = getIngestedTransactions(queryParam)
             }
-            call.respond(ret)
+            call.respond(object {val transactions = ret})
             return@get
         }
 
diff --git a/nexus/src/test/kotlin/DbEventTest.kt 
b/nexus/src/test/kotlin/DbEventTest.kt
index 745959b8..57075882 100644
--- a/nexus/src/test/kotlin/DbEventTest.kt
+++ b/nexus/src/test/kotlin/DbEventTest.kt
@@ -4,7 +4,9 @@ import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.jetbrains.exposed.sql.transactions.transaction
 import org.junit.Test
+import tech.libeufin.util.NotificationsChannelDomains
 import tech.libeufin.util.PostgresListenHandle
+import tech.libeufin.util.buildChannelName
 import tech.libeufin.util.postgresNotify
 
 
@@ -34,4 +36,37 @@ class DbEventTest {
             }
         }
     }
+
+    /**
+     * This function tests the NOTIFY sent by a Exposed's
+     * "new {}" overridden static method.
+     */
+    @Test
+    fun automaticNotifyTest() {
+        withTestDatabase {
+            prepNexusDb()
+            val nexusTxChannel = buildChannelName(
+                NotificationsChannelDomains.LIBEUFIN_NEXUS_TX,
+                "foo" // bank account label.
+            )
+            val listenHandle = PostgresListenHandle(nexusTxChannel)
+            transaction { listenHandle.postgresListen() }
+            runBlocking {
+                launch {
+                    val isArrived = listenHandle.waitOnIODispatchers(timeoutMs 
= 1000L)
+                    assert(isArrived)
+                }
+                launch {
+                    delay(500L); // Ensures the wait helper runs first.
+                    transaction {
+                        newNexusBankTransaction(
+                            "TESTKUDOS",
+                            "2",
+                            "unblocking event"
+                        )
+                    }
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/NexusApiTest.kt 
b/nexus/src/test/kotlin/NexusApiTest.kt
index e4fcc6d0..1b5caaec 100644
--- a/nexus/src/test/kotlin/NexusApiTest.kt
+++ b/nexus/src/test/kotlin/NexusApiTest.kt
@@ -1,16 +1,67 @@
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import io.ktor.client.plugins.*
 import io.ktor.client.request.*
+import io.ktor.client.statement.*
 import io.ktor.http.*
 import io.ktor.server.testing.*
+import kotlinx.coroutines.async
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.ensureActive
+import kotlinx.coroutines.runBlocking
 import org.junit.Test
 import tech.libeufin.nexus.server.nexusApp
-import tech.libeufin.sandbox.sandboxApp
 
 /**
  * This class tests the API offered by Nexus,
  * documented here: https://docs.taler.net/libeufin/api-nexus.html
  */
 class NexusApiTest {
+    // Testing long-polling on GET /transactions
+    @Test
+    fun getTransactions() {
+        withTestDatabase {
+            prepNexusDb()
+            testApplication {
+                application(nexusApp)
+                /**
+                 * Requesting /transactions with long polling, and assert that
+                 * the response arrives _after_ the unblocking INSERT into the
+                 * database.
+                 */
+                val longPollMs = 5000
+                runBlocking {
+                    val requestJob = async {
+                        
client.get("/bank-accounts/foo/transactions?long_poll_ms=$longPollMs") {
+                            basicAuth("foo", "foo")
+                            contentType(ContentType.Application.Json)
+                        }
+                    }
+                    /**
+                     * The following delay ensures that the payment below
+                     * gets inserted after the client has issued the long
+                     * polled request above (and it is therefore waiting)
+                     */
+                    delay(2000)
+                    // Ensures that the request is active _before_ the
+                    // upcoming payment.  This ensures that the request
+                    // didn't find already another payment in the database.
+                    requestJob.ensureActive()
+                    newNexusBankTransaction(
+                        currency = "TESTKUDOS",
+                        value = "2",
+                        subject = "first"
+                    )
+                    val R = requestJob.await()
+                    // Ensures that the request did NOT wait all the timeout
+                    assert((R.responseTime.timestamp - 
R.requestTime.timestamp) < longPollMs)
+                    val body = jacksonObjectMapper().readTree(R.bodyAsText())
+                    // Ensures that the unblocking payment exists in the 
response.
+                    val tx = body.get("transactions")
+                    assert(tx.isArray && tx.size() == 1)
+                }
+            }
+        }
+    }
     // Testing basic operations on facades.
     @Test
     fun facades() {
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index 59737a21..b0dcec9a 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -40,6 +40,12 @@ fun isPostgres(): Boolean {
 }
 
 // Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
+/**
+ * Note: every domain is ALWAYS meant to be salted with
+ * a unique identifier that points to the user waiting for
+ * a notification.  The reference function for salting is:
+ * "buildChannelName()", in this file.
+ */
 enum class NotificationsChannelDomains(val value: Int) {
     // When payments with well-formed Taler subject arrive.
     LIBEUFIN_TALER_INCOMING(3000),
@@ -50,10 +56,12 @@ enum class NotificationsChannelDomains(val value: Int) {
     // Happens when a customer wants to withdraw Taler coins in the
     // regional currency.
     LIBEUFIN_SANDBOX_FIAT_INCOMING(3002),
-    // When Nexus discovers a new transactions from the bank it
-    // is connected to.  This even may wake up a client who is waiting
-    // on Nexus' GET /transactions.
-    LIBEUFIN_NEXUS_FIAT_INCOMING(3003)
+    // When Nexus has ingested a new transactions from the bank it
+    // is connected to.  This event carries incoming and outgoing
+    // payments, and it specifies that in its payload.  The direction
+    // codename is the same as CaMt (DBIT, CRDT), as that is also
+    // used in the database.
+    LIBEUFIN_NEXUS_TX(3003)
 }
 
 /**
@@ -77,6 +85,7 @@ fun Transaction.postgresNotify(
     channel: String,
     payload: String? = null
     ) {
+    logger.debug("Sending NOTIFY on channel '$channel' with payload 
'$payload'")
     if (payload != null) {
         val argEnc = Base32Crockford.encode(payload.toByteArray())
         if (payload.toByteArray().size > 8000)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]