gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] branch master updated (de667818 -> f2710520)


From: gnunet
Subject: [libeufin] branch master updated (de667818 -> f2710520)
Date: Thu, 06 Apr 2023 18:34:28 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a change to branch master
in repository libeufin.

    from de667818 Logging: reducing verbosity.
     new 6974246a scheduler helper
     new f2710520 scheduler tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt  |  3 +-
 .../main/kotlin/tech/libeufin/nexus/Scheduling.kt  | 79 ++++++++++++----------
 nexus/src/test/kotlin/SchedulingTesting.kt         | 43 ++++++++++++
 3 files changed, 87 insertions(+), 38 deletions(-)
 create mode 100644 nexus/src/test/kotlin/SchedulingTesting.kt

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index f09a152b..ac74bd79 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -33,7 +33,6 @@ import com.github.ajalt.clikt.parameters.options.*
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.launch
-import kotlinx.coroutines.newSingleThreadContext
 import startServer
 import tech.libeufin.nexus.iso20022.parseCamtMessage
 import tech.libeufin.nexus.server.client
@@ -75,7 +74,7 @@ class Serve : CliktCommand("Run nexus HTTP server") {
     override fun run() {
         setLogLevel(logLevel)
         execThrowableOrTerminate { 
dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) }
-        CoroutineScope(Dispatchers.IO).launch(fallback) { 
startOperationScheduler(client) }
+        CoroutineScope(Dispatchers.IO).launch(fallback) { 
whileTrueOperationScheduler(client) }
         if (withUnixSocket != null) {
             startServer(
                 withUnixSocket!!,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index 0dcea820..e23f5ffb 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -34,6 +34,8 @@ import java.lang.IllegalArgumentException
 import java.time.Duration
 import java.time.Instant
 import java.time.ZonedDateTime
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
 import kotlin.system.exitProcess
 
 private data class TaskSchedule(
@@ -53,7 +55,6 @@ private suspend fun runTask(client: HttpClient, sched: 
TaskSchedule) {
                 when (sched.type) {
                     // Downloads and ingests the payment records from the bank.
                     "fetch" -> {
-                        @Suppress("BlockingMethodInNonBlockingContext")
                         val fetchSpec = 
jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java)
                         fetchBankAccountTransactions(client, fetchSpec, 
sched.resourceId)
                         /**
@@ -108,45 +109,51 @@ val fallback = CoroutineExceptionHandler { _, err ->
     logger.error(err.stackTraceToString())
     exitProcess(1)
 }
-suspend fun startOperationScheduler(httpClient: HttpClient) {
-    while (true) {
-        // First, assign next execution time stamps to all tasks that need them
-        transaction {
-            NexusScheduledTaskEntity.find {
-                NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
-            }.forEach {
-                val cron = try { NexusCron.parser.parse(it.taskCronspec) }
-                catch (e: IllegalArgumentException) {
-                    logger.error("invalid cronspec in schedule 
${it.resourceType}/${it.resourceId}/${it.taskName}")
-                    return@forEach
-                }
-                val zonedNow = ZonedDateTime.now()
-                val parsedCron = ExecutionTime.forCron(cron)
-                val next = parsedCron.nextExecution(zonedNow)
-                logger.info("scheduling task ${it.taskName} at $next (now is 
$zonedNow)")
-                it.nextScheduledExecutionSec = next.get().toEpochSecond()
+
+// Internal routine ultimately scheduling the tasks.
+private suspend fun operationScheduler(httpClient: HttpClient) {
+    // First, assign next execution time stamps to all tasks that need them
+    transaction {
+        NexusScheduledTaskEntity.find {
+            NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+        }.forEach {
+            val cron = try { NexusCron.parser.parse(it.taskCronspec) }
+            catch (e: IllegalArgumentException) {
+                logger.error("invalid cronspec in schedule 
${it.resourceType}/${it.resourceId}/${it.taskName}")
+                return@forEach
             }
+            val zonedNow = ZonedDateTime.now()
+            val parsedCron = ExecutionTime.forCron(cron)
+            val next = parsedCron.nextExecution(zonedNow)
+            logger.info("Scheduling task ${it.taskName} at $next (now is 
$zonedNow).")
+            it.nextScheduledExecutionSec = next.get().toEpochSecond()
         }
-        val nowSec = Instant.now().epochSecond
-        // Second, find tasks that are due
-        val dueTasks = transaction {
-            NexusScheduledTaskEntity.find {
-                NexusScheduledTasksTable.nextScheduledExecutionSec lessEq 
nowSec
-            }.map {
-                TaskSchedule(it.id.value, it.taskName, it.taskType, 
it.resourceType, it.resourceId, it.taskParams)
-            }
-        } // Execute those due tasks and reset to null the next execution time.
-        dueTasks.forEach {
-            runTask(httpClient, it)
-            transaction {
-                val t = NexusScheduledTaskEntity.findById(it.taskId)
-                if (t != null) {
-                    // Reset next scheduled execution
-                    t.nextScheduledExecutionSec = null
-                    t.prevScheduledExecutionSec = nowSec
-                }
+    }
+    val nowSec = Instant.now().epochSecond
+    // Second, find tasks that are due
+    val dueTasks = transaction {
+        NexusScheduledTaskEntity.find {
+            NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec
+        }.map {
+            TaskSchedule(it.id.value, it.taskName, it.taskType, 
it.resourceType, it.resourceId, it.taskParams)
+        }
+    } // Execute those due tasks and reset to null the next execution time.
+    dueTasks.forEach {
+        runTask(httpClient, it)
+        transaction {
+            val t = NexusScheduledTaskEntity.findById(it.taskId)
+            if (t != null) {
+                // Reset next scheduled execution
+                t.nextScheduledExecutionSec = null
+                t.prevScheduledExecutionSec = nowSec
             }
         }
+    }
+
+}
+suspend fun whileTrueOperationScheduler(httpClient: HttpClient) {
+    while (true) {
+        operationScheduler(httpClient)
         // Wait a bit
         delay(Duration.ofSeconds(1))
     }
diff --git a/nexus/src/test/kotlin/SchedulingTesting.kt 
b/nexus/src/test/kotlin/SchedulingTesting.kt
new file mode 100644
index 00000000..4f0cc605
--- /dev/null
+++ b/nexus/src/test/kotlin/SchedulingTesting.kt
@@ -0,0 +1,43 @@
+import io.ktor.client.*
+import io.ktor.server.testing.*
+import kotlinx.coroutines.*
+import org.junit.Ignore
+import org.junit.Test
+import tech.libeufin.nexus.whileTrueOperationScheduler
+import tech.libeufin.sandbox.sandboxApp
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
+class SchedulingTesting {
+    // Testing the 'sleep' technique of the scheduler, to watch with TOP(1)
+    @Ignore // Just an experimental piece.  No assertion takes place, nor its 
logic is used anywhere.
+    @Test
+    fun sleep1SecWithDelay() {
+        val sched = Executors.newScheduledThreadPool(1)
+        sched.scheduleAtFixedRate(
+            { println(".") },
+            1,
+            1,
+            TimeUnit.SECONDS
+        )
+        runBlocking {
+            launch { awaitCancellation() }
+        }
+    }
+    // Launching the scheduler to measure its perf with TOP(1)
+    @Ignore
+    @Test
+    fun normalOperation() {
+        withTestDatabase {
+            prepNexusDb()
+            prepSandboxDb()
+            testApplication {
+                application(sandboxApp)
+                whileTrueOperationScheduler(client)
+            }
+        }
+        runBlocking {
+            launch { awaitCancellation() }
+        }
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]