[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[taler-wallet-core] branch master updated: introduce locking to avoid ce
From: |
gnunet |
Subject: |
[taler-wallet-core] branch master updated: introduce locking to avoid certain simultaneous requests to the exchange |
Date: |
Tue, 18 Aug 2020 14:53:15 +0200 |
This is an automated email from the git hooks/post-receive script.
dold pushed a commit to branch master
in repository wallet-core.
The following commit(s) were added to refs/heads/master by this push:
new e2f7bc79 introduce locking to avoid certain simultaneous requests to
the exchange
e2f7bc79 is described below
commit e2f7bc79cd4326f769f370f230041101a099d98c
Author: Florian Dold <florian.dold@gmail.com>
AuthorDate: Tue Aug 18 18:23:06 2020 +0530
introduce locking to avoid certain simultaneous requests to the exchange
---
packages/taler-wallet-core/src/operations/pay.ts | 6 ++-
.../taler-wallet-core/src/operations/refresh.ts | 34 ++++++++------
packages/taler-wallet-core/src/operations/state.ts | 53 +++++++++++++++++++++-
3 files changed, 77 insertions(+), 16 deletions(-)
diff --git a/packages/taler-wallet-core/src/operations/pay.ts
b/packages/taler-wallet-core/src/operations/pay.ts
index c3dd6c6d..996e1c1e 100644
--- a/packages/taler-wallet-core/src/operations/pay.ts
+++ b/packages/taler-wallet-core/src/operations/pay.ts
@@ -57,7 +57,7 @@ import { Logger } from "../util/logging";
import { parsePayUri } from "../util/taleruri";
import { guardOperationException, OperationFailedError } from "./errors";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh";
-import { InternalWalletState } from "./state";
+import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state";
import { getTimestampNow, timestampAddDuration } from "../util/time";
import { strcmp, canonicalJson } from "../util/helpers";
import { readSuccessResponseJsonOrThrow } from "../util/http";
@@ -796,7 +796,9 @@ export async function submitPay(
logger.trace("making pay request", JSON.stringify(reqBody, undefined, 2));
- const resp = await ws.http.postJson(payUrl, reqBody);
+ const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>
+ ws.http.postJson(payUrl, reqBody),
+ );
const merchantResp = await readSuccessResponseJsonOrThrow(
resp,
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts
b/packages/taler-wallet-core/src/operations/refresh.ts
index 52325281..409ae58c 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -29,7 +29,7 @@ import {
} from "../types/dbTypes";
import { amountToPretty } from "../util/helpers";
import { TransactionHandle } from "../util/query";
-import { InternalWalletState } from "./state";
+import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state";
import { Logger } from "../util/logging";
import { getWithdrawDenomList } from "./withdraw";
import { updateExchangeFromUrl } from "./exchanges";
@@ -43,7 +43,7 @@ import { guardOperationException } from "./errors";
import { NotificationType } from "../types/notifications";
import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
import { getTimestampNow } from "../util/time";
-import { readSuccessResponseJsonOrThrow } from "../util/http";
+import { readSuccessResponseJsonOrThrow, HttpResponse } from "../util/http";
import {
codecForExchangeMeltResponse,
codecForExchangeRevealResponse,
@@ -248,7 +248,14 @@ async function refreshMelt(
value_with_fee: Amounts.stringify(refreshSession.amountRefreshInput),
};
logger.trace(`melt request for coin:`, meltReq);
- const resp = await ws.http.postJson(reqUrl.href, meltReq);
+
+ const resp = await ws.runSequentialized(
+ [EXCHANGE_COINS_LOCK],
+ async () => {
+ return await ws.http.postJson(reqUrl.href, meltReq);
+ },
+ );
+
const meltResponse = await readSuccessResponseJsonOrThrow(
resp,
codecForExchangeMeltResponse(),
@@ -339,7 +346,13 @@ async function refreshReveal(
refreshSession.exchangeBaseUrl,
);
- const resp = await ws.http.postJson(reqUrl.href, req);
+ const resp = await ws.runSequentialized(
+ [EXCHANGE_COINS_LOCK],
+ async () => {
+ return await ws.http.postJson(reqUrl.href, req);
+ },
+ );
+
const reveal = await readSuccessResponseJsonOrThrow(
resp,
codecForExchangeRevealResponse(),
@@ -446,6 +459,9 @@ async function incrementRefreshRetry(
}
}
+/**
+ * Actually process a refresh group that has been created.
+ */
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
@@ -557,15 +573,7 @@ export async function createRefreshGroup(
await tx.put(Stores.refreshGroups, refreshGroup);
- const processAsync = async (): Promise<void> => {
- try {
- await processRefreshGroup(ws, refreshGroupId);
- } catch (e) {
- logger.trace(`Error during refresh: ${e}`);
- }
- };
-
- processAsync();
+ logger.trace(`created refresh group ${refreshGroupId}`);
return {
refreshGroupId,
diff --git a/packages/taler-wallet-core/src/operations/state.ts
b/packages/taler-wallet-core/src/operations/state.ts
index cfec85d0..582dd92d 100644
--- a/packages/taler-wallet-core/src/operations/state.ts
+++ b/packages/taler-wallet-core/src/operations/state.ts
@@ -22,11 +22,15 @@ import { Logger } from "../util/logging";
import { PendingOperationsResponse } from "../types/pending";
import { WalletNotification } from "../types/notifications";
import { Database } from "../util/query";
+import { openPromise, OpenedPromise } from "../util/promiseUtils";
type NotificationListener = (n: WalletNotification) => void;
const logger = new Logger("state.ts");
+export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
+export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
+
export class InternalWalletState {
cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {};
memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
@@ -41,6 +45,16 @@ export class InternalWalletState {
listeners: NotificationListener[] = [];
+ /**
+ * Promises that are waiting for a particular resource.
+ */
+ private resourceWaiters: Record<string, OpenedPromise<void>[]> = {};
+
+ /**
+ * Resources that are currently locked.
+ */
+ private resourceLocks: Set<string> = new Set();
+
constructor(
public db: Database,
public http: HttpRequestLibrary,
@@ -49,7 +63,7 @@ export class InternalWalletState {
this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
}
- public notify(n: WalletNotification): void {
+ notify(n: WalletNotification): void {
logger.trace("Notification", n);
for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n));
@@ -62,4 +76,41 @@ export class InternalWalletState {
addNotificationListener(f: (n: WalletNotification) => void): void {
this.listeners.push(f);
}
+
+ /**
+ * Run an async function after acquiring a list of locks, identified
+ * by string tokens.
+ */
+ async runSequentialized<T>(tokens: string[], f: () => Promise<T>) {
+ // Make sure locks are always acquired in the same order
+ tokens = [... tokens].sort();
+
+ for (const token of tokens) {
+ if (this.resourceLocks.has(token)) {
+ const p = openPromise<void>();
+ let waitList = this.resourceWaiters[token];
+ if (!waitList) {
+ waitList = this.resourceWaiters[token] = [];
+ }
+ waitList.push(p);
+ await p.promise;
+ }
+ this.resourceLocks.add(token);
+ }
+
+ try {
+ logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`);
+ const result = await f();
+ logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`);
+ return result;
+ } finally {
+ for (const token of tokens) {
+ this.resourceLocks.delete(token);
+ let waiter = (this.resourceWaiters[token] ?? []).shift();
+ if (waiter) {
+ waiter.resolve();
+ }
+ }
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [taler-wallet-core] branch master updated: introduce locking to avoid certain simultaneous requests to the exchange,
gnunet <=