Collect on BroadcastChannel asFlow()

from the CommonsWare Community archives

At May 3, 2021, 6:41pm, Jan asked:

I have a BroadcastChannel:

private val characteristicChangedChannel =
    BroadcastChannel<ChangeNotification>(1488)  // capacity set

// because you can't use channel filters in Kotlin 1.4, changed here:
//val notificationChannel = characteristicChangedChannel.openSubscription().filter {
val notificationChannel = characteristicChangedChannel.asFlow().filter { it.uuid == characteristic.uuid }

Send works okay:
characteristicChangedChannel.send(
    notificationBuffer.removeAt(0)
)

But nothing is ever received back in the flow.

Trying to do a

notificationChannel.collect

gives compiler errors, whether I write it as collect() or as collect { }

The compiler says I need to extend collect to use it as a FlowCollector:
notificationChannel.collect

but I don’t know how that would help to actually get the response (and honestly I have no idea what to do in the extension).

So I wrote this:

fun fetchResponse() = flow<ChangeNotification?> {
    var response: String? = ""

    notificationChannel.collect {
        // --->>> this line is never invoked; no collect is being done
        response = readCharacteristicInFlow(it)
    }
    response
}

My question:
How do I receive or collect something that was sent on a broadcast channel when I use
asFlow() to get the response from the broadcast (which was previously a ReceiveChannel)?

I’ve been stumped on this for 3 days and have scoured the internet, trying every suggestion I found.
One suggestion was to add an emit to the flow lambda above to get things going. That didn’t work.


At May 3, 2021, 10:53pm, mmurphy replied:

Um, call collect():

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

fun main() {
  val characteristicChangedChannel = BroadcastChannel<Int>(1488)
  val notificationChannel = characteristicChangedChannel.asFlow().filter { it % 2 == 0 }
  
  GlobalScope.launch(Dispatchers.Main) {
    notificationChannel.collect { println(it) }
    println("That's all folks!")
  }
  
  GlobalScope.launch(Dispatchers.Main) {
    characteristicChangedChannel.send(1)
    characteristicChangedChannel.send(2)
  }

  println("...and we're off!")
}

If you paste that into a Klassbook scratch pad and run it, you get the expected output:

2

…as the 1 is filtered out of the flow.
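
For anyone trying this in a JVM or Android project rather than Klassbook, here is a minimal sketch of the same idea. It assumes a kotlinx.coroutines 1.4.x release, as was current for this thread (later releases deprecate BroadcastChannel in favor of SharedFlow), and collectEvens() is a hypothetical name. Two details may matter for the original problem. In those releases the lambda form of collect is an extension function, so it needs import kotlinx.coroutines.flow.collect or the wildcard import used here. Also, a subscriber to an array-backed BroadcastChannel sees only what is sent after it has started collecting.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.flow.* // brings in the collect { } extension (assumes coroutines 1.4.x)

// Hypothetical helper: returns the even values that reach the collector.
fun collectEvens(): List<Int> = runBlocking {
    val channel = BroadcastChannel<Int>(16)
    val evens = channel.asFlow().filter { it % 2 == 0 }
    val received = mutableListOf<Int>()

    // Start collecting first; asFlow() subscribes only once collection begins.
    val collector = launch { evens.collect { received += it } }
    yield() // let the collector run until it suspends waiting for a value

    channel.send(1)
    channel.send(2)
    channel.close() // closing the channel lets collect() return

    collector.join()
    received
}

fun main() {
    println(collectEvens()) // prints [2]
}
```

Closing the channel is only there so that the sketch terminates. In an app the collector would usually keep running until its scope is cancelled.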


At May 3, 2021, 11:02pm, Jan replied:

Interesting. It looks like Studio is right when it says I have to extend collect. I guess it only works for primitives. Thanks for the example. At least I know the approach that works.


At May 3, 2021, 11:25pm, mmurphy replied:

No. For example, this works:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

data class Thingy(val value: Int)

fun main() {
  val characteristicChangedChannel = BroadcastChannel<Thingy>(1488)
  val notificationChannel = characteristicChangedChannel.asFlow().filter { it.value % 2 == 0 }
  
  GlobalScope.launch(Dispatchers.Main) {
    notificationChannel.collect { println(it) }
    println("That's all folks!")
  }
  
  GlobalScope.launch(Dispatchers.Main) {
    characteristicChangedChannel.send(Thingy(1))
    characteristicChangedChannel.send(Thingy(2))
  }

  println("...and we're off!")
}
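
The same pattern can be carried back to the fetchResponse() attempt in the question. A flow is cold: its body runs only once something collects it, and only values passed to emit (or produced by an operator such as map) reach that collector. The sketch below is one way to structure this, not the code from the thread. This ChangeNotification, its payload field, responsesFor(), and demoResponses() are hypothetical stand-ins, and it again assumes kotlinx.coroutines 1.4.x.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the ChangeNotification type in the question.
data class ChangeNotification(val uuid: String, val payload: String)

// Builds a cold flow of payloads for one uuid; nothing runs until it is collected.
fun responsesFor(
    uuid: String,
    channel: BroadcastChannel<ChangeNotification>
): Flow<String> =
    channel.asFlow()
        .filter { it.uuid == uuid }
        .map { it.payload } // map hands each transformed value to the collector

// Hypothetical demo: collects the flow while notifications are being sent.
fun demoResponses(): List<String> = runBlocking {
    val channel = BroadcastChannel<ChangeNotification>(16)
    val received = mutableListOf<String>()

    // The terminal collect call is what actually starts the flow.
    val job = launch { responsesFor("a", channel).collect { received += it } }
    yield() // give the collector a chance to subscribe before sending

    channel.send(ChangeNotification("a", "first"))
    channel.send(ChangeNotification("b", "ignored"))
    channel.send(ChangeNotification("a", "second"))
    channel.close() // ends the flow so the demo can return

    job.join()
    received
}

fun main() {
    println(demoResponses()) // prints [first, second]
}
```

Returning a Flow from the function and leaving collect to the caller keeps the function itself non-suspending. The caller decides which coroutine scope owns the collection.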