Kotlin Flows as a black hole
This was yet another case of a service bug that stayed dormant for years and then, when the data pattern changed, suddenly surfaced and created a week-long on-call sprint. I had written about one such dormant bug in a previous post.
The service in question constantly reads data from a source, applies certain transformations, and writes to a sink (S3). Quite simple, actually.
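In Flow terms, the shape is roughly the following. This is a simplified sketch, not the actual service code; Record, transform, and writeToS3 are placeholders.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

// Placeholders standing in for the real source records and sink
data class Record(val payload: ByteArray)
fun transform(record: Record): Record = record
suspend fun writeToS3(record: Record) { /* S3 PutObject would happen here */ }

// read -> transform -> write, end to end
suspend fun pipeline(source: Flow<Record>) {
    source
        .map { transform(it) }
        .collect { writeToS3(it) }
}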
Normally, the data read stays at a manageable volume that keeps in sync with the S3 write operations. At least, it did until now.
Then, suddenly, the service encountered a large chunk of data, and the S3 writes couldn't keep up with the pace at which it was reading from the source. The service showed frequent crashes, high heap memory usage, and heavy GC activity.
After much digging around, we got some heap dumps. (Yeah, it's not that easy to get heap dumps from within Docker containers.)
As in a previous encounter, I took the heap dump and analyzed it with Eclipse MAT.
It looked something like this:
So a single thread, with a single Kotlin channel, is consuming 95% of the heap? But why?
It turns out the 'buffer' plugged between the producer (the component that reads the data) and the consumer (the one that writes to S3) was not applying backpressure. We had assumed it did.
Here is a code sample:
fun <T> Flow<T>.buffer(
    maxCapacity: Int,
    sizeFn: (T) -> Int
): Flow<T> = flow {
    coroutineScope {
        // running budget, in whatever unit sizeFn returns (bytes, in our case)
        var capacity = maxCapacity
        val consumeTrigger = Channel<Unit>(capacity = Channel.CONFLATED)
        val channel = produce(capacity = Channel.UNLIMITED) {
            collect {
                val size = sizeFn(it)
                // suspend the producer until the consumer frees up enough budget
                while (size > capacity) { consumeTrigger.receive() }
                capacity -= size
                send(it)
            }
        }
        channel.consumeEach {
            val size = sizeFn(it)
            // give the budget back and wake the producer up
            capacity += size
            consumeTrigger.send(Unit)
            emit(it)
        }
    }
}
fun <T> Flow<T>.backPressuredBuffer(
    scope: CoroutineScope,
    maxItemRunningSize: Int,
    sizeFn: (T) -> Int
): ReceiveChannel<T> =
    buffer(maxItemRunningSize, sizeFn)
        // the culprit: an unbounded channel between the two stages
        .buffer(capacity = Channel.UNLIMITED)
        .produceIn(scope)
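For context, the consuming side was wired up conceptually like this (a hypothetical sketch, not the actual service code; the 1_000_000 running-size limit is just an example value). The S3 writer drains the returned ReceiveChannel, and whenever it falls behind, items accumulate in that unlimited outer buffer instead of slowing down the reader.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch

// Hypothetical stand-in for the real sink
suspend fun writeToS3(item: ByteArray) { /* slow PutObject call */ }

fun startS3Writer(scope: CoroutineScope, sourceFlow: Flow<ByteArray>) {
    val channel: ReceiveChannel<ByteArray> = sourceFlow
        .backPressuredBuffer(scope, maxItemRunningSize = 1_000_000, sizeFn = { it.size })
    scope.launch {
        for (item in channel) {
            // while this call lags behind, the UNLIMITED outer buffer keeps growing
            writeToS3(item)
        }
    }
}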
So while the first buffer did emit a steady, size-limited flow, this outer buffer kept consuming from it without bound, even when the inner buffer's size was limited to, say, 1.
As we learnt the hard way, Channel.UNLIMITED essentially creates an unbounded linked list that accepts whatever you offer it. And of course, heap memory is the only limit.
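A quick way to convince yourself (a standalone snippet, not from the service): send on an unlimited channel never suspends, so even with no consumer at all, the producer happily keeps queueing payloads onto the heap.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val unlimited = Channel<ByteArray>(capacity = Channel.UNLIMITED)
    // No consumer anywhere, yet every send returns immediately
    repeat(100_000) {
        unlimited.send(ByteArray(1024)) // never suspends on an UNLIMITED channel
    }
    println("queued ~100 MB without a single suspension")
}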
Fixing it was quite simple: just don't make it unlimited, and pick a reasonable capacity based on the service's behavior.
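For illustration, a minimal sketch of the fixed version could look like the one below. The capacity of maxItemRunningSize / 100 is an assumption for this post (it matches the unit test further down, which treats ~100 bytes as the smallest item); in the real service you would derive the bound from your own item sizes and throughput.

fun <T> Flow<T>.backPressuredBuffer(
    scope: CoroutineScope,
    maxItemRunningSize: Int,
    sizeFn: (T) -> Int
): ReceiveChannel<T> =
    buffer(maxItemRunningSize, sizeFn)
        // bounded now: once this fills up, the upstream producer suspends
        .buffer(capacity = maxItemRunningSize / 100)
        .produceIn(scope)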
Here is a small unit test that validates whether it works:
@Test
fun `test buffer doesn't overflow the max size`() = runBlocking {
    val totalItems = 1000
    val maxCapacity = 1000 // bytes
    // inner buffer is limited to 1k bytes
    // outer buffer is limited to 10 entries = 1k / 100
    val randomByteArrays = List(totalItems) {
        ByteArray(Random.nextInt(100, 201)).apply { Random.nextBytes(this) }
    }
    // counts how many items the producer has pulled from the source flow
    val cursor = AtomicInteger(0)
    val receiveChannel = randomByteArrays.asFlow()
        .onEach { cursor.incrementAndGet() }
        .backPressuredBuffer(
            scope = this,
            maxItemRunningSize = maxCapacity,
            sizeFn = { it.size }
        )
    // give the producer a chance to fill both buffers
    delay(100)
    // both buffers are now full: roughly 5-10 items (by size) in the inner buffer,
    // 10 entries in the outer one, plus one item pulled but not yet accepted
    // because the buffer is full -- far fewer than the 1000 items available
    assertTrue(cursor.get() <= 22)
    assertTrue(cursor.get() >= 11)
    receiveChannel.cancel()
}