kap-core¶
The foundation module. Type-safe parallel orchestration with visible phases.
Depends on: kotlinx-coroutines-core only.
Platforms: JVM, JS (IR), Linux X64, macOS (x64/ARM64), iOS (x64/ARM64/Simulator).
Tests: 438 tests across 33 test classes, including property-based algebraic law verification.
What kap-core solves¶
You have multiple async calls. Some parallel, some sequential. kap-core gives you .with for independent tasks, .then for barriers, and .andThen for dependent phases. The code shape becomes the execution plan.
With @KapTypeSafe (via the kap-ksp module), you get named builder methods generated from your data class properties — .withUser {}, .thenStock {}, etc. — making chains self-documenting while retaining full compile-time type safety. The generic .with {} / .then {} API shown throughout this page is the underlying core API; named builders are the recommended user-facing pattern built on top of it.
Level 1 — Learn First¶
.with — Independent tasks in parallel¶
@KapTypeSafe
data class Dashboard(val user: String, val cart: String, val promos: String)
val dashboard: Dashboard = kap(::Dashboard)
    .withUser { fetchUser() }     // ┐ all three start at t=0
    .withCart { fetchCart() }     // │ total time = max(individual)
    .withPromos { fetchPromos() } // ┘ swap any two? COMPILE ERROR
    .evalGraph()
@KapTypeSafe generates .withUser {}, .withCart {}, .withPromos {} from the data class properties. The generic .with {} API is equivalent but positional — named builders enforce the correct parameter at each step.
.then — Phase barrier¶
.then creates an explicit synchronization point. Everything above must complete before anything below starts.
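For comparison, the hand-rolled equivalent of a phase barrier with plain coroutines is two sequential `coroutineScope` blocks: the first does not return until every child inside it has completed. The names below (`loadWithBarrier` and the phase labels) are illustrative, not part of kap-core:

```kotlin
import kotlinx.coroutines.*

// Phase barrier with plain coroutines: the first coroutineScope
// does not return until every child launched inside it has finished.
suspend fun loadWithBarrier(): List<String> {
    val log = mutableListOf<String>()
    coroutineScope {                       // phase 1: all children finish here
        launch { delay(20); log.add("a") }
        launch { delay(10); log.add("b") }
    }
    log.add("barrier")                     // runs only after phase 1 completes
    coroutineScope {                       // phase 2
        launch { log.add("c") }
    }
    return log
}

fun main() = runBlocking {
    println(loadWithBarrier())             // "barrier" always appears after "a" and "b"
}
```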
.andThen — Dependent phase¶
// Phase 1
val ctx = coroutineScope {
    val dProfile = async { fetchProfile(userId) }
    val dPrefs = async { fetchPreferences(userId) }
    val dTier = async { fetchLoyaltyTier(userId) }
    UserContext(dProfile.await(), dPrefs.await(), dTier.await())
}
// Phase 2 — needs ctx
val enriched = coroutineScope {
    val dRecs = async { fetchRecommendations(ctx.profile) }
    val dPromos = async { fetchPromotions(ctx.tier) }
    val dTrending = async { fetchTrending(ctx.prefs) }
    val dHistory = async { fetchHistory(ctx.profile) }
    EnrichedContent(dRecs.await(), dPromos.await(), dTrending.await(), dHistory.await())
}
// Phase 3 — needs both
val dashboard = coroutineScope {
    val dLayout = async { renderLayout(ctx, enriched) }
    val dTrack = async { trackAnalytics(ctx, enriched) }
    FinalDashboard(dLayout.await(), dTrack.await())
}
// With @KapTypeSafe on UserContext, EnrichedContent, and FinalDashboard
val dashboard: FinalDashboard = kap(::UserContext)
    .withProfile { fetchProfile(userId) }          // ┐
    .withPreferences { fetchPreferences(userId) }  // ├─ phase 1
    .withLoyaltyTier { fetchLoyaltyTier(userId) }  // ┘
    .andThen { ctx ->                              // ── barrier: ctx available
        kap(::EnrichedContent)
            .withRecommendations { fetchRecommendations(ctx.profile) } // ┐
            .withPromotions { fetchPromotions(ctx.tier) }              // ├─ phase 2
            .withTrending { fetchTrending(ctx.prefs) }                 // │
            .withHistory { fetchHistory(ctx.profile) }                 // ┘
            .andThen { enriched ->                                     // ── barrier
                kap(::FinalDashboard)
                    .withLayout { renderLayout(ctx, enriched) }        // ┐ phase 3
                    .withAnalytics { trackAnalytics(ctx, enriched) }   // ┘
            }
    }
    .evalGraph()
The same pipeline with the generic .with {} API:
val dashboard: FinalDashboard = kap(::UserContext)
    .with { fetchProfile(userId) }      // ┐
    .with { fetchPreferences(userId) }  // ├─ phase 1
    .with { fetchLoyaltyTier(userId) }  // ┘
    .andThen { ctx ->                   // ── barrier: ctx available
        kap(::EnrichedContent)
            .with { fetchRecommendations(ctx.profile) } // ┐
            .with { fetchPromotions(ctx.tier) }         // ├─ phase 2
            .with { fetchTrending(ctx.prefs) }          // │
            .with { fetchHistory(ctx.profile) }         // ┘
            .andThen { enriched ->                      // ── barrier
                kap(::FinalDashboard)
                    .with { renderLayout(ctx, enriched) }   // ┐ phase 3
                    .with { trackAnalytics(ctx, enriched) } // ┘
            }
    }
    .evalGraph()
Roughly twenty lines of nested coroutineScope/async/await collapse into a flat chain. The dependency graph is the code shape.
Level 2 — Common Patterns¶
Composition styles¶
zip and combine support arities 2-22.
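Semantically, a two-arity zip runs both effects in parallel and combines the results once both complete. A hand-rolled equivalent with plain coroutines (illustrative only; not kap-core's implementation):

```kotlin
import kotlinx.coroutines.*

// Hand-rolled 2-arity zip: run two suspend functions in parallel,
// then combine their results with f once both have completed.
suspend fun <A, B, C> zip2(
    fa: suspend () -> A,
    fb: suspend () -> B,
    f: (A, B) -> C,
): C = coroutineScope {
    val da = async { fa() }
    val db = async { fb() }
    f(da.await(), db.await())
}

fun main() = runBlocking {
    val greeting = zip2({ delay(10); "Hello" }, { delay(10); "world" }) { a, b -> "$a, $b" }
    println(greeting)   // Hello, world
}
```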
Partial failure¶
Use this when: one service can fail but you still want the rest.
// supervisorScope is manual and error-prone
val result = supervisorScope {
    val dUser = async { fetchUserMayFail() }
    val dCart = async { fetchCartAlways() }
    val dConfig = async { fetchConfigAlways() }
    val user = try { dUser.await() } catch (e: Exception) { "anonymous" }
    val cart = dCart.await()
    val config = dConfig.await()
    PartialDashboard(user, cart, config)
}
Without settled — one failure cancels everything:
@KapTypeSafe
data class Dashboard(val user: String, val cart: String, val config: String)
val dashboard = kap(::Dashboard)
    .withUser { fetchUser() }     // throws! → siblings cancelled
    .withCart { fetchCart() }     // started, then cancelled
    .withConfig { fetchConfig() } // started, then cancelled
    .evalGraph()
// RuntimeException — entire dashboard lost. Cart and config were fine.
With settled { } — failure wrapped, siblings continue:
// The type changes: user becomes Result<String> instead of String
@KapTypeSafe
data class Dashboard(val user: Result<String>, val cart: String, val config: String)
val dashboard = kap(::Dashboard)
    .withUser(settled { fetchUser() }) // Result<String> — won't cancel siblings
    .withCart { fetchCart() }          // String — runs normally
    .withConfig { fetchConfig() }      // String — runs normally
    .evalGraph()
// Dashboard(user=Result.failure(RuntimeException), cart=cart-ok, config=config-ok)
// Use the result with a fallback:
val userName = dashboard.user.getOrDefault("anonymous") // "anonymous"
timed { } — Measure any call without manual instrumentation¶
The timed { } shorthand wraps a call so it returns TimedResult<A> — the value plus its wall-clock duration:
@KapTypeSafe
data class Dashboard(val user: String, val latency: TimedResult<String>)
val dashboard = kap(::Dashboard)
    .withUser { fetchUser() }
    .withLatency(timed { fetchSlowService() }) // TimedResult(value, duration)
    .evalGraph()
println(dashboard.latency.duration) // e.g. 230ms
println(dashboard.latency.value)    // "slow-result"
Like settled { }, timed { } is a top-level shorthand for Kap { block() }.timed(). Use it inline in .with calls to measure individual branches without restructuring your graph.
traverseSettled — Collect ALL results, no cancellation¶
Collections¶
Use this when: you have a list of items to process in parallel with bounded concurrency.
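A hand-rolled equivalent of bounded-concurrency traversal uses a `Semaphore` to cap in-flight work (a sketch with plain coroutines; not kap-core's implementation, and `traverseBounded` is an illustrative name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Process every item in parallel, with at most `concurrency` running
// at once. awaitAll returns results in input order.
suspend fun <A, B> traverseBounded(
    items: List<A>,
    concurrency: Int,
    f: suspend (A) -> B,
): List<B> = coroutineScope {
    val gate = Semaphore(concurrency)
    items.map { item ->
        async { gate.withPermit { f(item) } }
    }.awaitAll()
}

fun main() = runBlocking {
    val doubled = traverseBounded(listOf(1, 2, 3, 4), concurrency = 2) { it * 2 }
    println(doubled)   // [2, 4, 6, 8]
}
```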
traverse¶
traverseDiscard — Fire-and-forget¶
sequence / sequence(concurrency)¶
Error handling¶
Use this when: you need fallbacks, timeouts, or retries.
.timeout(duration, default)¶
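A hand-rolled version of timeout-with-fallback is `withTimeoutOrNull` plus an elvis default (a sketch only; kap-core's combinator composes lazily on `Kap` values instead of wrapping a call site):

```kotlin
import kotlinx.coroutines.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// If `block` exceeds `limit`, return `default` instead of throwing.
// Assumes `block` never returns null (withTimeoutOrNull would then
// be indistinguishable from a timeout).
suspend fun <A> withTimeoutOrDefault(limit: Duration, default: A, block: suspend () -> A): A =
    withTimeoutOrNull(limit) { block() } ?: default

fun main() = runBlocking {
    val fast = withTimeoutOrDefault(100.milliseconds, "fallback") { "ok" }
    val slow = withTimeoutOrDefault(10.milliseconds, "fallback") { delay(1000); "never" }
    println("$fast / $slow")   // ok / fallback
}
```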
.recover { } / .recoverWith { }¶
.retry(maxAttempts, delay, backoff)¶
Simple retry (for composable Schedule-based retry, see kap-resilience):
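A hand-rolled sketch of attempt-limited retry with exponential backoff, matching the shape in the heading (illustrative; not kap-core's internals):

```kotlin
import kotlinx.coroutines.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Retry `block` up to `maxAttempts` times, sleeping `delay` between
// attempts and multiplying it by `backoff` each time. Rethrows the
// last failure. CancellationException is never swallowed.
suspend fun <A> retrying(
    maxAttempts: Int,
    delay: Duration = 100.milliseconds,
    backoff: Double = 2.0,
    block: suspend () -> A,
): A {
    var wait = delay
    repeat(maxAttempts - 1) {
        try {
            return block()
        } catch (e: CancellationException) {
            throw e                         // preserve cancellation
        } catch (_: Exception) {
            kotlinx.coroutines.delay(wait)  // qualified: `delay` is shadowed by the parameter
            wait = wait * backoff
        }
    }
    return block()                          // final attempt: let failure propagate
}

fun main() = runBlocking {
    var attempts = 0
    val result = retrying(3, delay = 1.milliseconds) {
        attempts++
        if (attempts < 3) error("flaky") else "ok"
    }
    println("$result after $attempts attempts")   // ok after 3 attempts
}
```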
.ensure(error) { predicate } / .ensureNotNull(error) { extract }¶
catching { } — Exception-safe Result¶
.orElse(other) / firstSuccessOf¶
Racing¶
Use this when: you want the fastest result from multiple sources.
raceN(c1, c2, ..., cN) — First to succeed wins, rest cancelled¶
race(fa, fb) — Two-way race¶
raceAll(list) — Race a dynamic list¶
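Under plain coroutines, racing is a `select` over `onAwait` clauses plus cancelling the losers. A sketch of a two-way race (note: this hand-rolled version is first-to-complete, whereas raceN above is documented as first-to-succeed):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// First deferred to complete wins; both are cancelled afterwards
// (cancelling the already-completed winner is a no-op).
suspend fun <A> raceTwo(fa: suspend () -> A, fb: suspend () -> A): A = coroutineScope {
    val da = async { fa() }
    val db = async { fb() }
    val winner = select<A> {
        da.onAwait { it }
        db.onAwait { it }
    }
    da.cancel()
    db.cancel()
    winner
}

fun main() = runBlocking {
    val fastest = raceTwo({ delay(200); "slow" }, { delay(10); "fast" })
    println(fastest)   // fast
}
```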
Level 3 — Advanced¶
Construction utilities¶
Kap { } — Wrap a suspend lambda¶
val effect: Kap<String> = Kap { fetchUser() } // nothing runs yet
val result: String = effect.evalGraph() // NOW it runs
kap(f) — Curry a function for .with chains¶
Works with constructor refs, function refs, and lambdas:
// Constructor reference
@KapTypeSafe
data class Greeting(val name: String, val message: String)
val g1 = kap(::Greeting).withName { fetchName() }.withMessage { "hello" }.evalGraph()

// Lambda — use Kap.of with manual currying
val greet: (String, Int) -> String = { name, age -> "Hi $name, you're $age" }
val g2 = Kap.of { name: String -> { age: Int -> greet(name, age) } }
    .with { fetchName() }.with { fetchAge() }.evalGraph()

// Function — annotate with @KapTypeSafe for named builders
@KapTypeSafe
fun buildSummary(name: String, items: Int): String = "$name has $items items"
val g3 = kap(BuildSummary).withName { fetchName() }.withItems { 5 }.evalGraph()
Kap.of(value) / Kap.empty() / Kap.failed(error) / Kap.defer { }¶
val pure: Kap<Int> = Kap.of(42) // pure value
val unit: Kap<Unit> = Kap.empty() // Unit computation
val failed: Kap<String> = Kap.failed(RuntimeException("boom")) // wrapped failure
val lazy: Kap<String> = Kap.defer { Kap { expensiveSetup() } } // lazy construction
.thenValue¶
Unlike .then which creates a real barrier, .thenValue fills a slot sequentially without blocking parallel siblings:
Flow integration¶
Flow.mapEffectOrdered — Preserve upstream order¶
// channelFlow + manual index tracking to preserve order
val results: Flow<String> = channelFlow {
    val buffer = ConcurrentHashMap<Int, String>()
    var nextIndex = 0
    val semaphore = Semaphore(5)
    userIdFlow.collectIndexed { index, id ->
        semaphore.acquire()
        launch {
            val result = fetchUser(id)
            buffer[index] = result
            semaphore.release()
            // Flush in-order results... (complex bookkeeping)
        }
    }
}
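A complete hand-rolled way to get ordered parallel mapping is to emit Deferreds and await them in collection order; bounding the buffer roughly caps concurrency. This is a sketch of the pattern, not kap-core's mapEffectOrdered (the `mapOrdered` name is illustrative):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Map a flow in parallel while preserving upstream order: start each
// item as a Deferred on the channelFlow's scope, keep at most
// `concurrency` buffered, and await them sequentially. Order is
// preserved because awaiting happens in upstream order.
fun <T, R> Flow<T>.mapOrdered(concurrency: Int, f: suspend (T) -> R): Flow<R> =
    channelFlow {
        this@mapOrdered
            .map { item -> async { f(item) } }  // start work as items arrive
            .buffer(concurrency)                // approximate in-flight bound
            .collect { deferred -> send(deferred.await()) }
    }

fun main() = runBlocking {
    val out = flowOf(3, 1, 2)
        .mapOrdered(concurrency = 2) { delay(it * 10L); it * 10 }
        .toList()
    println(out)   // [30, 10, 20]: upstream order kept despite varied delays
}
```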
Flow.firstAsKap()¶
Memoization¶
// Manual Mutex + double-checked locking
private val mutex = Mutex()
private var cached: String? = null

suspend fun fetchOnce(): String {
    cached?.let { return it }
    return mutex.withLock {
        cached?.let { return it }
        val result = expensiveCall()
        cached = result
        result
    }
}
// Caches failures too. Transient error? Cached forever.
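The fix the manual version misses is simple: only store the value on success, so a failure propagates and the next caller retries. A hand-rolled sketch of that behavior (illustrative; kap-core exposes this as .memoizeOnSuccess(), see the API map below):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Memoize a suspend call, but cache only successful results:
// a thrown exception propagates and nothing is stored.
class MemoizeOnSuccess<A : Any>(private val compute: suspend () -> A) {
    private val mutex = Mutex()
    private var cached: A? = null

    suspend fun get(): A {
        cached?.let { return it }              // fast path, no lock
        return mutex.withLock {
            cached ?: compute().also { cached = it }   // store only on success
        }
    }
}

fun main() = runBlocking {
    var calls = 0
    val memo = MemoizeOnSuccess {
        calls++
        if (calls == 1) error("transient") else "value"
    }
    runCatching { memo.get() }   // first call fails: nothing cached
    println(memo.get())          // recomputes, succeeds, now cached
    memo.get()                   // served from cache
    println(calls)               // 2
}
```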
Interop¶
Deferred.toKap() / Kap.toDeferred(scope)¶
Bridge between existing coroutine code and KAP. Useful when you have a Deferred from a library or legacy code and want to compose it with other Kap combinators (.map, .recover, .timeout, parallel .with chains, etc.).
val deferred: Deferred<String> = scope.async { fetchUser() }
val kap: Kap<String> = deferred.toKap()
val result = kap.evalGraph()
(suspend () -> A).toKap()¶
computation { } — Imperative builder¶
Observability¶
@KapTypeSafe
data class Dashboard(val user: String, val config: String)
val tracer = KapTracer { event ->
    when (event) {
        is TraceEvent.Started -> logger.info("${event.name} started")
        is TraceEvent.Succeeded -> metrics.timer(event.name).record(event.duration)
        is TraceEvent.Failed -> logger.error("${event.name} failed", event.error)
    }
}

val result = kap(::Dashboard)
    .withUser(Kap { fetchUser() }.traced("fetch-user", tracer))
    .withConfig(Kap { fetchConfig() }.traced("fetch-config", tracer))
    .evalGraph()
Utilities¶
.keepFirst / .keepSecond¶
Run both in parallel, keep only one result:
.discard() / .peek { }¶
val unit = Kap { fetchUser() }.discard().evalGraph() // runs but returns Unit

val user = Kap { fetchUser() }
    .peek { println("Fetched: $it") } // side effect, returns original value
    .evalGraph()
.on(context) / .named(name)¶
val result = Kap { readFile() }
    .on(Dispatchers.IO)  // switch dispatcher
    .named("file-read")  // coroutine name for debugging
    .evalGraph()
.evalGraph() — Execute from any suspend context¶
delayed(duration, value) / withOrNull¶
val result = delayed(100.milliseconds, "delayed-value").evalGraph()
val maybeResult: String? = withOrNull { Kap { riskyOperation() } }
Execution model¶
Kap<A> is lazy — nothing runs until .evalGraph():
@KapTypeSafe
data class Dashboard(val user: String, val cart: String, val promos: String)
val plan: Kap<Dashboard> = kap(::Dashboard)
    .withUser { fetchDashUser() }
    .withCart { fetchDashCart() }
    .withPromos { fetchDashPromos() }

println("Plan built. Nothing has executed yet.")
println("plan is: ${plan::class.simpleName}")

val result: Dashboard = plan.evalGraph() // NOW it runs
Key guarantees:
- Structured concurrency: all parallel branches run inside coroutineScope. One fails → siblings cancel.
- Cancellation safety: CancellationException is never caught. All combinators re-throw it.
- Context propagation: .evalGraph(MDCContext()) propagates context to all branches.
- No reflection: all type safety is compile-time. Zero runtime overhead.
- Algebraic laws: Functor, Applicative, Monad — property-tested via Kotest. See LAWS.md.
API Reference Map¶
| I want to... | Use this |
|---|---|
| Run tasks in parallel (named) | .withParamName { } via @KapTypeSafe |
| Run tasks in parallel (generic) | .with { } |
| Wait for all before continuing | .then { } / .thenParamName { } |
| Use previous result in next phase | .andThen { ctx -> } |
| Handle one failure without cancelling rest | settled { } |
| Process a list with bounded concurrency | traverse(concurrency) { } |
| Retry on failure | .retry(maxAttempts, delay, backoff) |
| Timeout with fallback | .timeout(duration, default) |
| Recover from errors | .recover { } |
| Race multiple sources | raceN(c1, c2, c3) |
| Cache computation result | .memoizeOnSuccess() |
| Guaranteed resource cleanup | bracket(acquire, use, release) |