[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] branch master updated (de667818 -> f2710520)
From: gnunet
Subject: [libeufin] branch master updated (de667818 -> f2710520)
Date: Thu, 06 Apr 2023 18:34:28 +0200
This is an automated email from the git hooks/post-receive script.
ms pushed a change to branch master
in repository libeufin.
from de667818 Logging: reducing verbosity.
new 6974246a scheduler helper
new f2710520 scheduler tests
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 3 +-
.../main/kotlin/tech/libeufin/nexus/Scheduling.kt | 79 ++++++++++++----------
nexus/src/test/kotlin/SchedulingTesting.kt | 43 ++++++++++++
3 files changed, 87 insertions(+), 38 deletions(-)
create mode 100644 nexus/src/test/kotlin/SchedulingTesting.kt
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index f09a152b..ac74bd79 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -33,7 +33,6 @@ import com.github.ajalt.clikt.parameters.options.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
-import kotlinx.coroutines.newSingleThreadContext
import startServer
import tech.libeufin.nexus.iso20022.parseCamtMessage
import tech.libeufin.nexus.server.client
@@ -75,7 +74,7 @@ class Serve : CliktCommand("Run nexus HTTP server") {
override fun run() {
setLogLevel(logLevel)
execThrowableOrTerminate {
dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) }
- CoroutineScope(Dispatchers.IO).launch(fallback) {
startOperationScheduler(client) }
+ CoroutineScope(Dispatchers.IO).launch(fallback) {
whileTrueOperationScheduler(client) }
if (withUnixSocket != null) {
startServer(
withUnixSocket!!,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index 0dcea820..e23f5ffb 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -34,6 +34,8 @@ import java.lang.IllegalArgumentException
import java.time.Duration
import java.time.Instant
import java.time.ZonedDateTime
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess
private data class TaskSchedule(
@@ -53,7 +55,6 @@ private suspend fun runTask(client: HttpClient, sched:
TaskSchedule) {
when (sched.type) {
// Downloads and ingests the payment records from the bank.
"fetch" -> {
- @Suppress("BlockingMethodInNonBlockingContext")
val fetchSpec =
jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java)
fetchBankAccountTransactions(client, fetchSpec,
sched.resourceId)
/**
@@ -108,45 +109,51 @@ val fallback = CoroutineExceptionHandler { _, err ->
logger.error(err.stackTraceToString())
exitProcess(1)
}
-suspend fun startOperationScheduler(httpClient: HttpClient) {
- while (true) {
- // First, assign next execution time stamps to all tasks that need them
- transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
- }.forEach {
- val cron = try { NexusCron.parser.parse(it.taskCronspec) }
- catch (e: IllegalArgumentException) {
- logger.error("invalid cronspec in schedule
${it.resourceType}/${it.resourceId}/${it.taskName}")
- return@forEach
- }
- val zonedNow = ZonedDateTime.now()
- val parsedCron = ExecutionTime.forCron(cron)
- val next = parsedCron.nextExecution(zonedNow)
- logger.info("scheduling task ${it.taskName} at $next (now is
$zonedNow)")
- it.nextScheduledExecutionSec = next.get().toEpochSecond()
+
+// Internal routine ultimately scheduling the tasks.
+private suspend fun operationScheduler(httpClient: HttpClient) {
+ // First, assign next execution time stamps to all tasks that need them
+ transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+ }.forEach {
+ val cron = try { NexusCron.parser.parse(it.taskCronspec) }
+ catch (e: IllegalArgumentException) {
+ logger.error("invalid cronspec in schedule
${it.resourceType}/${it.resourceId}/${it.taskName}")
+ return@forEach
}
+ val zonedNow = ZonedDateTime.now()
+ val parsedCron = ExecutionTime.forCron(cron)
+ val next = parsedCron.nextExecution(zonedNow)
+ logger.info("Scheduling task ${it.taskName} at $next (now is
$zonedNow).")
+ it.nextScheduledExecutionSec = next.get().toEpochSecond()
}
- val nowSec = Instant.now().epochSecond
- // Second, find tasks that are due
- val dueTasks = transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec lessEq
nowSec
- }.map {
- TaskSchedule(it.id.value, it.taskName, it.taskType,
it.resourceType, it.resourceId, it.taskParams)
- }
- } // Execute those due tasks and reset to null the next execution time.
- dueTasks.forEach {
- runTask(httpClient, it)
- transaction {
- val t = NexusScheduledTaskEntity.findById(it.taskId)
- if (t != null) {
- // Reset next scheduled execution
- t.nextScheduledExecutionSec = null
- t.prevScheduledExecutionSec = nowSec
- }
+ }
+ val nowSec = Instant.now().epochSecond
+ // Second, find tasks that are due
+ val dueTasks = transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec
+ }.map {
+ TaskSchedule(it.id.value, it.taskName, it.taskType,
it.resourceType, it.resourceId, it.taskParams)
+ }
+ } // Execute those due tasks and reset to null the next execution time.
+ dueTasks.forEach {
+ runTask(httpClient, it)
+ transaction {
+ val t = NexusScheduledTaskEntity.findById(it.taskId)
+ if (t != null) {
+ // Reset next scheduled execution
+ t.nextScheduledExecutionSec = null
+ t.prevScheduledExecutionSec = nowSec
}
}
+ }
+
+}
+suspend fun whileTrueOperationScheduler(httpClient: HttpClient) {
+ while (true) {
+ operationScheduler(httpClient)
// Wait a bit
delay(Duration.ofSeconds(1))
}
diff --git a/nexus/src/test/kotlin/SchedulingTesting.kt b/nexus/src/test/kotlin/SchedulingTesting.kt
new file mode 100644
index 00000000..4f0cc605
--- /dev/null
+++ b/nexus/src/test/kotlin/SchedulingTesting.kt
@@ -0,0 +1,43 @@
+import io.ktor.client.*
+import io.ktor.server.testing.*
+import kotlinx.coroutines.*
+import org.junit.Ignore
+import org.junit.Test
+import tech.libeufin.nexus.whileTrueOperationScheduler
+import tech.libeufin.sandbox.sandboxApp
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
+class SchedulingTesting {
+ // Testing the 'sleep' technique of the scheduler, to watch with TOP(1)
+ @Ignore // Just an experimental piece. No assertion takes place, nor its
logic is used anywhere.
+ @Test
+ fun sleep1SecWithDelay() {
+ val sched = Executors.newScheduledThreadPool(1)
+ sched.scheduleAtFixedRate(
+ { println(".") },
+ 1,
+ 1,
+ TimeUnit.SECONDS
+ )
+ runBlocking {
+ launch { awaitCancellation() }
+ }
+ }
+ // Launching the scheduler to measure its perf with TOP(1)
+ @Ignore
+ @Test
+ fun normalOperation() {
+ withTestDatabase {
+ prepNexusDb()
+ prepSandboxDb()
+ testApplication {
+ application(sandboxApp)
+ whileTrueOperationScheduler(client)
+ }
+ }
+ runBlocking {
+ launch { awaitCancellation() }
+ }
+ }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [libeufin] branch master updated (de667818 -> f2710520), gnunet <=