gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 01/02: scheduler helper


From: gnunet
Subject: [libeufin] 01/02: scheduler helper
Date: Thu, 06 Apr 2023 18:34:29 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit 6974246a9944845fbed3fbaabdf8ce92cacf8294
Author: MS <ms@taler.net>
AuthorDate: Thu Apr 6 18:32:46 2023 +0200

    scheduler helper
---
 nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt  |  3 +-
 .../main/kotlin/tech/libeufin/nexus/Scheduling.kt  | 79 ++++++++++++----------
 nexus/src/test/kotlin/SchedulingTesting.kt         | 42 ++++++++++++
 3 files changed, 86 insertions(+), 38 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index f09a152b..ac74bd79 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -33,7 +33,6 @@ import com.github.ajalt.clikt.parameters.options.*
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.launch
-import kotlinx.coroutines.newSingleThreadContext
 import startServer
 import tech.libeufin.nexus.iso20022.parseCamtMessage
 import tech.libeufin.nexus.server.client
@@ -75,7 +74,7 @@ class Serve : CliktCommand("Run nexus HTTP server") {
     override fun run() {
         setLogLevel(logLevel)
         execThrowableOrTerminate { dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) }
-        CoroutineScope(Dispatchers.IO).launch(fallback) { startOperationScheduler(client) }
+        CoroutineScope(Dispatchers.IO).launch(fallback) { whileTrueOperationScheduler(client) }
         if (withUnixSocket != null) {
             startServer(
                 withUnixSocket!!,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index 0dcea820..e23f5ffb 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -34,6 +34,8 @@ import java.lang.IllegalArgumentException
 import java.time.Duration
 import java.time.Instant
 import java.time.ZonedDateTime
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
 import kotlin.system.exitProcess
 
 private data class TaskSchedule(
@@ -53,7 +55,6 @@ private suspend fun runTask(client: HttpClient, sched: TaskSchedule) {
                 when (sched.type) {
                     // Downloads and ingests the payment records from the bank.
                     "fetch" -> {
-                        @Suppress("BlockingMethodInNonBlockingContext")
                         val fetchSpec = jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java)
                         fetchBankAccountTransactions(client, fetchSpec, sched.resourceId)
                         /**
@@ -108,45 +109,51 @@ val fallback = CoroutineExceptionHandler { _, err ->
     logger.error(err.stackTraceToString())
     exitProcess(1)
 }
-suspend fun startOperationScheduler(httpClient: HttpClient) {
-    while (true) {
-        // First, assign next execution time stamps to all tasks that need them
-        transaction {
-            NexusScheduledTaskEntity.find {
-                NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
-            }.forEach {
-                val cron = try { NexusCron.parser.parse(it.taskCronspec) }
-                catch (e: IllegalArgumentException) {
-                    logger.error("invalid cronspec in schedule ${it.resourceType}/${it.resourceId}/${it.taskName}")
-                    return@forEach
-                }
-                val zonedNow = ZonedDateTime.now()
-                val parsedCron = ExecutionTime.forCron(cron)
-                val next = parsedCron.nextExecution(zonedNow)
-                logger.info("scheduling task ${it.taskName} at $next (now is $zonedNow)")
-                it.nextScheduledExecutionSec = next.get().toEpochSecond()
+
+// Internal routine ultimately scheduling the tasks.
+private suspend fun operationScheduler(httpClient: HttpClient) {
+    // First, assign next execution time stamps to all tasks that need them
+    transaction {
+        NexusScheduledTaskEntity.find {
+            NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+        }.forEach {
+            val cron = try { NexusCron.parser.parse(it.taskCronspec) }
+            catch (e: IllegalArgumentException) {
+                logger.error("invalid cronspec in schedule ${it.resourceType}/${it.resourceId}/${it.taskName}")
+                return@forEach
             }
+            val zonedNow = ZonedDateTime.now()
+            val parsedCron = ExecutionTime.forCron(cron)
+            val next = parsedCron.nextExecution(zonedNow)
+            logger.info("Scheduling task ${it.taskName} at $next (now is $zonedNow).")
+            it.nextScheduledExecutionSec = next.get().toEpochSecond()
         }
-        val nowSec = Instant.now().epochSecond
-        // Second, find tasks that are due
-        val dueTasks = transaction {
-            NexusScheduledTaskEntity.find {
-                NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec
-            }.map {
-                TaskSchedule(it.id.value, it.taskName, it.taskType, it.resourceType, it.resourceId, it.taskParams)
-            }
-        } // Execute those due tasks and reset to null the next execution time.
-        dueTasks.forEach {
-            runTask(httpClient, it)
-            transaction {
-                val t = NexusScheduledTaskEntity.findById(it.taskId)
-                if (t != null) {
-                    // Reset next scheduled execution
-                    t.nextScheduledExecutionSec = null
-                    t.prevScheduledExecutionSec = nowSec
-                }
+    }
+    val nowSec = Instant.now().epochSecond
+    // Second, find tasks that are due
+    val dueTasks = transaction {
+        NexusScheduledTaskEntity.find {
+            NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec
+        }.map {
+            TaskSchedule(it.id.value, it.taskName, it.taskType, it.resourceType, it.resourceId, it.taskParams)
+        }
+    } // Execute those due tasks and reset to null the next execution time.
+    dueTasks.forEach {
+        runTask(httpClient, it)
+        transaction {
+            val t = NexusScheduledTaskEntity.findById(it.taskId)
+            if (t != null) {
+                // Reset next scheduled execution
+                t.nextScheduledExecutionSec = null
+                t.prevScheduledExecutionSec = nowSec
             }
         }
+    }
+
+}
+suspend fun whileTrueOperationScheduler(httpClient: HttpClient) {
+    while (true) {
+        operationScheduler(httpClient)
         // Wait a bit
         delay(Duration.ofSeconds(1))
     }
diff --git a/nexus/src/test/kotlin/SchedulingTesting.kt b/nexus/src/test/kotlin/SchedulingTesting.kt
new file mode 100644
index 00000000..f1307b7a
--- /dev/null
+++ b/nexus/src/test/kotlin/SchedulingTesting.kt
@@ -0,0 +1,42 @@
+import io.ktor.client.*
+import io.ktor.server.testing.*
+import kotlinx.coroutines.*
+import org.junit.Ignore
+import org.junit.Test
+import tech.libeufin.nexus.whileTrueOperationScheduler
+import tech.libeufin.sandbox.sandboxApp
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
+class SchedulingTesting {
+    // Testing the 'sleep' technique of the scheduler, to watch with TOP(1)
+    @Ignore // Just an experimental piece.  No assertion takes place, nor its logic is used anywhere.
+    @Test
+    fun sleep1SecWithDelay() {
+        val sched = Executors.newScheduledThreadPool(1)
+        sched.scheduleAtFixedRate(
+            { println(".") },
+            1,
+            1,
+            TimeUnit.SECONDS
+        )
+        runBlocking {
+            launch { awaitCancellation() }
+        }
+    }
+    // Launching the scheduler to measure its perf with TOP(1)
+    @Test
+    fun normalOperation() {
+        withTestDatabase {
+            prepNexusDb()
+            prepSandboxDb()
+            testApplication {
+                application(sandboxApp)
+                whileTrueOperationScheduler(client)
+            }
+        }
+        runBlocking {
+            launch { awaitCancellation() }
+        }
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]