gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] 01/02: wallet-core: notification-based waiting for d


From: gnunet
Subject: [taler-wallet-core] 01/02: wallet-core: notification-based waiting for dependent transactions instead of long-polling
Date: Fri, 08 Mar 2024 18:23:18 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

commit 0a2540d676904e804544b95959bae223e42bc0c1
Author: Florian Dold <florian@dold.me>
AuthorDate: Fri Mar 8 10:30:23 2024 +0100

    wallet-core: notification-based waiting for dependent transactions instead 
of long-polling
---
 packages/taler-wallet-core/src/exchanges.ts        | 12 ++-
 packages/taler-wallet-core/src/pay-merchant.ts     |  2 +
 .../taler-wallet-core/src/pay-peer-pull-credit.ts  |  2 +
 .../taler-wallet-core/src/pay-peer-push-credit.ts  |  2 +
 .../taler-wallet-core/src/pay-peer-push-debit.ts   |  5 +-
 packages/taler-wallet-core/src/refresh.ts          | 77 ++++++++++++++++++-
 packages/taler-wallet-core/src/withdraw.ts         | 86 ++++++++++++++++++++--
 7 files changed, 177 insertions(+), 9 deletions(-)

diff --git a/packages/taler-wallet-core/src/exchanges.ts 
b/packages/taler-wallet-core/src/exchanges.ts
index 1fb3a8795..43ab8ac4e 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -999,6 +999,9 @@ async function internalWaitReadyExchange(
     exchangeBaseUrl: canonUrl,
   });
   while (true) {
+    if (wex.cancellationToken.isCancelled) {
+      throw Error("cancelled");
+    }
     logger.info(`waiting for ready exchange ${canonUrl}`);
     const { exchange, exchangeDetails, retryInfo, scopeInfo } =
       await wex.db.runReadOnlyTx(
@@ -1128,14 +1131,13 @@ export async function fetchFreshExchange(
     forceUpdate: options.forceUpdate,
   });
 
-  return waitReadyExchange(wex, canonUrl, options);
+  return await waitReadyExchange(wex, canonUrl, options);
 }
 
 async function waitReadyExchange(
   wex: WalletExecutionContext,
   canonUrl: string,
   options: {
-    cancellationToken?: CancellationToken;
     forceUpdate?: boolean;
     expectedMasterPub?: string;
   } = {},
@@ -1155,6 +1157,11 @@ async function waitReadyExchange(
     }
   });
 
+  const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+    cancelNotif();
+    exchangeNotifFlag.raise();
+  });
+
   try {
     const res = await internalWaitReadyExchange(
       wex,
@@ -1165,6 +1172,7 @@ async function waitReadyExchange(
     logger.info("done waiting for ready exchange");
     return res;
   } finally {
+    unregisterOnCancelled();
     cancelNotif();
   }
 }
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts 
b/packages/taler-wallet-core/src/pay-merchant.ts
index 4f9c20c9e..872e554c9 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -1458,6 +1458,7 @@ async function waitProposalDownloaded(
   wex: WalletExecutionContext,
   proposalId: string,
 ): Promise<void> {
+  // FIXME: This doesn't support cancellation yet
   const ctx = new PayMerchantTransactionContext(wex, proposalId);
 
   logger.info(`waiting for ${ctx.transactionId} to be downloaded`);
@@ -1711,6 +1712,7 @@ async function waitPaymentResult(
   proposalId: string,
   waitSessionId?: string,
 ): Promise<ConfirmPayResult> {
+  // FIXME: We don't support cancelletion yet!
   const ctx = new PayMerchantTransactionContext(wex, proposalId);
   wex.taskScheduler.startShepherdTask(ctx.taskId);
 
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts 
b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
index de30f66d2..96d8f65a6 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -84,6 +84,7 @@ import { WalletExecutionContext } from "./wallet.js";
 import {
   getExchangeWithdrawalInfo,
   internalCreateWithdrawalGroup,
+  waitWithdrawalFinal,
 } from "./withdraw.js";
 
 const logger = new Logger("pay-peer-pull-credit.ts");
@@ -584,6 +585,7 @@ async function handlePeerPullCreditWithdrawing(
   if (!pullIni.withdrawalGroupId) {
     throw Error("invalid db state (withdrawing, but no withdrawal group ID");
   }
+  await waitWithdrawalFinal(wex, pullIni.withdrawalGroupId);
   const transactionId = constructTransactionIdentifier({
     tag: TransactionType.PeerPullCredit,
     pursePub: pullIni.pursePub,
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts 
b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
index ecc1e827f..281b3ff61 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -88,6 +88,7 @@ import {
   getExchangeWithdrawalInfo,
   internalPerformCreateWithdrawalGroup,
   internalPrepareCreateWithdrawalGroup,
+  waitWithdrawalFinal,
 } from "./withdraw.js";
 
 const logger = new Logger("pay-peer-push-credit.ts");
@@ -789,6 +790,7 @@ async function handlePendingWithdrawing(
   if (!peerInc.withdrawalGroupId) {
     throw Error("invalid db state (withdrawing, but no withdrawal group ID");
   }
+  await waitWithdrawalFinal(wex, peerInc.withdrawalGroupId);
   const transactionId = constructTransactionIdentifier({
     tag: TransactionType.PeerPushCredit,
     peerPushCreditId: peerInc.peerPushCreditId,
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts 
b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index cf4e7b619..ab80888eb 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -72,7 +72,7 @@ import {
   getTotalPeerPaymentCost,
   queryCoinInfosForSelection,
 } from "./pay-peer-common.js";
-import { createRefreshGroup } from "./refresh.js";
+import { createRefreshGroup, waitRefreshFinal } from "./refresh.js";
 import {
   constructTransactionIdentifier,
   notifyTransition,
@@ -682,6 +682,9 @@ async function processPeerPushDebitAbortingRefreshDeleted(
     tag: TransactionType.PeerPushDebit,
     pursePub: peerPushInitiation.pursePub,
   });
+  if (peerPushInitiation.abortRefreshGroupId) {
+    await waitRefreshFinal(wex, peerPushInitiation.abortRefreshGroupId);
+  }
   const transitionInfo = await wex.db.runReadWriteTx(
     ["refreshGroups", "peerPushDebit"],
     async (tx) => {
diff --git a/packages/taler-wallet-core/src/refresh.ts 
b/packages/taler-wallet-core/src/refresh.ts
index c6a7b768d..7bf231870 100644
--- a/packages/taler-wallet-core/src/refresh.ts
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -21,6 +21,7 @@ import {
   Amounts,
   amountToPretty,
   assertUnreachable,
+  AsyncFlag,
   checkDbInvariant,
   codecForExchangeMeltResponse,
   codecForExchangeRevealResponse,
@@ -61,7 +62,6 @@ import {
   readSuccessResponseJsonOrThrow,
   readUnexpectedResponseDetails,
 } from "@gnu-taler/taler-util/http";
-import { selectWithdrawalDenominations } from "./denomSelection.js";
 import {
   constructTaskIdentifier,
   makeCoinAvailable,
@@ -92,6 +92,7 @@ import {
   WalletDbReadOnlyTransaction,
   WalletDbReadWriteTransaction,
 } from "./db.js";
+import { selectWithdrawalDenominations } from "./denomSelection.js";
 import { fetchFreshExchange } from "./exchanges.js";
 import {
   constructTransactionIdentifier,
@@ -1462,3 +1463,77 @@ export async function forceRefresh(
     refreshGroupId: res.refreshGroupId,
   };
 }
+
+/**
+ * Wait until a refresh operation is final.
+ */
+export async function waitRefreshFinal(
+  wex: WalletExecutionContext,
+  refreshGroupId: string,
+): Promise<void> {
+  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+  wex.taskScheduler.startShepherdTask(ctx.taskId);
+
+  // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+  const refreshNotifFlag = new AsyncFlag();
+  // Raise purchaseNotifFlag whenever we get a notification
+  // about our refresh.
+  const cancelNotif = wex.ws.addNotificationListener((notif) => {
+    if (
+      notif.type === NotificationType.TransactionStateTransition &&
+      notif.transactionId === ctx.transactionId
+    ) {
+      refreshNotifFlag.raise();
+    }
+  });
+  const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+    cancelNotif();
+    refreshNotifFlag.raise();
+  });
+
+  try {
+    await internalWaitRefreshFinal(ctx, refreshNotifFlag);
+  } catch (e) {
+    unregisterOnCancelled();
+    cancelNotif();
+  }
+}
+
+async function internalWaitRefreshFinal(
+  ctx: RefreshTransactionContext,
+  flag: AsyncFlag,
+): Promise<void> {
+  while (true) {
+    if (ctx.wex.cancellationToken.isCancelled) {
+      throw Error("cancelled");
+    }
+
+    // Check if refresh is final
+    const res = await ctx.wex.db.runReadOnlyTx(
+      ["refreshGroups", "operationRetries"],
+      async (tx) => {
+        return {
+          rg: await tx.refreshGroups.get(ctx.refreshGroupId),
+        };
+      },
+    );
+    const { rg } = res;
+    if (!rg) {
+      // Must've been deleted, we consider that final.
+      return;
+    }
+    switch (rg.operationStatus) {
+      case RefreshOperationStatus.Failed:
+      case RefreshOperationStatus.Finished:
+        // Transaction is final
+        return;
+      case RefreshOperationStatus.Pending:
+      case RefreshOperationStatus.Suspended:
+        break;
+    }
+
+    // Wait for the next transition
+    await flag.wait();
+    flag.reset();
+  }
+}
diff --git a/packages/taler-wallet-core/src/withdraw.ts 
b/packages/taler-wallet-core/src/withdraw.ts
index 0f70479a5..f27e9e132 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -94,10 +94,6 @@ import {
   readSuccessResponseJsonOrThrow,
   throwUnexpectedRequestError,
 } from "@gnu-taler/taler-util/http";
-import {
-  selectForcedWithdrawalDenominations,
-  selectWithdrawalDenominations,
-} from "./denomSelection.js";
 import {
   PendingTaskType,
   TaskIdStr,
@@ -127,6 +123,10 @@ import {
   WithdrawalRecordType,
   timestampPreciseToDb,
 } from "./db.js";
+import {
+  selectForcedWithdrawalDenominations,
+  selectWithdrawalDenominations,
+} from "./denomSelection.js";
 import { isWithdrawableDenom } from "./denominations.js";
 import {
   ReadyExchangeSummary,
@@ -1935,7 +1935,8 @@ export async function getWithdrawalDetailsForUri(
             uri: talerWithdrawUri,
           });
         }
-      }).finally(() => {
+      })
+      .finally(() => {
         ongoingChecks[talerWithdrawUri] = false;
       });
   }
@@ -2703,6 +2704,7 @@ async function waitWithdrawalRegistered(
   wex: WalletExecutionContext,
   ctx: WithdrawTransactionContext,
 ): Promise<void> {
+  // FIXME: Doesn't support cancellation yet
   // FIXME: We should use Symbol.dispose magic here for cleanup!
 
   const withdrawalNotifFlag = new AsyncFlag();
@@ -2914,3 +2916,77 @@ export async function createManualWithdrawal(
     transactionId: ctx.transactionId,
   };
 }
+
+/**
+ * Wait until a refresh operation is final.
+ */
+export async function waitWithdrawalFinal(
+  wex: WalletExecutionContext,
+  withdrawalGroupId: string,
+): Promise<void> {
+  const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId);
+  wex.taskScheduler.startShepherdTask(ctx.taskId);
+
+  // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+  const withdrawalNotifFlag = new AsyncFlag();
+  // Raise purchaseNotifFlag whenever we get a notification
+  // about our refresh.
+  const cancelNotif = wex.ws.addNotificationListener((notif) => {
+    if (
+      notif.type === NotificationType.TransactionStateTransition &&
+      notif.transactionId === ctx.transactionId
+    ) {
+      withdrawalNotifFlag.raise();
+    }
+  });
+  const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+    cancelNotif();
+    withdrawalNotifFlag.raise();
+  });
+
+  try {
+    await internalWaitWithdrawalFinal(ctx, withdrawalNotifFlag);
+  } catch (e) {
+    unregisterOnCancelled();
+    cancelNotif();
+  }
+}
+
+async function internalWaitWithdrawalFinal(
+  ctx: WithdrawTransactionContext,
+  flag: AsyncFlag,
+): Promise<void> {
+  while (true) {
+    if (ctx.wex.cancellationToken.isCancelled) {
+      throw Error("cancelled");
+    }
+
+    // Check if refresh is final
+    const res = await ctx.wex.db.runReadOnlyTx(
+      ["withdrawalGroups", "operationRetries"],
+      async (tx) => {
+        return {
+          wg: await tx.withdrawalGroups.get(ctx.withdrawalGroupId),
+        };
+      },
+    );
+    const { wg } = res;
+    if (!wg) {
+      // Must've been deleted, we consider that final.
+      return;
+    }
+    switch (wg.status) {
+      case WithdrawalGroupStatus.AbortedBank:
+      case WithdrawalGroupStatus.AbortedExchange:
+      case WithdrawalGroupStatus.Done:
+      case WithdrawalGroupStatus.FailedAbortingBank:
+      case WithdrawalGroupStatus.FailedBankAborted:
+        // Transaction is final
+        return;
+    }
+
+    // Wait for the next transition
+    await flag.wait();
+    flag.reset();
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]