Into the Flow: Kotlin cold streams primer
If you spend any time in the Kotlin community, you have probably heard about Kotlin Flows. I had heard about them too, but never spent the time to look into them. Lately, when I was about to start a new Android project, I decided it was finally time. Maybe everybody was talking about them for a reason.
Why Flows
We already have suspending functions. We could do async operations without blocking by using suspend fun doSomething(): List<Something>, for instance. But that would mean we need to compute all the results before we can start processing any of them. Using fun doSomething(): Flow<Something>, we can process each result as soon as it is ready, in a non-blocking way.
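To make the difference concrete, here is a minimal sketch. The compute function and both squares* functions are hypothetical names I made up for illustration, and the code assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical slow computation standing in for real work.
suspend fun compute(i: Int): Int {
    delay(100)
    return i * i
}

// Returns only after every item has been computed.
suspend fun squaresAsList(): List<Int> = (1..3).map { compute(it) }

// Hands each item to the collector as soon as it is computed.
fun squaresAsFlow(): Flow<Int> = flow {
    for (i in 1..3) emit(compute(i))
}

fun main(): Unit = runBlocking {
    // Nothing is printed until all three items are ready.
    squaresAsList().forEach { println("list item $it") }
    // Each item is printed as soon as it has been computed.
    squaresAsFlow().collect { println("flow item $it") }
}
```

Both versions produce the same values; the only difference is when the caller gets to see them.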
It's Cold Flows actually
You might have noticed that the function returning the Flow does not have the suspend modifier. This is because it returns immediately and doesn't do any processing. The actual work happens when the flow is "collected" (or, more generally, when a terminal operator is applied to the flow), e.g. flow.collect { something -> println(something) }. Every time we call collect, the flow code is executed again.
It's referred to as "cold" because the flow (or stream, if you like) doesn't exist before collect is called and is created again every time collect is called. If the flow existed anyway and we just listened to it, it would be "hot".
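A small sketch of what "cold" means in practice. The builderRuns counter and the numbers function are hypothetical, added only to make the re-execution visible.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Counts how many times the builder block has started; for illustration only.
var builderRuns = 0

fun numbers(): Flow<Int> = flow {
    builderRuns++
    emit(1)
    emit(2)
}

fun main(): Unit = runBlocking {
    val flow = numbers()             // nothing runs yet
    println("runs after creation: $builderRuns")        // 0
    flow.collect { println(it) }     // first run of the builder block
    flow.collect { println(it) }     // second, independent run
    println("runs after two collects: $builderRuns")    // 2
}
```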
Create and terminate a Flow
The most basic way to create a Flow is using the flow { ... } builder (unexpected, right?)
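import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun doSomething(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // suspends without blocking the thread
        emit(i)
    }
}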
A new item in the flow can be "emitted" using emit.
Code inside the flow { ... } builder block can suspend (delay is a suspend function). But, as we said before, doSomething() is not a suspend function and doesn't block.
val something = doSomething()
viewModelScope.launch {
    something.collect { value -> println(value) }
}
On the other hand, collect is a suspend function, so it has to be called from a coroutine (launched in a Coroutine Scope, as above) or from another suspend function.
Other ways to create a Flow include converting regular collections or ranges (e.g. listOf(1, 2, 3).asFlow() or (1..3).asFlow()) and wrapping a fixed set of items with flowOf (e.g. flowOf(1, 2, 3)).
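As a quick sketch, the following shows these creation options side by side. The function name allBuilders is hypothetical; asFlow and flowOf are the library functions mentioned in this post.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Collects each flow into a list so the results are easy to compare.
suspend fun allBuilders(): List<List<Int>> {
    val fromList: Flow<Int> = listOf(1, 2, 3).asFlow()   // from a collection
    val fromRange: Flow<Int> = (1..3).asFlow()           // from a range
    val fromValues: Flow<Int> = flowOf(1, 2, 3)          // from a fixed set of items
    return listOf(fromList.toList(), fromRange.toList(), fromValues.toList())
}

fun main(): Unit = runBlocking {
    // All three flows emit the same items: 1, 2, 3
    allBuilders().forEach { println(it) }
}
```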
collect is just one way to terminate a Flow. Other ways include getting the first item, ensuring only a single value is emitted, reducing, folding, converting toList, etc.
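Here is a minimal sketch of those terminal operators. The wrapper function terminalOperators is a hypothetical name; the operators themselves (first, single, reduce, fold, toList) come from kotlinx.coroutines.flow.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun terminalOperators(): List<Any> {
    val numbers = (1..3).asFlow()
    return listOf(
        numbers.first(),                          // 1, stops collecting after the first item
        flowOf(42).single(),                      // 42, throws unless exactly one item is emitted
        numbers.reduce { acc, v -> acc + v },     // 1 + 2 + 3 = 6
        numbers.fold(10) { acc, v -> acc + v },   // 10 + 1 + 2 + 3 = 16
        numbers.toList()                          // [1, 2, 3]
    )
}

fun main(): Unit = runBlocking {
    terminalOperators().forEach { println(it) }
}
```

Because the flow is cold, each terminal operator above triggers its own run of the upstream flow.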
Transforming Flows
Transforming is quite similar to how you would transform a regular collection. Familiar operators such as map, filter and take exist. I think there are 2 major differences from regular collection transformations:
- Code inside those transformation functions can suspend
- You don't have to return a single value; you can call emit as many times as you want using transform:
(1..3).asFlow()
    .transform { number ->
        emit(number * 2)
        delay(100)
        emit(number * 4)
    }
    .collect { number -> println(number) }
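For the first point, here is a small sketch where a suspend function is called from inside map. The lookupName and firstTwoEvenUsers functions are hypothetical; think of lookupName as a stand-in for a network or database call.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical suspend function standing in for a slow lookup.
suspend fun lookupName(id: Int): String {
    delay(50)
    return "user-$id"
}

suspend fun firstTwoEvenUsers(): List<String> =
    (1..10).asFlow()
        .filter { it % 2 == 0 }     // keep even ids
        .map { lookupName(it) }     // the lambda can call suspend functions
        .take(2)                    // cancels the upstream after two items
        .toList()

fun main(): Unit = runBlocking {
    println(firstTwoEvenUsers())    // [user-2, user-4]
}
```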
Coroutine Scope
When we collect a Flow we are executing a suspend function. This means that, by default, both the code inside flow { ... } and the code inside collect { ... } run in the Coroutine Context of the coroutine that calls collect. But there are cases where we want those 2 code blocks to run in different Coroutine Contexts (usually on different Dispatchers, to avoid blocking the Main/UI thread for instance).
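We shouldn't use the familiar withContext(Dispatchers.IO) inside flow { ... } in this case: emitting from a different context breaks the flow's context preservation and fails at runtime. Flows provide a dedicated flowOn operator for this.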
fun doSomething(): Flow<Int> = flow {
    // This runs on Dispatchers.IO
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}.flowOn(Dispatchers.IO)
[...]
viewModelScope.launch {
    doSomething().collect { value ->
        // This runs on the Main thread and just prints those values
        println(value)
    }
}
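The snippet above needs an Android ViewModel, so here is a plain JVM sketch of the same idea that can be run with runBlocking. The names emitterThread and wrongContextSwitch are hypothetical. It also shows what happens if you try withContext inside the builder instead of flowOn.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Emits the name of the thread the builder block runs on.
fun emitterThread(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.IO)

// Anti-pattern: emitting from inside withContext breaks context preservation.
fun wrongContextSwitch(): Flow<Int> = flow {
    withContext(Dispatchers.IO) { emit(1) }
}

fun main(): Unit = runBlocking {
    emitterThread().collect { emitter ->
        // The builder ran on an IO worker thread, the collector runs on the caller's thread.
        println("emitted on $emitter, collected on ${Thread.currentThread().name}")
    }
    try {
        wrongContextSwitch().collect { println(it) }
    } catch (e: IllegalStateException) {
        println("withContext inside flow { } failed: ${e.message}")
    }
}
```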
Composing Flows
One of the most powerful things you can do with Flows is to compose them in different ways.
- zip them, i.e. combine the 1st item of Flow A with the 1st item of Flow B, etc.
val flowA = (1..3).asFlow()
val flowB = flowOf("one", "two", "three")
flowA.zip(flowB) { a, b -> "$a and $b" }
    .collect { println(it) }
// Output:
// 1 and one
// 2 and two
// 3 and three
- combine them, i.e. every time a new value is emitted from either Flow, combine it with the latest value of the other Flow
val flowA = (1..3).asFlow()
val flowB = flowOf("single item")
flowA.combine(flowB) { a, b -> "$a and $b" }
    .collect { println(it) }
// Output:
// 1 and single item
// 2 and single item
// 3 and single item
Flattening Flows
Ending up in a situation where each item of a Flow is transformed into another "sub"-Flow, so that you have a Flow<Flow<X>>, is quite common. Similarly to collections, there are a number of flat* operations:
- flatMapConcat for waiting for each "sub"-Flow to complete before collecting the next one (i.e. collecting the Flows sequentially)
- flatMapMerge for collecting whatever item is emitted by any "sub"-Flow (i.e. collecting the Flows concurrently)
- flatMapLatest for collecting the latest "sub"-Flow that is emitted, while canceling the previous "sub"-Flow
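A sketch of the sequential and "latest" variants, under a few assumptions: the details function and the delays are made up for illustration, and depending on your kotlinx.coroutines version these operators may be marked as preview or experimental, which is why the opt-in annotations are there.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical "sub"-Flow: two emissions per input, 300 ms apart.
fun details(n: Int): Flow<String> = flow {
    emit("$n: first")
    delay(300)
    emit("$n: second")
}

// Sequential: each sub-Flow completes before the next one starts.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun concatenated(): List<String> =
    (1..3).asFlow().flatMapConcat { details(it) }.toList()

// Latest only: a new upstream value cancels the sub-Flow still in progress.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestOnly(): List<String> =
    (1..3).asFlow()
        .onEach { delay(100) }   // upstream emits faster than details() finishes
        .flatMapLatest { details(it) }
        .toList()

fun main(): Unit = runBlocking {
    println(concatenated())
    println(latestOnly())
}
```

concatenated() yields both items of every sub-Flow in order. With the delays chosen here, I would expect latestOnly() to keep only the "first" item of sub-Flows 1 and 2, because each gets canceled when the next upstream value arrives, while sub-Flow 3 runs to completion.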
Exceptions
Whatever exception is thrown in flow { ... } or collect { ... } can be caught with regular try/catch statements.
But there is a more declarative way, with the catch operator. Note that when you "catch" the exception, you can emit again, throw, log, etc.
doSomethingThatMightThrow()
    .catch { e -> emit("Caught $e but emitting something else") }
    .collect { value -> println(value) }
Another important thing to remember is that this operator is "upstream" only, i.e. it does not catch anything thrown after the point where it's declared.
doSomethingThatMightThrow()
    .catch { e -> emit("Caught $e but emitting something else") }
    .collect {
        // Something gone wrong here, will not be caught
    }
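One way around this limitation, sketched below, is to move the per-item logic into onEach, place it before catch, and then call collect() without a lambda. The function names and the failing conditions are hypothetical and only there to trigger an exception.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical flow that fails after emitting one value.
fun failingFlow(): Flow<String> = flow {
    emit("first")
    throw IllegalStateException("upstream failure")
}

// catch sees the upstream exception and can emit a replacement value.
suspend fun recoverUpstream(): List<String> =
    failingFlow()
        .catch { e -> emit("recovered from ${e.message}") }
        .toList()

// Per-item logic lives in onEach, which is upstream of catch, so its failure is caught too.
suspend fun recoverCollectorLogic(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { value ->
            check(value < 3) { "value $value is too large" }
            log += "got $value"
        }
        .catch { e -> log += "caught: ${e.message}" }
        .collect()
    return log
}

fun main(): Unit = runBlocking {
    println(recoverUpstream())        // [first, recovered from upstream failure]
    println(recoverCollectorLogic())  // [got 1, got 2, caught: value 3 is too large]
}
```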
My impression after looking into Flows is that they can handle more than the basic "cold stream" needs. They are not as comprehensive and powerful as other reactive libraries (e.g. RxJava), but you can always add one of those libraries to your project later if needed (converters between Flows and other reactive libraries are provided).
For those of you wanting to read more on the subject, I would recommend the official docs and these blog posts with insights into the design of Flows (1, 2).