[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] 01/02: scheduler helper
From: gnunet
Subject: [libeufin] 01/02: scheduler helper
Date: Thu, 06 Apr 2023 18:34:29 +0200
This is an automated email from the git hooks/post-receive script.
ms pushed a commit to branch master
in repository libeufin.
commit 6974246a9944845fbed3fbaabdf8ce92cacf8294
Author: MS <ms@taler.net>
AuthorDate: Thu Apr 6 18:32:46 2023 +0200
scheduler helper
---
nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 3 +-
.../main/kotlin/tech/libeufin/nexus/Scheduling.kt | 79 ++++++++++++----------
nexus/src/test/kotlin/SchedulingTesting.kt | 42 ++++++++++++
3 files changed, 86 insertions(+), 38 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index f09a152b..ac74bd79 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -33,7 +33,6 @@ import com.github.ajalt.clikt.parameters.options.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
-import kotlinx.coroutines.newSingleThreadContext
import startServer
import tech.libeufin.nexus.iso20022.parseCamtMessage
import tech.libeufin.nexus.server.client
@@ -75,7 +74,7 @@ class Serve : CliktCommand("Run nexus HTTP server") {
override fun run() {
setLogLevel(logLevel)
execThrowableOrTerminate {
dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) }
- CoroutineScope(Dispatchers.IO).launch(fallback) {
startOperationScheduler(client) }
+ CoroutineScope(Dispatchers.IO).launch(fallback) {
whileTrueOperationScheduler(client) }
if (withUnixSocket != null) {
startServer(
withUnixSocket!!,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index 0dcea820..e23f5ffb 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -34,6 +34,8 @@ import java.lang.IllegalArgumentException
import java.time.Duration
import java.time.Instant
import java.time.ZonedDateTime
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess
private data class TaskSchedule(
@@ -53,7 +55,6 @@ private suspend fun runTask(client: HttpClient, sched:
TaskSchedule) {
when (sched.type) {
// Downloads and ingests the payment records from the bank.
"fetch" -> {
- @Suppress("BlockingMethodInNonBlockingContext")
val fetchSpec =
jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java)
fetchBankAccountTransactions(client, fetchSpec,
sched.resourceId)
/**
@@ -108,45 +109,51 @@ val fallback = CoroutineExceptionHandler { _, err ->
logger.error(err.stackTraceToString())
exitProcess(1)
}
-suspend fun startOperationScheduler(httpClient: HttpClient) {
- while (true) {
- // First, assign next execution time stamps to all tasks that need them
- transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
- }.forEach {
- val cron = try { NexusCron.parser.parse(it.taskCronspec) }
- catch (e: IllegalArgumentException) {
- logger.error("invalid cronspec in schedule
${it.resourceType}/${it.resourceId}/${it.taskName}")
- return@forEach
- }
- val zonedNow = ZonedDateTime.now()
- val parsedCron = ExecutionTime.forCron(cron)
- val next = parsedCron.nextExecution(zonedNow)
- logger.info("scheduling task ${it.taskName} at $next (now is
$zonedNow)")
- it.nextScheduledExecutionSec = next.get().toEpochSecond()
+
+// Internal routine ultimately scheduling the tasks.
+private suspend fun operationScheduler(httpClient: HttpClient) {
+ // First, assign next execution time stamps to all tasks that need them
+ transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+ }.forEach {
+ val cron = try { NexusCron.parser.parse(it.taskCronspec) }
+ catch (e: IllegalArgumentException) {
+ logger.error("invalid cronspec in schedule
${it.resourceType}/${it.resourceId}/${it.taskName}")
+ return@forEach
}
+ val zonedNow = ZonedDateTime.now()
+ val parsedCron = ExecutionTime.forCron(cron)
+ val next = parsedCron.nextExecution(zonedNow)
+ logger.info("Scheduling task ${it.taskName} at $next (now is
$zonedNow).")
+ it.nextScheduledExecutionSec = next.get().toEpochSecond()
}
- val nowSec = Instant.now().epochSecond
- // Second, find tasks that are due
- val dueTasks = transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec lessEq
nowSec
- }.map {
- TaskSchedule(it.id.value, it.taskName, it.taskType,
it.resourceType, it.resourceId, it.taskParams)
- }
- } // Execute those due tasks and reset to null the next execution time.
- dueTasks.forEach {
- runTask(httpClient, it)
- transaction {
- val t = NexusScheduledTaskEntity.findById(it.taskId)
- if (t != null) {
- // Reset next scheduled execution
- t.nextScheduledExecutionSec = null
- t.prevScheduledExecutionSec = nowSec
- }
+ }
+ val nowSec = Instant.now().epochSecond
+ // Second, find tasks that are due
+ val dueTasks = transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec
+ }.map {
+ TaskSchedule(it.id.value, it.taskName, it.taskType,
it.resourceType, it.resourceId, it.taskParams)
+ }
+ } // Execute those due tasks and reset to null the next execution time.
+ dueTasks.forEach {
+ runTask(httpClient, it)
+ transaction {
+ val t = NexusScheduledTaskEntity.findById(it.taskId)
+ if (t != null) {
+ // Reset next scheduled execution
+ t.nextScheduledExecutionSec = null
+ t.prevScheduledExecutionSec = nowSec
}
}
+ }
+
+}
+suspend fun whileTrueOperationScheduler(httpClient: HttpClient) {
+ while (true) {
+ operationScheduler(httpClient)
// Wait a bit
delay(Duration.ofSeconds(1))
}
diff --git a/nexus/src/test/kotlin/SchedulingTesting.kt b/nexus/src/test/kotlin/SchedulingTesting.kt
new file mode 100644
index 00000000..f1307b7a
--- /dev/null
+++ b/nexus/src/test/kotlin/SchedulingTesting.kt
@@ -0,0 +1,42 @@
+import io.ktor.client.*
+import io.ktor.server.testing.*
+import kotlinx.coroutines.*
+import org.junit.Ignore
+import org.junit.Test
+import tech.libeufin.nexus.whileTrueOperationScheduler
+import tech.libeufin.sandbox.sandboxApp
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
+class SchedulingTesting {
+ // Testing the 'sleep' technique of the scheduler, to watch with TOP(1)
+ @Ignore // Just an experimental piece. No assertion takes place, nor its
logic is used anywhere.
+ @Test
+ fun sleep1SecWithDelay() {
+ val sched = Executors.newScheduledThreadPool(1)
+ sched.scheduleAtFixedRate(
+ { println(".") },
+ 1,
+ 1,
+ TimeUnit.SECONDS
+ )
+ runBlocking {
+ launch { awaitCancellation() }
+ }
+ }
+ // Launching the scheduler to measure its perf with TOP(1)
+ @Test
+ fun normalOperation() {
+ withTestDatabase {
+ prepNexusDb()
+ prepSandboxDb()
+ testApplication {
+ application(sandboxApp)
+ whileTrueOperationScheduler(client)
+ }
+ }
+ runBlocking {
+ launch { awaitCancellation() }
+ }
+ }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.