gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] 03/03: wallet-core: simplify shepherd, handle result


From: gnunet
Subject: [taler-wallet-core] 03/03: wallet-core: simplify shepherd, handle results of cancelled tasks properly
Date: Tue, 05 Mar 2024 01:28:37 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

commit 4ffb4a94e8279896a11d65b66d71beb66ed6d009
Author: Florian Dold <florian@dold.me>
AuthorDate: Tue Mar 5 01:28:30 2024 +0100

    wallet-core: simplify shepherd, handle results of cancelled tasks properly
---
 .../src/integrationtests/test-payment-share.ts     |  36 +++--
 packages/taler-wallet-core/src/pay-merchant.ts     |  38 ++++-
 packages/taler-wallet-core/src/shepherd.ts         | 171 ++++++++-------------
 3 files changed, 123 insertions(+), 122 deletions(-)

diff --git a/packages/taler-harness/src/integrationtests/test-payment-share.ts 
b/packages/taler-harness/src/integrationtests/test-payment-share.ts
index ec7391586..4a7f853da 100644
--- a/packages/taler-harness/src/integrationtests/test-payment-share.ts
+++ b/packages/taler-harness/src/integrationtests/test-payment-share.ts
@@ -100,10 +100,11 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
   t.logStep("orders-created");
 
   /**
-   * FIRST CASE, create in first wallet and pay in the second wallet
-   * first wallet should not be able to continue
+   * Case 1:
+   * - Claim with first wallet and pay in the second wallet.
+   * - First wallet should be notified.
    */
-  {
+  if (0) {
     const order = await createOrder("TESTKUDOS:5");
     // Claim the order with the first wallet
     const claimFirstWallet = await firstWallet.call(
@@ -168,10 +169,10 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
     }
 
     t.logStep("wait-for-payment");
-    firstWallet.waitForNotificationCond(n =>
-      n.type === NotificationType.TransactionStateTransition &&
-      n.transactionId === claimFirstWallet.transactionId
-    )
+    // firstWallet.waitForNotificationCond(n =>
+    //   n.type === NotificationType.TransactionStateTransition &&
+    //   n.transactionId === claimFirstWallet.transactionId
+    // )
     // Claim the order with the first wallet
     const claimFirstWalletAgain = await firstWallet.call(
       WalletApiOperation.PreparePayForUri,
@@ -210,8 +211,9 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
   t.logStep("first-case-done");
 
   /**
-   * SECOND CASE, create in first wallet and share to the second wallet
-   * pay with the first wallet, second wallet should not be able to continue
+   * Case 2:
+   * - Claim with first wallet and share with the second wallet
+   * - Pay with the first wallet, second wallet should be notified
    */
   {
     const order = await createOrder("TESTKUDOS:3");
@@ -225,6 +227,8 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       claimFirstWallet.status === PreparePayResultType.PaymentPossible,
     );
 
+    t.logStep("case2-w1-claimed");
+
     // share order from the first wallet
     const { privatePayUri } = await firstWallet.call(
       WalletApiOperation.SharePayment,
@@ -234,17 +238,21 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       },
     );
 
+    t.logStep("case2-w1-shared");
+
     // claim from the second wallet
     const claimSecondWallet = await secondWallet.call(
       WalletApiOperation.PreparePayForUri,
       { talerPayUri: privatePayUri },
     );
 
+    t.logStep("case2-w2-prepared");
+
     t.assertTrue(
       claimSecondWallet.status === PreparePayResultType.PaymentPossible,
     );
 
-    // pay from the second wallet
+    // pay from the first wallet
     const r2 = await firstWallet.call(WalletApiOperation.ConfirmPay, {
       transactionId: claimFirstWallet.transactionId,
     });
@@ -263,10 +271,10 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
     t.assertAmountEquals(bal2.balances[0].available, "TESTKUDOS:14.23");
 
     t.logStep("wait-for-payment");
-    secondWallet.waitForNotificationCond(n =>
-      n.type === NotificationType.TransactionStateTransition &&
-      n.transactionId === claimSecondWallet.transactionId
-    )
+    // secondWallet.waitForNotificationCond(n =>
+    //   n.type === NotificationType.TransactionStateTransition &&
+    //   n.transactionId === claimSecondWallet.transactionId
+    // )
 
     // Claim the order with the first wallet
     const claimSecondWalletAgain = await secondWallet.call(
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts 
b/packages/taler-wallet-core/src/pay-merchant.ts
index 8eff7e17b..be3f7f106 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -880,7 +880,11 @@ async function createOrReusePurchase(
     );
     if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) {
       const download = await expectProposalDownload(wex, oldProposal);
-      const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, 
false);
+      const paid = await checkIfOrderIsAlreadyPaid(
+        wex,
+        download.contractData,
+        false,
+      );
       logger.info(`old proposal paid: ${paid}`);
       if (paid) {
         // if this transaction was shared and the order is paid then it
@@ -1912,6 +1916,11 @@ export async function confirmPay(
     hintTransactionId: transactionId,
   });
 
+  const ctx = new PayMerchantTransactionContext(wex, proposalId);
+
+  // In case we're sharing the payment and we're long-polling
+  wex.taskScheduler.stopShepherdTask(ctx.taskId);
+
   // Wait until we have completed the first attempt to pay.
   return waitPaymentResult(wex, proposalId);
 }
@@ -2011,7 +2020,11 @@ async function processPurchasePay(
   const download = await expectProposalDownload(wex, purchase);
 
   if (purchase.shared) {
-    const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, 
false);
+    const paid = await checkIfOrderIsAlreadyPaid(
+      wex,
+      download.contractData,
+      false,
+    );
 
     if (paid) {
       const transitionInfo = await wex.db.runReadWriteTx(
@@ -2463,17 +2476,24 @@ export async function sharePayment(
       // FIXME: purchase can be shared before being paid
       return undefined;
     }
+    const oldTxState = computePayMerchantTransactionState(p);
     if (p.purchaseStatus === PurchaseStatus.DialogProposed) {
       p.purchaseStatus = PurchaseStatus.DialogShared;
       p.shared = true;
       tx.purchases.put(p);
     }
 
+    const newTxState = computePayMerchantTransactionState(p);
+
     return {
       proposalId: p.proposalId,
       nonce: p.noncePriv,
       session: p.lastSessionId ?? p.downloadSessionId,
       token: p.claimToken,
+      transitionInfo: {
+        oldTxState,
+        newTxState,
+      },
     };
   });
 
@@ -2481,8 +2501,11 @@ export async function sharePayment(
     throw Error("This purchase can't be shared");
   }
 
-  // schedule a task to watch for the status
   const ctx = new PayMerchantTransactionContext(wex, result.proposalId);
+
+  notifyTransition(wex, ctx.transactionId, result.transitionInfo);
+
+  // schedule a task to watch for the status
   wex.taskScheduler.startShepherdTask(ctx.taskId);
 
   const privatePayUri = stringifyPayUri({
@@ -2514,6 +2537,7 @@ async function checkIfOrderIsAlreadyPaid(
   const resp = await wex.http.fetch(requestUrl.href, {
     cancellationToken: wex.cancellationToken,
   });
+
   if (
     resp.status === HttpStatusCode.Ok ||
     resp.status === HttpStatusCode.Accepted ||
@@ -2539,7 +2563,11 @@ async function processPurchaseDialogShared(
     return TaskRunResult.finished();
   }
 
-  const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, 
true);
+  const paid = await checkIfOrderIsAlreadyPaid(
+    wex,
+    download.contractData,
+    true,
+  );
   if (paid) {
     const transitionInfo = await wex.db.runReadWriteTx(
       ["purchases"],
@@ -2551,7 +2579,7 @@ async function processPurchaseDialogShared(
         }
         const oldTxState = computePayMerchantTransactionState(p);
         p.purchaseStatus = PurchaseStatus.FailedClaim;
-        p.paidByOther = true
+        p.paidByOther = true;
         const newTxState = computePayMerchantTransactionState(p);
         await tx.purchases.put(p);
         return { oldTxState, newTxState };
diff --git a/packages/taler-wallet-core/src/shepherd.ts 
b/packages/taler-wallet-core/src/shepherd.ts
index db090c352..0544288ba 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -28,7 +28,6 @@ import {
   ObservabilityContext,
   ObservabilityEventType,
   RetryLoopOpts,
-  TalerError,
   TalerErrorCode,
   TalerErrorDetail,
   TaskThrottler,
@@ -37,6 +36,7 @@ import {
   TransactionType,
   WalletNotification,
   assertUnreachable,
+  getErrorDetailFromException,
   j2s,
   makeErrorDetail,
 } from "@gnu-taler/taler-util";
@@ -55,6 +55,7 @@ import { CryptoApiStoppedError } from 
"./crypto/workers/crypto-dispatcher.js";
 import {
   OPERATION_STATUS_ACTIVE_FIRST,
   OPERATION_STATUS_ACTIVE_LAST,
+  OperationRetryRecord,
   WalletDbAllStoresReadOnlyTransaction,
   WalletDbReadOnlyTransaction,
   timestampAbsoluteFromDb,
@@ -173,7 +174,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
   }
 
   getActiveTasks(): TaskIdStr[] {
-    return [...this.sheps.keys()]
+    return [...this.sheps.keys()];
   }
 
   ensureRunning(): void {
@@ -343,15 +344,21 @@ export class TaskSchedulerImpl implements TaskScheduler {
       const startTime = AbsoluteTime.now();
       logger.trace(`Shepherd for ${taskId} will call handler`);
       // FIXME: This should already return the retry record.
-      const res = await runTaskWithErrorReporting(this.ws, taskId, async () => 
{
-        return await callOperationHandlerForTaskId(wex, taskId);
-      });
-      const retryRecord = await this.ws.db.runReadOnlyTx(
-        ["operationRetries"],
-        async (tx) => {
-          return tx.operationRetries.get(taskId);
+      const res = await runTaskWithErrorReporting(
+        this.ws,
+        taskId,
+        info,
+        async () => {
+          return await callOperationHandlerForTaskId(wex, taskId);
         },
       );
+      if (info.cts.token.isCancelled) {
+        logger.info("task cancelled, not processing result");
+        return;
+      }
+      if (this.ws.stopped) {
+        logger.info("wallet stopped, not processing result");
+      }
       wex.oc.observe({
         type: ObservabilityEventType.ShepherdTaskResult,
         resultType: res.type,
@@ -359,46 +366,48 @@ export class TaskSchedulerImpl implements TaskScheduler {
       switch (res.type) {
         case TaskRunResultType.Error: {
           logger.trace(`Shepherd for ${taskId} got error result.`);
-          if (retryRecord) {
-            let delay: Duration;
-            const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
-            delay = AbsoluteTime.remaining(t);
-            logger.trace(`Waiting for ${delay.d_ms} ms`);
-            await this.wait(taskId, info, delay);
-          } else {
-            logger.trace("Retrying immediately.");
-          }
+          const retryRecord = await storePendingTaskError(
+            this.ws,
+            taskId,
+            res.errorDetail,
+          );
+          let delay: Duration;
+          const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+          delay = AbsoluteTime.remaining(t);
+          logger.trace(`Waiting for ${delay.d_ms} ms`);
+          await this.wait(taskId, info, delay);
           break;
         }
         case TaskRunResultType.Backoff: {
           logger.trace(`Shepherd for ${taskId} got backoff result.`);
-          if (retryRecord) {
-            let delay: Duration;
-            const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
-            delay = AbsoluteTime.remaining(t);
-            logger.trace(`Waiting for ${delay.d_ms} ms`);
-            await this.wait(taskId, info, delay);
-          } else {
-            logger.trace("Retrying immediately.");
-          }
+          const retryRecord = await storePendingTaskPending(this.ws, taskId);
+          let delay: Duration;
+          const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+          delay = AbsoluteTime.remaining(t);
+          logger.trace(`Waiting for ${delay.d_ms} ms`);
+          await this.wait(taskId, info, delay);
           break;
         }
         case TaskRunResultType.Progress: {
           logger.trace(
             `Shepherd for ${taskId} got progress result, re-running 
immediately.`,
           );
+          await storeTaskProgress(this.ws, taskId);
           break;
         }
         case TaskRunResultType.ScheduleLater:
           logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+          await storeTaskProgress(this.ws, taskId);
           const delay = AbsoluteTime.remaining(res.runAt);
           logger.trace(`Waiting for ${delay.d_ms} ms`);
           await this.wait(taskId, info, delay);
           break;
         case TaskRunResultType.Finished:
           logger.trace(`Shepherd for ${taskId} got finished result.`);
+          await storePendingTaskFinished(this.ws, taskId);
           return;
         case TaskRunResultType.LongpollReturnedPending: {
+          await storeTaskProgress(this.ws, taskId);
           // Make sure that we are waiting a bit if long-polling returned too 
early.
           const endTime = AbsoluteTime.now();
           const taskDuration = AbsoluteTime.difference(endTime, startTime);
@@ -425,9 +434,9 @@ async function storePendingTaskError(
   ws: InternalWalletState,
   pendingTaskId: string,
   e: TalerErrorDetail,
-): Promise<void> {
+): Promise<OperationRetryRecord> {
   logger.info(`storing pending task error for ${pendingTaskId}`);
-  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+  const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
     let retryRecord = await tx.operationRetries.get(pendingTaskId);
     if (!retryRecord) {
       retryRecord = {
@@ -440,11 +449,15 @@ async function storePendingTaskError(
       retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
     }
     await tx.operationRetries.put(retryRecord);
-    return taskToRetryNotification(ws, tx, pendingTaskId, e);
+    return {
+      notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
+      retryRecord,
+    };
   });
-  if (maybeNotification) {
-    ws.notify(maybeNotification);
+  if (res?.notification) {
+    ws.notify(res.notification);
   }
+  return res.retryRecord;
 }
 
 /**
@@ -462,8 +475,8 @@ async function storeTaskProgress(
 async function storePendingTaskPending(
   ws: InternalWalletState,
   pendingTaskId: string,
-): Promise<void> {
-  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+): Promise<OperationRetryRecord> {
+  const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
     let retryRecord = await tx.operationRetries.get(pendingTaskId);
     let hadError = false;
     if (!retryRecord) {
@@ -479,15 +492,24 @@ async function storePendingTaskPending(
       retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
     }
     await tx.operationRetries.put(retryRecord);
+    let notification: WalletNotification | undefined = undefined;
     if (hadError) {
-      return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
-    } else {
-      return undefined;
+      notification = await taskToRetryNotification(
+        ws,
+        tx,
+        pendingTaskId,
+        undefined,
+      );
     }
+    return {
+      notification,
+      retryRecord,
+    };
   });
-  if (maybeNotification) {
-    ws.notify(maybeNotification);
+  if (res.notification) {
+    ws.notify(res.notification);
   }
+  return res.retryRecord;
 }
 
 async function storePendingTaskFinished(
@@ -502,33 +524,11 @@ async function storePendingTaskFinished(
 async function runTaskWithErrorReporting(
   ws: InternalWalletState,
   opId: TaskIdStr,
+  info: ShepherdInfo,
   f: () => Promise<TaskRunResult>,
 ): Promise<TaskRunResult> {
-  let maybeError: TalerErrorDetail | undefined;
   try {
-    const resp = await f();
-    switch (resp.type) {
-      case TaskRunResultType.Error:
-        await storePendingTaskError(ws, opId, resp.errorDetail);
-        return resp;
-      case TaskRunResultType.Finished:
-        await storePendingTaskFinished(ws, opId);
-        return resp;
-      case TaskRunResultType.Backoff:
-        await storePendingTaskPending(ws, opId);
-        return resp;
-      case TaskRunResultType.ScheduleLater:
-        // Task succeeded but wants to be run again.
-        await storeTaskProgress(ws, opId);
-        return resp;
-      case TaskRunResultType.Progress:
-        await storeTaskProgress(ws, opId);
-        return resp;
-      case TaskRunResultType.LongpollReturnedPending:
-        // Longpoll should be run again immediately.
-        await storeTaskProgress(ws, opId);
-        return resp;
-    }
+    return await f();
   } catch (e) {
     if (e instanceof CryptoApiStoppedError) {
       if (ws.stopped) {
@@ -543,46 +543,11 @@ async function runTaskWithErrorReporting(
         };
       }
     }
-    if (e instanceof TalerError) {
-      logger.warn("operation processed resulted in error");
-      logger.warn(`error was: ${j2s(e.errorDetail)}`);
-      maybeError = e.errorDetail;
-      await storePendingTaskError(ws, opId, maybeError!);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: e.errorDetail,
-      };
-    } else if (e instanceof Error) {
-      // This is a bug, as we expect pending operations to always
-      // do their own error handling and only throw 
WALLET_PENDING_OPERATION_FAILED
-      // or return something.
-      logger.error(`Uncaught exception: ${e.message}`);
-      logger.error(`Stack: ${e.stack}`);
-      maybeError = makeErrorDetail(
-        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-        {
-          stack: e.stack,
-        },
-        `unexpected exception (message: ${e.message})`,
-      );
-      await storePendingTaskError(ws, opId, maybeError);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: maybeError,
-      };
-    } else {
-      logger.error("Uncaught exception, value is not even an error.");
-      maybeError = makeErrorDetail(
-        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-        {},
-        `unexpected exception (not even an error)`,
-      );
-      await storePendingTaskError(ws, opId, maybeError);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: maybeError,
-      };
-    }
+    const errorDetail = getErrorDetailFromException(e);
+    return {
+      type: TaskRunResultType.Error,
+      errorDetail,
+    };
   }
 }
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]