gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 01/03: Conversion service.


From: gnunet
Subject: [libeufin] 01/03: Conversion service.
Date: Fri, 21 Apr 2023 20:23:31 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit 8611d22879eb802bc69077f77543004adc4970f7
Author: MS <ms@taler.net>
AuthorDate: Fri Apr 21 20:14:18 2023 +0200

    Conversion service.
    
    Implementing the buy-in side.
---
 .../tech/libeufin/sandbox/ConversionService.kt     | 221 ++++++++++++++++-----
 .../src/main/kotlin/tech/libeufin/sandbox/DB.kt    |  11 +
 2 files changed, 184 insertions(+), 48 deletions(-)

diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
index c52d74dd..a4cfccbb 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -1,16 +1,22 @@
 package tech.libeufin.sandbox
 
+import CamtBankAccountEntry
+import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.jsonMapper
 import io.ktor.client.*
 import io.ktor.client.plugins.*
 import io.ktor.client.request.*
 import io.ktor.client.statement.*
 import io.ktor.http.*
+import io.ktor.utils.io.jvm.javaio.*
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.runBlocking
 import org.jetbrains.exposed.sql.and
 import org.jetbrains.exposed.sql.transactions.transaction
 import tech.libeufin.util.*
+import java.math.BigDecimal
+import kotlin.system.exitProcess
 
 /**
  * This file contains the logic for downloading/submitting incoming/outgoing
@@ -27,11 +33,13 @@ import tech.libeufin.util.*
  * 2.  The time to submit a new payment is as soon as "admin" receives one
  *     incoming regional payment.
  * 3.  At this time, Nexus does NOT offer long polling when it serves the
- *     transactions via its JSON API.
+ *     transactions via its JSON API. => Fixed.
  * 4.  At this time, Nexus does NOT offer any filter when it serves the
- *     transactions via its JSON API.
+ *     transactions via its JSON API. => Can be fixed by using the TWG.
  */
 
+// DEFINITIONS AND HELPERS
+
 /**
  * Timeout the HTTP client waits for the server to respond,
  * after the request is made.
@@ -45,6 +53,17 @@ val waitTimeout = 30000L
  */
 val newIterationTimeout = 2000L
 
+/**
+ * Response format of Nexus GET /transactions.
+ */
+data class TransactionItem(
+    val index: String,
+    val camtData: CamtBankAccountEntry
+)
+data class NexusTransactions(
+    val transactions: List<TransactionItem>
+)
+
 /**
  * Executes the 'block' function every 'loopNewReqMs' milliseconds.
  * Does not exit/fail the process upon exceptions - just logs them.
@@ -67,20 +86,125 @@ fun downloadLoop(block: () -> Unit) {
     }
 }
 
+// BUY-IN SIDE.
+
 /**
- * This function downloads the incoming fiat transactions from Nexus,
- * stores them into the database and signals their arrival 
(LIBEUFIN_FIAT_INCOMING)
- * to allow crediting the "admin" account.
+ * Applies the buy-in ratio and fees to the fiat amount
+ * that came from Nexus.  The result is the regional amount
+ * that will be wired to the exchange Sandbox account.
  */
-// fetchTransactions()
-
+private fun applyBuyinRatioAndFees(
+    amount: BigDecimal,
+    ratioAndFees: RatioAndFees
+): BigDecimal =
+    ((amount * ratiosAndFees.buy_at_ratio.toBigDecimal())
+            - ratiosAndFees.buy_in_fee.toBigDecimal()).roundToTwoDigits()
 /**
- * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
- * and credits the "admin" account as a reaction.  Lastly, the Nexus instance
- * wired to Sandbox will pick the new payment and serve it via its TWG, but
- * this is OUT of the Sandbox scope.
+ * This function downloads the incoming fiat transactions from Nexus,
+ * stores them into the database and triggers the related wire transfer
+ * to the Taler exchange (to be specified in 'accountToCredit').  In case
+ * of errors, it pauses and retries when the server fails, but _fails_ when
+ * the client does.
  */
-// creditAdmin()
+fun buyinMonitor(
+    demobankName: String, // used to get config values.
+    client: HttpClient,
+    accountToCredit: String,
+    accountToDebit: String = "admin"
+) {
+    val demobank = ensureDemobank(demobankName)
+    val nexusBaseUrl = getConfigValueOrThrow(demobank.config::nexusBaseUrl)
+    val usernameAtNexus = 
getConfigValueOrThrow(demobank.config::usernameAtNexus)
+    val passwordAtNexus = 
getConfigValueOrThrow(demobank.config::passwordAtNexus)
+    val endpoint = "bank-accounts/$usernameAtNexus/transactions"
+    val uriWithoutStart = joinUrl(nexusBaseUrl, endpoint) + 
"?long_poll_ms=$waitTimeout"
+
+    // downloadLoop does already try-catch (without failing the process).
+    downloadLoop {
+        val debitBankAccount = getBankAccountFromLabel(accountToDebit)
+        val uriWithStart = 
"$uriWithoutStart&start=${debitBankAccount.lastFiatFetch}"
+        runBlocking {
+            // Maybe get new fiat transactions.
+            logger.debug("GETting fiat transactions from: ${uriWithStart}")
+            val resp = client.get(uriWithStart) { basicAuth(usernameAtNexus, 
passwordAtNexus) }
+            // The server failed, pause and try again
+            if (resp.status.value.toString().startsWith('5')) {
+                logger.error("Buy-in monitor caught a failing to Nexus.  Pause 
and retry.")
+                logger.error("Nexus responded: ${resp.bodyAsText()}")
+                delay(2000L)
+                return@runBlocking
+            }
+            // The client failed, fail the process.
+            if (resp.status.value.toString().startsWith('4')) {
+                logger.error("Buy-in monitor failed at GETting to Nexus.  Fail 
Sandbox.")
+                logger.error("Nexus responded: ${resp.bodyAsText()}")
+                exitProcess(1)
+            }
+            // Expect 200 OK.  What if 3xx?
+            if (resp.status.value != HttpStatusCode.OK.value) {
+                logger.error("Unhandled response status ${resp.status.value}, 
failing Sandbox")
+                exitProcess(1)
+            }
+            // Nexus responded 200 OK, analyzing the result.
+            /**
+             * Wire to "admin" if the subject is a public key, or do
+             * nothing otherwise.
+             */
+            val respObj = jacksonObjectMapper().readValue(
+                resp.bodyAsText(),
+                NexusTransactions::class.java
+            ) // errors are logged by the caller (without failing).
+            respObj.transactions.forEach {
+                /**
+                 * If the payment doesn't contain a reserve public key,
+                 * continue the iteration with the new payment.
+                 */
+                if 
(extractReservePubFromSubject(it.camtData.getSingletonSubject()) == null)
+                    return@forEach
+                /**
+                 * The payment had a reserve public key in the subject, wire 
it to
+                 * the exchange.  NOTE: this ensures that all the payments 
that the
+                 * exchange gets will NOT trigger any reimbursement, because 
they have
+                 * a valid reserve public key.  Reimbursements would in fact 
introduce
+                 * significant friction, because they need to target _fiat_ 
bank accounts
+                 * (the customers'), whereas the entity that _now_ pays the 
exchange is
+                 * "admin", which lives in the regional circuit.
+                 */
+                // Extracts the amount and checks it's at most two fractional 
digits.
+                val maybeValidAmount = it.camtData.amount.value
+                if (!validatePlainAmount(maybeValidAmount)) {
+                    logger.error("Nexus gave one amount with invalid 
fractional digits: $maybeValidAmount." +
+                            "  The transaction has index ${it.index}")
+                    // Advancing the last fetched pointer, to avoid GETting
+                    // this invalid payment again.
+                    transaction {
+                        debitBankAccount.refresh()
+                        debitBankAccount.lastFiatFetch = it.index
+                    }
+                }
+                val convertedAmount = applyBuyinRatioAndFees(
+                    maybeValidAmount.toBigDecimal(),
+                    ratiosAndFees
+                )
+                transaction {
+                    wireTransfer(
+                        debitAccount = accountToDebit,
+                        creditAccount = accountToCredit,
+                        demobank = demobankName,
+                        subject = it.camtData.getSingletonSubject(),
+                        amount = "${demobank.config.currency}:$convertedAmount"
+                    )
+                    // Nexus enqueues the transactions such that the index 
increases.
+                    // If Sandbox crashes here, it'll ask again using the last 
successful
+                    // index as the start parameter.  Being this an exclusive 
bound, only
+                    // transactions later than it are expected.
+                    debitBankAccount.refresh()
+                    debitBankAccount.lastFiatFetch = it.index
+                }
+            }
+        }
+    }
+}
 
 // DB query helper.  The List return type (instead of SizedIterable) lets
 // the caller NOT open a transaction block to access the values -- although
@@ -100,6 +224,8 @@ private fun getUnsubmittedTransactions(bankAccountLabel: 
String): List<BankAccou
     }
 }
 
+// CASH-OUT SIDE.
+
 /**
  * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
  * on the 'watchedBankAccount' and submits the related cash-out payment
@@ -109,7 +235,8 @@ private fun getUnsubmittedTransactions(bankAccountLabel: 
String): List<BankAccou
 suspend fun cashoutMonitor(
     httpClient: HttpClient,
     watchedBankAccount: String = "admin",
-    demobankName: String = "default" // used to get config values.
+    demobankName: String = "default", // used to get config values.
+    dbEventTimeout: Long = 0 // 0 waits forever.
 ) {
     // Register for a REGIO_TX event.
     val eventChannel = buildChannelName(
@@ -132,15 +259,11 @@ suspend fun cashoutMonitor(
         /**
          * WARNING: Nexus gives the possibility to have bank account names
          * DIFFERENT from their owner's username.  Sandbox however MUST have
-         * its Nexus bank account named THE SAME as its username (until the
-         * config will allow to change).
+         * its Nexus bank account named THE SAME as its username.
          */
         ret + "bank-accounts/$usernameAtNexus/payment-initiations"
     }
     while (true) {
-        // delaying here avoids to delay in multiple places (errors,
-        // lack of action, success)
-        delay(2000)
         val listenHandle = PostgresListenHandle(eventChannel)
         // pessimistically LISTEN
         listenHandle.postgresListen()
@@ -152,11 +275,7 @@ suspend fun cashoutMonitor(
             listenHandle.postgresUnlisten()
         // Data not found, wait.
         else {
-            // OK to block, because the next event is going to
-            // be _this_ one.  The caller should however execute
-            // this whole logic in a thread other than the main
-            // HTTP server.
-            val isNotificationArrived = 
listenHandle.postgresGetNotifications(waitTimeout)
+            val isNotificationArrived = 
listenHandle.waitOnIODispatchers(dbEventTimeout)
             if (isNotificationArrived && listenHandle.receivedPayload == 
"CRDT")
                 newTxs = getUnsubmittedTransactions(watchedBankAccount)
         }
@@ -188,35 +307,41 @@ suspend fun cashoutMonitor(
             }
             // Hard-error, response did not even arrive.
             catch (e: Exception) {
+                logger.error("Cash-out monitor could not reach Nexus.  Pause 
and retry")
                 logger.error(e.message)
-                // mark as failed and proceed to the next one.
-                transaction {
-                    CashoutSubmissionEntity.new {
-                        this.localTransaction = it.id
-                        this.hasErrors = true
-                    }
-                    bankAccount.lastFiatSubmission = it
-                }
+                delay(2000)
                 return@forEach
             }
-            // Handle the non 2xx error case.  Here we try
-            // to store the response from Nexus.
+            // Server fault.  Pause and retry.
+            if (resp.status.value.toString().startsWith('5')) {
+                logger.error("Cash-out monitor POSTed to a failing Nexus.  
Pause and retry")
+                logger.error(resp.bodyAsText())
+                delay(2000L)
+            }
+            // Client fault, fail Sandbox.
+            if (resp.status.value.toString().startsWith('4')) {
+                logger.error("Cash-out monitor failed at POSTing to Nexus.  
Fail Sandbox")
+                logger.error("Nexus responded: ${resp.bodyAsText()}")
+                exitProcess(1)
+            }
+            // Expecting 200 OK.  What if 3xx?
             if (resp.status.value != HttpStatusCode.OK.value) {
-                val maybeResponseBody = resp.bodyAsText()
-                logger.error(
-                    "Fiat submission response was: $maybeResponseBody," +
-                            " status: ${resp.status.value}"
-                )
-                transaction {
-                    CashoutSubmissionEntity.new {
-                        localTransaction = it.id
-                        this.hasErrors = true
-                        if (maybeResponseBody.length > 0)
-                            this.maybeNexusResposnse = maybeResponseBody
+                logger.error("Cash-out monitor, unhandled response status: 
${resp.status.value}.  Fail Sandbox")
+                exitProcess(1)
+
+                // Previous versions use to store the faulty transaction
+                // and continue the execution.  The block below shows how
+                // to do that.
+
+                /*transaction {
+                  CashoutSubmissionEntity.new {
+                    localTransaction = it.id
+                    this.hasErrors = true
+                    if (maybeResponseBody.isNotEmpty())
+                      this.maybeNexusResposnse = maybeResponseBody
                     }
-                    bankAccount.lastFiatSubmission = it
-                }
-                return@forEach
+                  bankAccount.lastFiatSubmission = it
+                }*/
             }
             // Successful case, mark the wire transfer as submitted,
             // and advance the pointer to the last submitted payment.
@@ -231,7 +356,7 @@ suspend fun cashoutMonitor(
                     // unique identifier _as assigned by Nexus_.  Not
                     // currently used by Sandbox, but may help to resolve
                     // disputes.
-                    if (responseBody.length > 0)
+                    if (responseBody.isNotEmpty())
                         maybeNexusResposnse = responseBody
                 }
                 // Advancing the 'last submitted bookmark', to avoid
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
index c8a1df18..ca83d31a 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
@@ -515,6 +515,16 @@ object BankAccountsTable : IntIdTable() {
      * cash-out operation.
      */
     val lastFiatSubmission = reference("lastFiatSubmission", 
BankAccountTransactionsTable).nullable()
+
+    /**
+     * Tracks the last fiat payment that was read from Nexus.  This tracker
+     * gets updated ONLY IF the exchange gets successfully paid with the 
related
+     * amount in the regional currency.  NOTE: in case of disputes, the 
customer
+     * will provide the date of a problematic withdrawal, and the regional 
currency
+     * administrator should check into the "admin" (regional) outgoing history 
by
+     * using such date as filter.
+     */
+    val lastFiatFetch = text("lastFiatFetch").default("0")
 }
 
 class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) {
@@ -528,6 +538,7 @@ class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) {
     var demoBank by DemobankConfigEntity referencedOn 
BankAccountsTable.demoBank
     var lastTransaction by BankAccountTransactionEntity optionalReferencedOn 
BankAccountsTable.lastTransaction
     var lastFiatSubmission by BankAccountTransactionEntity 
optionalReferencedOn BankAccountsTable.lastFiatSubmission
+    var lastFiatFetch by BankAccountsTable.lastFiatFetch
 }
 
 object BankAccountStatementsTable : IntIdTable() {

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]