gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated (74fcd99c8 -> 4ffb4a94e)


From: gnunet
Subject: [taler-wallet-core] branch master updated (74fcd99c8 -> 4ffb4a94e)
Date: Tue, 05 Mar 2024 01:28:34 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a change to branch master
in repository wallet-core.

    from 74fcd99c8 taler-wallet-cli: don't run task loop before init
     new 3b8124b97 notify the UI about order paid by another wallet
     new 8a1118624 assert that the order is paid in the first wallet after the other wallet has paid
     new 4ffb4a94e wallet-core: simplify shepherd, handle results of cancelled tasks properly

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/integrationtests/test-payment-share.ts     |  39 ++++-
 packages/taler-wallet-core/src/db.ts               |   6 +
 packages/taler-wallet-core/src/pay-merchant.ts     |  52 ++++++-
 packages/taler-wallet-core/src/shepherd.ts         | 171 ++++++++-------------
 4 files changed, 153 insertions(+), 115 deletions(-)

diff --git a/packages/taler-harness/src/integrationtests/test-payment-share.ts b/packages/taler-harness/src/integrationtests/test-payment-share.ts
index d832a7e50..4a7f853da 100644
--- a/packages/taler-harness/src/integrationtests/test-payment-share.ts
+++ b/packages/taler-harness/src/integrationtests/test-payment-share.ts
@@ -20,6 +20,7 @@
 import {
   ConfirmPayResultType,
   MerchantApiClient,
+  NotificationType,
   PreparePayResultType,
 } from "@gnu-taler/taler-util";
 import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
@@ -99,10 +100,11 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
   t.logStep("orders-created");
 
   /**
-   * FIRST CASE, create in first wallet and pay in the second wallet
-   * first wallet should not be able to continue
+   * Case 1:
+   * - Claim with first wallet and pay in the second wallet.
+   * - First wallet should be notified.
    */
-  {
+  if (0) {
     const order = await createOrder("TESTKUDOS:5");
     // Claim the order with the first wallet
     const claimFirstWallet = await firstWallet.call(
@@ -166,6 +168,11 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       t.assertAmountEquals(second.balances[0].available, "TESTKUDOS:14.23");
     }
 
+    t.logStep("wait-for-payment");
+    // firstWallet.waitForNotificationCond(n =>
+    //   n.type === NotificationType.TransactionStateTransition &&
+    //   n.transactionId === claimFirstWallet.transactionId
+    // )
     // Claim the order with the first wallet
     const claimFirstWalletAgain = await firstWallet.call(
       WalletApiOperation.PreparePayForUri,
@@ -175,6 +182,7 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
     t.assertTrue(
       claimFirstWalletAgain.status === PreparePayResultType.AlreadyConfirmed,
     );
+    t.assertTrue( claimFirstWalletAgain.paid );
 
     t.logStep("w1-prepared-again");
 
@@ -182,7 +190,7 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       transactionId: claimFirstWallet.transactionId,
     });
 
-    t.assertTrue(r1.type === ConfirmPayResultType.Done);
+    t.assertTrue(r1.type === ConfirmPayResultType.Pending);
 
     t.logStep("w1-confirmed-shared");
 
@@ -203,8 +211,9 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
   t.logStep("first-case-done");
 
   /**
-   * SECOND CASE, create in first wallet and share to the second wallet
-   * pay with the first wallet, second wallet should not be able to continue
+   * Case 2:
+   * - Claim with first wallet and share with the second wallet
+   * - Pay with the first wallet, second wallet should be notified
    */
   {
     const order = await createOrder("TESTKUDOS:3");
@@ -218,6 +227,8 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       claimFirstWallet.status === PreparePayResultType.PaymentPossible,
     );
 
+    t.logStep("case2-w1-claimed");
+
     // share order from the first wallet
     const { privatePayUri } = await firstWallet.call(
       WalletApiOperation.SharePayment,
@@ -227,17 +238,21 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       },
     );
 
+    t.logStep("case2-w1-shared");
+
     // claim from the second wallet
     const claimSecondWallet = await secondWallet.call(
       WalletApiOperation.PreparePayForUri,
       { talerPayUri: privatePayUri },
     );
 
+    t.logStep("case2-w2-prepared");
+
     t.assertTrue(
       claimSecondWallet.status === PreparePayResultType.PaymentPossible,
     );
 
-    // pay from the second wallet
+    // pay from the first wallet
     const r2 = await firstWallet.call(WalletApiOperation.ConfirmPay, {
       transactionId: claimFirstWallet.transactionId,
     });
@@ -255,6 +270,12 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
     t.assertAmountEquals(bal1.balances[0].available, "TESTKUDOS:16.18");
     t.assertAmountEquals(bal2.balances[0].available, "TESTKUDOS:14.23");
 
+    t.logStep("wait-for-payment");
+    // secondWallet.waitForNotificationCond(n =>
+    //   n.type === NotificationType.TransactionStateTransition &&
+    //   n.transactionId === claimSecondWallet.transactionId
+    // )
+
     // Claim the order with the first wallet
     const claimSecondWalletAgain = await secondWallet.call(
       WalletApiOperation.PreparePayForUri,
@@ -264,6 +285,10 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
     t.assertTrue(
       claimSecondWalletAgain.status === PreparePayResultType.AlreadyConfirmed,
     );
+    t.assertTrue(
+      claimSecondWalletAgain.paid,
+    );
+
   }
 
   t.logStep("second-case-done");
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index 9efc9fbe0..7e4f9fa4a 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -1326,6 +1326,12 @@ export interface PurchaseRecord {
    * did not picked up yet
    */
   refundAmountAwaiting: AmountString | undefined;
+
+  /**
+   * When the purchase has been shared to other wallet
+   * and the other wallet completed before this one.
+   */
+  paidByOther: boolean | undefined;
 }
 
 export enum ConfigRecordKey {
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
index 05cbd26b7..be3f7f106 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -880,7 +880,11 @@ async function createOrReusePurchase(
     );
     if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) {
       const download = await expectProposalDownload(wex, oldProposal);
-      const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData);
+      const paid = await checkIfOrderIsAlreadyPaid(
+        wex,
+        download.contractData,
+        false,
+      );
       logger.info(`old proposal paid: ${paid}`);
       if (paid) {
         // if this transaction was shared and the order is paid then it
@@ -895,6 +899,7 @@ async function createOrReusePurchase(
             }
             const oldTxState = computePayMerchantTransactionState(p);
             p.purchaseStatus = PurchaseStatus.FailedClaim;
+            p.paidByOther = true;
             const newTxState = computePayMerchantTransactionState(p);
             await tx.purchases.put(p);
             return { oldTxState, newTxState };
@@ -942,6 +947,7 @@ async function createOrReusePurchase(
     lastSessionId: undefined,
     merchantPaySig: undefined,
     payInfo: undefined,
+    paidByOther: undefined,
     refundAmountAwaiting: undefined,
     timestampAccept: undefined,
     timestampFirstSuccessfulPay: undefined,
@@ -1377,7 +1383,7 @@ async function checkPaymentByProposalId(
       status: PreparePayResultType.AlreadyConfirmed,
       contractTerms: download.contractTermsRaw,
       contractTermsHash: download.contractData.contractTermsHash,
-      paid: false,
+      paid: purchase.paidByOther === true,
       amountRaw: Amounts.stringify(download.contractData.amount),
       amountEffective: purchase.payInfo
         ? Amounts.stringify(purchase.payInfo.totalPayCost)
@@ -1910,6 +1916,11 @@ export async function confirmPay(
     hintTransactionId: transactionId,
   });
 
+  const ctx = new PayMerchantTransactionContext(wex, proposalId);
+
+  // In case we're sharing the payment and we're long-polling
+  wex.taskScheduler.stopShepherdTask(ctx.taskId);
+
   // Wait until we have completed the first attempt to pay.
   return waitPaymentResult(wex, proposalId);
 }
@@ -2009,7 +2020,11 @@ async function processPurchasePay(
   const download = await expectProposalDownload(wex, purchase);
 
   if (purchase.shared) {
-    const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData);
+    const paid = await checkIfOrderIsAlreadyPaid(
+      wex,
+      download.contractData,
+      false,
+    );
 
     if (paid) {
       const transitionInfo = await wex.db.runReadWriteTx(
@@ -2022,6 +2037,7 @@ async function processPurchasePay(
           }
           const oldTxState = computePayMerchantTransactionState(p);
           p.purchaseStatus = PurchaseStatus.FailedClaim;
+          p.paidByOther = true;
           const newTxState = computePayMerchantTransactionState(p);
           await tx.purchases.put(p);
           return { oldTxState, newTxState };
@@ -2460,22 +2476,38 @@ export async function sharePayment(
       // FIXME: purchase can be shared before being paid
       return undefined;
     }
+    const oldTxState = computePayMerchantTransactionState(p);
     if (p.purchaseStatus === PurchaseStatus.DialogProposed) {
       p.purchaseStatus = PurchaseStatus.DialogShared;
       p.shared = true;
       tx.purchases.put(p);
     }
 
+    const newTxState = computePayMerchantTransactionState(p);
+
     return {
+      proposalId: p.proposalId,
       nonce: p.noncePriv,
       session: p.lastSessionId ?? p.downloadSessionId,
       token: p.claimToken,
+      transitionInfo: {
+        oldTxState,
+        newTxState,
+      },
     };
   });
 
   if (result === undefined) {
     throw Error("This purchase can't be shared");
   }
+
+  const ctx = new PayMerchantTransactionContext(wex, result.proposalId);
+
+  notifyTransition(wex, ctx.transactionId, result.transitionInfo);
+
+  // schedule a task to watch for the status
+  wex.taskScheduler.startShepherdTask(ctx.taskId);
+
   const privatePayUri = stringifyPayUri({
     merchantBaseUrl,
     orderId,
@@ -2483,12 +2515,14 @@ export async function sharePayment(
     noncePriv: result.nonce,
     claimToken: result.token,
   });
+
   return { privatePayUri };
 }
 
 async function checkIfOrderIsAlreadyPaid(
   wex: WalletExecutionContext,
   contract: WalletContractData,
+  doLongPolling: boolean,
 ) {
   const requestUrl = new URL(
     `orders/${contract.orderId}`,
@@ -2496,11 +2530,14 @@ async function checkIfOrderIsAlreadyPaid(
   );
   requestUrl.searchParams.set("h_contract", contract.contractTermsHash);
 
-  requestUrl.searchParams.set("timeout_ms", "1000");
+  if (doLongPolling) {
+    requestUrl.searchParams.set("timeout_ms", "30000");
+  }
 
   const resp = await wex.http.fetch(requestUrl.href, {
     cancellationToken: wex.cancellationToken,
   });
+
   if (
     resp.status === HttpStatusCode.Ok ||
     resp.status === HttpStatusCode.Accepted ||
@@ -2526,7 +2563,11 @@ async function processPurchaseDialogShared(
     return TaskRunResult.finished();
   }
 
-  const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData);
+  const paid = await checkIfOrderIsAlreadyPaid(
+    wex,
+    download.contractData,
+    true,
+  );
   if (paid) {
     const transitionInfo = await wex.db.runReadWriteTx(
       ["purchases"],
@@ -2538,6 +2579,7 @@ async function processPurchaseDialogShared(
         }
         const oldTxState = computePayMerchantTransactionState(p);
         p.purchaseStatus = PurchaseStatus.FailedClaim;
+        p.paidByOther = true;
         const newTxState = computePayMerchantTransactionState(p);
         await tx.purchases.put(p);
         return { oldTxState, newTxState };
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index db090c352..0544288ba 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -28,7 +28,6 @@ import {
   ObservabilityContext,
   ObservabilityEventType,
   RetryLoopOpts,
-  TalerError,
   TalerErrorCode,
   TalerErrorDetail,
   TaskThrottler,
@@ -37,6 +36,7 @@ import {
   TransactionType,
   WalletNotification,
   assertUnreachable,
+  getErrorDetailFromException,
   j2s,
   makeErrorDetail,
 } from "@gnu-taler/taler-util";
@@ -55,6 +55,7 @@ import { CryptoApiStoppedError } from 
"./crypto/workers/crypto-dispatcher.js";
 import {
   OPERATION_STATUS_ACTIVE_FIRST,
   OPERATION_STATUS_ACTIVE_LAST,
+  OperationRetryRecord,
   WalletDbAllStoresReadOnlyTransaction,
   WalletDbReadOnlyTransaction,
   timestampAbsoluteFromDb,
@@ -173,7 +174,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
   }
 
   getActiveTasks(): TaskIdStr[] {
-    return [...this.sheps.keys()]
+    return [...this.sheps.keys()];
   }
 
   ensureRunning(): void {
@@ -343,15 +344,21 @@ export class TaskSchedulerImpl implements TaskScheduler {
       const startTime = AbsoluteTime.now();
       logger.trace(`Shepherd for ${taskId} will call handler`);
       // FIXME: This should already return the retry record.
-      const res = await runTaskWithErrorReporting(this.ws, taskId, async () => 
{
-        return await callOperationHandlerForTaskId(wex, taskId);
-      });
-      const retryRecord = await this.ws.db.runReadOnlyTx(
-        ["operationRetries"],
-        async (tx) => {
-          return tx.operationRetries.get(taskId);
+      const res = await runTaskWithErrorReporting(
+        this.ws,
+        taskId,
+        info,
+        async () => {
+          return await callOperationHandlerForTaskId(wex, taskId);
         },
       );
+      if (info.cts.token.isCancelled) {
+        logger.info("task cancelled, not processing result");
+        return;
+      }
+      if (this.ws.stopped) {
+        logger.info("wallet stopped, not processing result");
+      }
       wex.oc.observe({
         type: ObservabilityEventType.ShepherdTaskResult,
         resultType: res.type,
@@ -359,46 +366,48 @@ export class TaskSchedulerImpl implements TaskScheduler {
       switch (res.type) {
         case TaskRunResultType.Error: {
           logger.trace(`Shepherd for ${taskId} got error result.`);
-          if (retryRecord) {
-            let delay: Duration;
-            const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
-            delay = AbsoluteTime.remaining(t);
-            logger.trace(`Waiting for ${delay.d_ms} ms`);
-            await this.wait(taskId, info, delay);
-          } else {
-            logger.trace("Retrying immediately.");
-          }
+          const retryRecord = await storePendingTaskError(
+            this.ws,
+            taskId,
+            res.errorDetail,
+          );
+          let delay: Duration;
+          const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+          delay = AbsoluteTime.remaining(t);
+          logger.trace(`Waiting for ${delay.d_ms} ms`);
+          await this.wait(taskId, info, delay);
           break;
         }
         case TaskRunResultType.Backoff: {
           logger.trace(`Shepherd for ${taskId} got backoff result.`);
-          if (retryRecord) {
-            let delay: Duration;
-            const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
-            delay = AbsoluteTime.remaining(t);
-            logger.trace(`Waiting for ${delay.d_ms} ms`);
-            await this.wait(taskId, info, delay);
-          } else {
-            logger.trace("Retrying immediately.");
-          }
+          const retryRecord = await storePendingTaskPending(this.ws, taskId);
+          let delay: Duration;
+          const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+          delay = AbsoluteTime.remaining(t);
+          logger.trace(`Waiting for ${delay.d_ms} ms`);
+          await this.wait(taskId, info, delay);
           break;
         }
         case TaskRunResultType.Progress: {
           logger.trace(
             `Shepherd for ${taskId} got progress result, re-running 
immediately.`,
           );
+          await storeTaskProgress(this.ws, taskId);
           break;
         }
         case TaskRunResultType.ScheduleLater:
           logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+          await storeTaskProgress(this.ws, taskId);
           const delay = AbsoluteTime.remaining(res.runAt);
           logger.trace(`Waiting for ${delay.d_ms} ms`);
           await this.wait(taskId, info, delay);
           break;
         case TaskRunResultType.Finished:
           logger.trace(`Shepherd for ${taskId} got finished result.`);
+          await storePendingTaskFinished(this.ws, taskId);
           return;
         case TaskRunResultType.LongpollReturnedPending: {
+          await storeTaskProgress(this.ws, taskId);
           // Make sure that we are waiting a bit if long-polling returned too 
early.
           const endTime = AbsoluteTime.now();
           const taskDuration = AbsoluteTime.difference(endTime, startTime);
@@ -425,9 +434,9 @@ async function storePendingTaskError(
   ws: InternalWalletState,
   pendingTaskId: string,
   e: TalerErrorDetail,
-): Promise<void> {
+): Promise<OperationRetryRecord> {
   logger.info(`storing pending task error for ${pendingTaskId}`);
-  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+  const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
     let retryRecord = await tx.operationRetries.get(pendingTaskId);
     if (!retryRecord) {
       retryRecord = {
@@ -440,11 +449,15 @@ async function storePendingTaskError(
       retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
     }
     await tx.operationRetries.put(retryRecord);
-    return taskToRetryNotification(ws, tx, pendingTaskId, e);
+    return {
+      notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
+      retryRecord,
+    };
   });
-  if (maybeNotification) {
-    ws.notify(maybeNotification);
+  if (res?.notification) {
+    ws.notify(res.notification);
   }
+  return res.retryRecord;
 }
 
 /**
@@ -462,8 +475,8 @@ async function storeTaskProgress(
 async function storePendingTaskPending(
   ws: InternalWalletState,
   pendingTaskId: string,
-): Promise<void> {
-  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+): Promise<OperationRetryRecord> {
+  const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
     let retryRecord = await tx.operationRetries.get(pendingTaskId);
     let hadError = false;
     if (!retryRecord) {
@@ -479,15 +492,24 @@ async function storePendingTaskPending(
       retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
     }
     await tx.operationRetries.put(retryRecord);
+    let notification: WalletNotification | undefined = undefined;
     if (hadError) {
-      return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
-    } else {
-      return undefined;
+      notification = await taskToRetryNotification(
+        ws,
+        tx,
+        pendingTaskId,
+        undefined,
+      );
     }
+    return {
+      notification,
+      retryRecord,
+    };
   });
-  if (maybeNotification) {
-    ws.notify(maybeNotification);
+  if (res.notification) {
+    ws.notify(res.notification);
   }
+  return res.retryRecord;
 }
 
 async function storePendingTaskFinished(
@@ -502,33 +524,11 @@ async function storePendingTaskFinished(
 async function runTaskWithErrorReporting(
   ws: InternalWalletState,
   opId: TaskIdStr,
+  info: ShepherdInfo,
   f: () => Promise<TaskRunResult>,
 ): Promise<TaskRunResult> {
-  let maybeError: TalerErrorDetail | undefined;
   try {
-    const resp = await f();
-    switch (resp.type) {
-      case TaskRunResultType.Error:
-        await storePendingTaskError(ws, opId, resp.errorDetail);
-        return resp;
-      case TaskRunResultType.Finished:
-        await storePendingTaskFinished(ws, opId);
-        return resp;
-      case TaskRunResultType.Backoff:
-        await storePendingTaskPending(ws, opId);
-        return resp;
-      case TaskRunResultType.ScheduleLater:
-        // Task succeeded but wants to be run again.
-        await storeTaskProgress(ws, opId);
-        return resp;
-      case TaskRunResultType.Progress:
-        await storeTaskProgress(ws, opId);
-        return resp;
-      case TaskRunResultType.LongpollReturnedPending:
-        // Longpoll should be run again immediately.
-        await storeTaskProgress(ws, opId);
-        return resp;
-    }
+    return await f();
   } catch (e) {
     if (e instanceof CryptoApiStoppedError) {
       if (ws.stopped) {
@@ -543,46 +543,11 @@ async function runTaskWithErrorReporting(
         };
       }
     }
-    if (e instanceof TalerError) {
-      logger.warn("operation processed resulted in error");
-      logger.warn(`error was: ${j2s(e.errorDetail)}`);
-      maybeError = e.errorDetail;
-      await storePendingTaskError(ws, opId, maybeError!);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: e.errorDetail,
-      };
-    } else if (e instanceof Error) {
-      // This is a bug, as we expect pending operations to always
-      // do their own error handling and only throw 
WALLET_PENDING_OPERATION_FAILED
-      // or return something.
-      logger.error(`Uncaught exception: ${e.message}`);
-      logger.error(`Stack: ${e.stack}`);
-      maybeError = makeErrorDetail(
-        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-        {
-          stack: e.stack,
-        },
-        `unexpected exception (message: ${e.message})`,
-      );
-      await storePendingTaskError(ws, opId, maybeError);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: maybeError,
-      };
-    } else {
-      logger.error("Uncaught exception, value is not even an error.");
-      maybeError = makeErrorDetail(
-        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-        {},
-        `unexpected exception (not even an error)`,
-      );
-      await storePendingTaskError(ws, opId, maybeError);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: maybeError,
-      };
-    }
+    const errorDetail = getErrorDetailFromException(e);
+    return {
+      type: TaskRunResultType.Error,
+      errorDetail,
+    };
   }
 }
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]