Skip to content

kap-resilience

Retry, resource safety, and protection patterns. All composable in the KAP chain.

implementation("io.github.damian-rafael-lattenero:kap-resilience:2.3.0")

Depends on: kap-core. Platforms: JVM, JS (IR), Linux X64, macOS (x64/ARM64), iOS (x64/ARM64/Simulator). Tests: 164 tests across 16 test classes, including Schedule laws and CircuitBreaker concurrency tests.


Schedule — Composable Retry Policies

/**
 * Runs [block] until it succeeds, pausing with exponential backoff between attempts.
 *
 * [block] is invoked at most [maxAttempts] times and always at least once. Only
 * [RuntimeException]s are retried; every other throwable is rethrown immediately, and so is
 * [CancellationException], so that cancelling the caller stops the retry loop. The final
 * attempt is not guarded: whatever it throws reaches the caller.
 *
 * @param maxAttempts total number of invocations allowed, counting the first one.
 * @param initialDelay pause before the first retry, as passed to `delay` (milliseconds).
 * @param maxDelay upper bound for every single pause, in the same unit as [initialDelay].
 * @param factor multiplier applied to the pause after each failed attempt.
 * @param block the operation to run.
 * @return the value produced by the first successful invocation of [block].
 */
suspend fun <T> retryWithBackoff(
    maxAttempts: Int,
    initialDelay: Long,
    maxDelay: Long,
    factor: Double,
    block: suspend () -> T
): T {
    // Clamp the first pause as well, so maxDelay bounds every pause and not only the later ones.
    var currentDelay = initialDelay.coerceAtMost(maxDelay)
    repeat(maxAttempts - 1) {
        try {
            return block()
        } catch (e: RuntimeException) { // only retry RuntimeException?
            // CancellationException is itself a RuntimeException; it must propagate, never be retried.
            if (e is CancellationException) throw e
        }
        delay(currentDelay)
        currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
    }
    return block() // last attempt, let it throw
}

val result = retryWithBackoff(5, 10, 5000, 2.0) { flakyService() }
// Want jitter? Rewrite. Want max duration? Rewrite. Want to compose two policies? Rewrite.
val result = Schedule.recurs<Throwable>(5)
    .and(Schedule.exponential(10.milliseconds))
    .retry { flakyService() }
val policy = Schedule.times<Throwable>(5) and
    Schedule.exponential(10.milliseconds) and
    Schedule.doWhile<Throwable> { it is RuntimeException }

// Running count of flakyService() invocations; shared by every caller.
var attempts = 0

/**
 * Simulated unreliable call used by the retry examples.
 *
 * Each invocation increments [attempts]. The first two invocations throw a
 * [RuntimeException]; every later one returns a message naming the attempt number.
 */
suspend fun flakyService(): String {
    attempts += 1
    return when {
        attempts > 2 -> "success on attempt $attempts"
        else -> throw RuntimeException("flake #$attempts")
    }
}

val result = Async {
    Kap { flakyService() }.retry(policy)
}
// "success on attempt 3"

Building Blocks

| Schedule | Behavior | Example |
|---|---|---|
| `times(n)` | Retry up to N times | `Schedule.times<Throwable>(5)` |
| `spaced(d)` | Fixed delay between retries | `Schedule.spaced(1.seconds)` |
| `exponential(base, max)` | Exponential backoff | `Schedule.exponential(10.milliseconds, maxDelay = 5.seconds)` |
| `fibonacci(base)` | Fibonacci-sequence delays | `Schedule.fibonacci(10.milliseconds)` |
| `linear(base)` | Linearly increasing delays | `Schedule.linear(100.milliseconds)` |
| `forever()` | Retry indefinitely | `Schedule.forever()` |

Modifiers

.jittered() — Prevent thundering herd

// Without jitter: 100 clients all retry at exactly 100ms, 200ms, 400ms — thundering herd
// With jitter: each client retries at a random time within the window
val policy = Schedule.times<Throwable>(5) and
    Schedule.exponential(100.milliseconds).jittered()

.withMaxDuration(d) — Total time cap

// Retry for at most 30 seconds total, regardless of attempt count
val policy = Schedule.forever<Throwable>() and
    Schedule.exponential(100.milliseconds) and
    Schedule.withMaxDuration(30.seconds)

.doWhile { } / .doUntil { } — Conditional retry

// Only retry on transient errors
val policy = Schedule.times<Throwable>(10) and
    Schedule.doWhile<Throwable> { it is IOException || it is TimeoutException }

// Retry until we get a non-empty response
val untilReady = Schedule.spaced<String>(1.seconds) and
    Schedule.doUntil<String> { it.isNotEmpty() }

Composition

// Both must agree to continue (intersection):
val strict = Schedule.times<Throwable>(3) and Schedule.exponential(100.milliseconds)

// Either can continue (union):
val lenient = Schedule.times<Throwable>(3) or Schedule.spaced(1.seconds)

Retry Variants

.retryOrElse(schedule, fallback) — Fallback after exhaustion

val result = Async {
    Kap { flakyService() }
        .retryOrElse(
            Schedule.times(2) and Schedule.spaced(100.milliseconds)
        ) { "fallback-after-exhaustion" }
}

.retryWithResult(schedule) — Returns full context

val retryResult = Async {
    Kap { flakyService() }.retryWithResult(
        Schedule.times<Throwable>(5) and Schedule.exponential(10.milliseconds)
    )
}
println(retryResult.value)       // "success"
println(retryResult.attempts)    // 3
println(retryResult.totalDelay)  // 70ms

CircuitBreaker

// Manual state machine — 50+ lines
/**
 * Hand-rolled circuit breaker guarding the calls made through [execute].
 *
 * The breaker opens once [maxFailures] failures have been recorded without an intervening
 * success; any success resets the count and closes it. While open, calls are rejected until
 * more than [resetTimeout] has elapsed since the last recorded failure; after that, calls are
 * let through again in the half-open state.
 *
 * @param maxFailures number of recorded failures at which the breaker opens.
 * @param resetTimeout how long after the last recorded failure an open breaker keeps rejecting calls.
 */
class ManualCircuitBreaker(
    private val maxFailures: Int,
    private val resetTimeout: Duration,
) {
    // Guards every read and write of failures, state and lastFailure.
    private val mutex = Mutex()
    private var failures = 0
    private var state: State = State.Closed

    // System.currentTimeMillis() reading taken at the most recent recorded failure.
    private var lastFailure: Long = 0

    enum class State { Closed, Open, HalfOpen }

    /**
     * Runs [block] through the breaker and returns its result.
     *
     * [block] executes outside the lock, so concurrent callers are not serialized; this also
     * means the half-open state does not limit how many trial calls run at once.
     *
     * @throws RuntimeException with message "Circuit breaker is open" when the breaker is open
     *   and [resetTimeout] has not yet elapsed. Exceptions thrown by [block] are rethrown as-is.
     */
    suspend fun <T> execute(block: suspend () -> T): T {
        mutex.withLock {
            if (state == State.Open) {
                val sinceLastFailure = System.currentTimeMillis() - lastFailure
                if (sinceLastFailure <= resetTimeout.inWholeMilliseconds) {
                    throw RuntimeException("Circuit breaker is open")
                }
                state = State.HalfOpen
            }
        }
        return try {
            val result = block()
            mutex.withLock { failures = 0; state = State.Closed }
            result
        } catch (e: Exception) {
            // Cancellation says nothing about the health of the protected call: propagate it
            // without recording a failure, so cancelled callers cannot trip the breaker.
            // NOTE(review): this also exempts a TimeoutCancellationException raised by a
            // withTimeout inside block — confirm that is the desired policy.
            if (e is CancellationException) throw e
            mutex.withLock {
                failures++
                lastFailure = System.currentTimeMillis()
                if (failures >= maxFailures) state = State.Open
            }
            throw e
        }
    }
}
val breaker = CircuitBreaker(
    maxFailures = 5,
    resetTimeout = 30.seconds,
    onStateChange = { old, new -> println("CircuitBreaker: $old -> $new") }
)

val result = Async {
    Kap { fetchUser() }
        .withCircuitBreaker(breaker)
}
// While Open: fails immediately with CircuitBreakerOpenException
// After resetTimeout: tries one request (HalfOpen)
// If it succeeds: back to Closed

Full composition

val result = Async {
    Kap { fetchUser() }
        .timeout(500.milliseconds)                    // hard timeout
        .withCircuitBreaker(breaker)                  // circuit breaker
        .retry(Schedule.times<Throwable>(3)           // retry with backoff
            and Schedule.exponential(10.milliseconds))
        .recover { "cached-user" }                    // fallback on exhaustion
}
// timeout -> circuit breaker -> retry -> recover. All composable.

timeoutRace — Parallel Fallback

// Sequential: waste time waiting for timeout before starting fallback
val result = try {
    withTimeout(100) { fetchFromPrimary() }
} catch (e: TimeoutCancellationException) {
    fetchFromFallback()  // starts AFTER 100ms timeout
}
// Total: 100ms (wasted) + fallback time
val result = Async {
    Kap { fetchFromPrimary() }
        .timeoutRace(100.milliseconds, Kap { fetchFromFallback() })
}
// Both start at t=0. Fallback wins at ~30ms. Primary cancelled.
Sequential timeout:
t=0ms    ─── primary starts ───
t=100ms  ─── timeout fires ───
t=100ms  ─── fallback starts ───     ← 100ms wasted
t=130ms  ─── fallback completes ───

timeoutRace:
t=0ms    ─── primary starts ───┐
t=0ms    ─── fallback starts ──┘     ← both at t=0
t=30ms   ─── fallback wins ───       ← ~4x faster (30ms vs 130ms)

JMH verified: 34.0ms vs sequential 87.2ms — 2.6x faster.


raceQuorum — N-of-M Successes

// Manual select + counting — fragile, hard to get right
val results = mutableListOf<String>()
val required = 2
coroutineScope {
    val jobs = listOf(
        async { fetchReplicaA() },
        async { fetchReplicaB() },
        async { fetchReplicaC() },
    )
    val channel = Channel<String>(3)
    jobs.forEach { job ->
        launch { try { channel.send(job.await()) } catch (_: Exception) { } }
    }
    repeat(required) { results.add(channel.receive()) }
    jobs.forEach { it.cancel() }  // cancel the rest
}
val quorum: List<String> = Async {
    raceQuorum(
        required = 2,
        Kap { fetchReplicaA() },  // 50ms
        Kap { fetchReplicaB() },  // 20ms
        Kap { fetchReplicaC() },  // 80ms
    )
}
// [replica-B, replica-A] — the 2 fastest. C cancelled.

Supports arities 2-22.


Resource Safety

bracket — Guaranteed cleanup

// Nested try/finally — gets ugly fast with multiple resources
val db = openDbConnection()
try {
    val cache = openCacheConnection()
    try {
        val http = openHttpClient()
        try {
            // use all three... but sequentially acquired
            val dbResult = db.query("SELECT 1")
            val cacheResult = cache.get("key")
            val httpResult = http.get("/api")
            "$dbResult|$cacheResult|$httpResult"
        } finally {
            http.close()
        }
    } finally {
        cache.close()
    }
} finally {
    db.close()
}
val result = Async {
    kap { db: String, cache: String, api: String -> "$db|$cache|$api" }
        .with(bracket(
            acquire = { openDbConnection() },
            use = { conn -> Kap { conn.query("SELECT 1") } },
            release = { conn -> conn.close() },
        ))
        .with(bracket(
            acquire = { openCacheConnection() },
            use = { conn -> Kap { conn.get("key") } },
            release = { conn -> conn.close() },
        ))
        .with(bracket(
            acquire = { openHttpClient() },
            use = { client -> Kap { client.get("/api") } },
            release = { client -> client.close() },
        ))
}
// All 3 acquired, used in PARALLEL, ALL released even on failure.
// Release runs in NonCancellable context — guaranteed.

bracketCase — Release depends on outcome

val result = Async {
    bracketCase(
        acquire = { openDbConnection() },
        use = { tx -> Kap { tx.query("INSERT 1") } },
        release = { tx, case ->
            when (case) {
                is ExitCase.Completed<*> -> { println("commit"); tx.commit() }
                is ExitCase.Failed -> { println("rollback"); tx.rollback() }
                is ExitCase.Cancelled -> { println("rollback (cancelled)"); tx.rollback() }
            }
            tx.close()
        },
    )
}

Resource — Composable resource

// Manual acquisition + cleanup ordering
val db = openDbConnection()
val cache = openCacheConnection()
val http = openHttpClient()
try {
    // use them...
} finally {
    http.close()   // reverse order? you have to remember
    cache.close()
    db.close()
}
val db = Resource({ openDbConnection() }, { it.close() })
val cache = Resource({ openCacheConnection() }, { it.close() })
val http = Resource({ openHttpClient() }, { it.close() })

val infra = Resource.zip(db, cache, http) { d, c, h -> Triple(d, c, h) }

val result = Async {
    infra.useKap { (db, cache, http) ->
        kap(::DashboardData)
            .with { db.query("SELECT 1") }
            .with { cache.get("user:prefs") }
            .with { http.get("/recommendations") }
    }
}
// All acquired, used in parallel, released in reverse order. Guaranteed.

Resource.zip supports arities 2-22.

guarantee / guaranteeCase

// guarantee: finalizer always runs, regardless of success or failure
val result = Async {
    guarantee(
        fa = { riskyOperation() },
        finalizer = { cleanup() },
    )
}

// guaranteeCase: finalizer receives the exit case
val result2 = Async {
    guaranteeCase(
        fa = { riskyOperation() },
        finalizer = { case ->
            when (case) {
                is ExitCase.Completed<*> -> println("success cleanup")
                is ExitCase.Failed -> println("failure cleanup: ${case.error}")
                is ExitCase.Cancelled -> println("cancellation cleanup")
            }
        },
    )
}

Full Production Pipeline

All features composed in one chain:

val breaker = CircuitBreaker(maxFailures = 5, resetTimeout = 30.seconds)

val result = Async {
    Kap { fetchData() }
        .timeout(2.seconds)                              // hard timeout
        .withCircuitBreaker(breaker)                     // circuit breaker
        .retry(Schedule.times<Throwable>(3)              // retry with backoff + jitter
            and Schedule.exponential(50.milliseconds)
            .jittered()
            .withMaxDuration(10.seconds))
        .recover { cachedData() }                        // fallback on exhaustion
}