[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] branch master updated: Performance.
From: gnunet
Subject: [libeufin] branch master updated: Performance.
Date: Wed, 12 Apr 2023 22:49:32 +0200
This is an automated email from the git hooks/post-receive script.
ms pushed a commit to branch master
in repository libeufin.
The following commit(s) were added to refs/heads/master by this push:
new 6ffd6c5f Performance.
6ffd6c5f is described below
commit 6ffd6c5f2b2dbb659b08ec6503a6d4a71ff47386
Author: MS <ms@taler.net>
AuthorDate: Wed Apr 12 22:46:49 2023 +0200
Performance.
Implementing long polling to get transactions via
the Nexus native API.
---
nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 17 +++++--
.../tech/libeufin/nexus/iso20022/Iso20022.kt | 1 -
.../kotlin/tech/libeufin/nexus/server/Helpers.kt | 39 ++++++++++++++--
.../tech/libeufin/nexus/server/NexusServer.kt | 49 ++++++++++++++------
nexus/src/test/kotlin/DbEventTest.kt | 35 ++++++++++++++
nexus/src/test/kotlin/NexusApiTest.kt | 53 +++++++++++++++++++++-
util/src/main/kotlin/DB.kt | 17 +++++--
7 files changed, 185 insertions(+), 26 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index 5679b8ca..6b152858 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -27,7 +27,7 @@ import org.jetbrains.exposed.dao.id.LongIdTable
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
-import tech.libeufin.util.EbicsInitState
+import tech.libeufin.util.*
import java.sql.Connection
import kotlin.reflect.typeOf
@@ -210,8 +210,19 @@ object NexusBankTransactionsTable : LongIdTable() {
}
class NexusBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) {
- companion object :
LongEntityClass<NexusBankTransactionEntity>(NexusBankTransactionsTable)
-
+ companion object :
LongEntityClass<NexusBankTransactionEntity>(NexusBankTransactionsTable) {
+ override fun new(init: NexusBankTransactionEntity.() -> Unit):
NexusBankTransactionEntity {
+ val ret = super.new(init)
+ if (isPostgres()) {
+ val channelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_NEXUS_TX,
+ ret.bankAccount.bankAccountName
+ )
+ TransactionManager.current().postgresNotify(channelName,
ret.creditDebitIndicator)
+ }
+ return ret
+ }
+ }
var currency by NexusBankTransactionsTable.currency
var amount by NexusBankTransactionsTable.amount
var status by NexusBankTransactionsTable.status
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
index a543d22a..a8509323 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
@@ -273,7 +273,6 @@ data class Batch(
@JsonInclude(JsonInclude.Include.NON_NULL)
data class CamtBankAccountEntry(
val amount: CurrencyAmount,
-
/**
* Is this entry debiting or crediting the account
* it is reported for?
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
index 08eb2b1d..aecc58eb 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
@@ -1,13 +1,46 @@
package tech.libeufin.nexus.server
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.http.*
+import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
+import org.jetbrains.exposed.sql.SqlExpressionBuilder.greaterEq
+import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
-import tech.libeufin.nexus.NexusBankConnectionEntity
-import tech.libeufin.nexus.NexusBankConnectionsTable
-import tech.libeufin.nexus.NexusError
+import tech.libeufin.nexus.*
+import tech.libeufin.nexus.bankaccount.getBankAccount
+import tech.libeufin.nexus.iso20022.CamtBankAccountEntry
import tech.libeufin.util.internalServerError
import tech.libeufin.util.notFound
+// Type holding parameters of GET /transactions.
+data class GetTransactionsParams(
+ val bankAccountId: String,
+ val startIndex: Long,
+ val resultSize: Long
+)
+
+/**
+ * Queries the database according to the GET /transactions
+ * parameters.
+ */
+fun getIngestedTransactions(params: GetTransactionsParams): List<JsonNode> =
+ transaction {
+ val bankAccount = getBankAccount(params.bankAccountId)
+ val maybeResult = NexusBankTransactionEntity.find {
+ NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and
(
+ NexusBankTransactionsTable.id greaterEq params.startIndex
+ )
+ }.sortedBy { it.id.value }.take(params.resultSize.toInt()) // Smallest index (= earliest transaction) first
+ // Converting the result to the HTTP response type.
+ maybeResult.map {
+ val element: ObjectNode =
jacksonObjectMapper().readTree(it.transactionJson) as ObjectNode
+ element.put("index", it.id.value.toString())
+ return@map element
+ }
+ }
+
fun unknownBankAccount(bankAccountLabel: String): NexusError {
return NexusError(
HttpStatusCode.NotFound,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index 03f12cf6..269d6a32 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -762,21 +762,42 @@ val nexusApp: Application.() -> Unit = {
// Asks list of transactions ALREADY downloaded from the bank.
get("/bank-accounts/{accountid}/transactions") {
requireSuperuser(call.request)
- val bankAccountId = expectNonNull(call.parameters["accountid"])
- val ret = Transactions()
- transaction {
- val bankAccount =
NexusBankAccountEntity.findByName(bankAccountId)
- if (bankAccount == null) {
- throw unknownBankAccount(bankAccountId)
- }
- NexusBankTransactionEntity.find {
NexusBankTransactionsTable.bankAccount eq bankAccount.id }.map {
- val tx = jacksonObjectMapper().readValue(
- it.transactionJson, CamtBankAccountEntry::class.java
- )
- ret.transactions.add(tx)
- }
+ val accountLabel = expectNonNull(call.parameters["accountid"])
+ // Getting the URI parameters.
+ val maybeStart = call.maybeLong("start") // Earliest TX in the result.
+ val maybeSize = call.maybeLong("size") // How many TXs at most.
+ val maybeLongPoll = call.maybeLong("long_poll_ms")
+
+ // Ask for a DB event (before the actual query), in case
+ // the DB is Postgres and the client asked for long polling.
+ val listenHandle = if (isPostgres() && maybeLongPoll != null) {
+ val channelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_NEXUS_TX,
+ accountLabel
+ )
+ val listenHandle = PostgresListenHandle(channelName)
+ listenHandle.postgresListen()
+ listenHandle
+ } else null
+
+ // Try getting results, and UNLISTEN in case they exist.
+ val queryParam = GetTransactionsParams(
+ bankAccountId = accountLabel,
+ resultSize = maybeSize ?: 5,
+ startIndex = maybeStart ?: 1
+ )
+ var ret = getIngestedTransactions(queryParam)
+ if (ret.isNotEmpty() && listenHandle != null)
+ listenHandle.postgresUnlisten() // closes the PG connection too.
+
+ // No results and a DB event is pending: wait.
+ if (ret.isEmpty() && listenHandle != null && maybeLongPoll !=
null) {
+ val isNotificationArrived =
listenHandle.waitOnIODispatchers(maybeLongPoll)
+ // The event happened, query again.
+ if (isNotificationArrived)
+ ret = getIngestedTransactions(queryParam)
}
- call.respond(ret)
+ call.respond(object {val transactions = ret})
return@get
}
diff --git a/nexus/src/test/kotlin/DbEventTest.kt
b/nexus/src/test/kotlin/DbEventTest.kt
index 745959b8..57075882 100644
--- a/nexus/src/test/kotlin/DbEventTest.kt
+++ b/nexus/src/test/kotlin/DbEventTest.kt
@@ -4,7 +4,9 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.transactions.transaction
import org.junit.Test
+import tech.libeufin.util.NotificationsChannelDomains
import tech.libeufin.util.PostgresListenHandle
+import tech.libeufin.util.buildChannelName
import tech.libeufin.util.postgresNotify
@@ -34,4 +36,37 @@ class DbEventTest {
}
}
}
+
+ /**
+ * This function tests the NOTIFY sent by an Exposed's
+ * "new {}" overridden static method.
+ */
+ @Test
+ fun automaticNotifyTest() {
+ withTestDatabase {
+ prepNexusDb()
+ val nexusTxChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_NEXUS_TX,
+ "foo" // bank account label.
+ )
+ val listenHandle = PostgresListenHandle(nexusTxChannel)
+ transaction { listenHandle.postgresListen() }
+ runBlocking {
+ launch {
+ val isArrived = listenHandle.waitOnIODispatchers(timeoutMs
= 1000L)
+ assert(isArrived)
+ }
+ launch {
+ delay(500L); // Ensures the wait helper runs first.
+ transaction {
+ newNexusBankTransaction(
+ "TESTKUDOS",
+ "2",
+ "unblocking event"
+ )
+ }
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/NexusApiTest.kt
b/nexus/src/test/kotlin/NexusApiTest.kt
index e4fcc6d0..1b5caaec 100644
--- a/nexus/src/test/kotlin/NexusApiTest.kt
+++ b/nexus/src/test/kotlin/NexusApiTest.kt
@@ -1,16 +1,67 @@
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.client.plugins.*
import io.ktor.client.request.*
+import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.server.testing.*
+import kotlinx.coroutines.async
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.ensureActive
+import kotlinx.coroutines.runBlocking
import org.junit.Test
import tech.libeufin.nexus.server.nexusApp
-import tech.libeufin.sandbox.sandboxApp
/**
* This class tests the API offered by Nexus,
* documented here: https://docs.taler.net/libeufin/api-nexus.html
*/
class NexusApiTest {
+ // Testing long-polling on GET /transactions
+ @Test
+ fun getTransactions() {
+ withTestDatabase {
+ prepNexusDb()
+ testApplication {
+ application(nexusApp)
+ /**
+ * Requesting /transactions with long polling, and assert that
+ * the response arrives _after_ the unblocking INSERT into the
+ * database.
+ */
+ val longPollMs = 5000
+ runBlocking {
+ val requestJob = async {
+
client.get("/bank-accounts/foo/transactions?long_poll_ms=$longPollMs") {
+ basicAuth("foo", "foo")
+ contentType(ContentType.Application.Json)
+ }
+ }
+ /**
+ * The following delay ensures that the payment below
+ * gets inserted after the client has issued the long
+ * polled request above (and it is therefore waiting)
+ */
+ delay(2000)
+ // Ensures that the request is active _before_ the
+ // upcoming payment. This ensures that the request
+ // did not already find another payment in the database.
+ requestJob.ensureActive()
+ newNexusBankTransaction(
+ currency = "TESTKUDOS",
+ value = "2",
+ subject = "first"
+ )
+ val R = requestJob.await()
+ // Ensures that the request did NOT wait for the whole timeout
+ assert((R.responseTime.timestamp -
R.requestTime.timestamp) < longPollMs)
+ val body = jacksonObjectMapper().readTree(R.bodyAsText())
+ // Ensures that the unblocking payment exists in the response.
+ val tx = body.get("transactions")
+ assert(tx.isArray && tx.size() == 1)
+ }
+ }
+ }
+ }
// Testing basic operations on facades.
@Test
fun facades() {
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index 59737a21..b0dcec9a 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -40,6 +40,12 @@ fun isPostgres(): Boolean {
}
// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
+/**
+ * Note: every domain is ALWAYS meant to be salted with
+ * a unique identifier that points to the user waiting for
+ * a notification. The reference function for salting is:
+ * "buildChannelName()", in this file.
+ */
enum class NotificationsChannelDomains(val value: Int) {
// When payments with well-formed Taler subject arrive.
LIBEUFIN_TALER_INCOMING(3000),
@@ -50,10 +56,12 @@ enum class NotificationsChannelDomains(val value: Int) {
// Happens when a customer wants to withdraw Taler coins in the
// regional currency.
LIBEUFIN_SANDBOX_FIAT_INCOMING(3002),
- // When Nexus discovers a new transactions from the bank it
- // is connected to. This even may wake up a client who is waiting
- // on Nexus' GET /transactions.
- LIBEUFIN_NEXUS_FIAT_INCOMING(3003)
+ // When Nexus has ingested a new transaction from the bank it
+ // is connected to. This event carries incoming and outgoing
+ // payments, and it specifies that in its payload. The direction
+ // codename is the same as CaMt (DBIT, CRDT), as that is also
+ // used in the database.
+ LIBEUFIN_NEXUS_TX(3003)
}
/**
@@ -77,6 +85,7 @@ fun Transaction.postgresNotify(
channel: String,
payload: String? = null
) {
+ logger.debug("Sending NOTIFY on channel '$channel' with payload
'$payload'")
if (payload != null) {
val argEnc = Base32Crockford.encode(payload.toByteArray())
if (payload.toByteArray().size > 8000)
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [libeufin] branch master updated: Performance.,
gnunet <=