Collect on BroadcastChannel asFlow()

I have a BroadcastChannel:

private val characteristicChangedChannel =
    BroadcastChannel<ChangeNotification>(1488)  // capacity set

// Because you can't use channel filters in Kotlin 1.4, this was changed from:
// val notificationChannel = characteristicChangedChannel.openSubscription().filter {
val notificationChannel = characteristicChangedChannel.asFlow().filter { it.uuid == characteristic.uuid }

Sending works okay:

characteristicChangedChannel.send(
    notificationBuffer.removeAt(0)
)

But nothing is ever received back in the flow.

Trying to call

notificationChannel.collect

gives compiler errors, whether I write it as collect() or as collect { }.

The compiler says I need to extend collect to use it as a FlowCollector, but I don’t know how that would help me actually get the response (and honestly I have no idea what to do in the extension).

So I wrote this:

fun fetchResponse() = flow<ChangeNotification?> {
    var response: String? = ""

    notificationChannel.collect {
        // This line is never invoked; no collect is being done.
        response = readCharacteristicInFlow(it)
    }
    response
}

My question:
How do I receive or collect something that was sent on a broadcast channel when I use
asFlow() to get the response from the broadcast (what was previously a ReceiveChannel type)?

I’ve been stumped on this for 3 days and have scoured the internet, trying every suggestion I could find.
One suggestion said to add an emit to the flow lambda above to get things going. That didn’t work.

Um, call collect():

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

fun main() {
  val characteristicChangedChannel = BroadcastChannel<Int>(1488)
  val notificationChannel = characteristicChangedChannel.asFlow().filter { it % 2 == 0 }
  
  GlobalScope.launch(Dispatchers.Main) {
    notificationChannel.collect { println(it) }
    println("That's all folks!")
  }
  
  GlobalScope.launch(Dispatchers.Main) {
    characteristicChangedChannel.send(1)
    characteristicChangedChannel.send(2)
  }

  println("...and we're off!")
}

If you paste that into a Klassbook scratch pad and run it, you get the expected output:

2

…as the 1 is filtered out of the flow.
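
As for the fetchResponse() wrapper in the question: a flow { } block is cold, so nothing inside it runs until something collects the flow that fetchResponse() returns, and a bare expression such as response on the last line does not emit anything. Here is a minimal sketch of that wrapper pattern, assuming made-up names (notifications, responses) and a plain flowOf() standing in for the channel-backed flow:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the channel-backed notification flow.
val notifications: Flow<Int> = flowOf(1, 2, 3, 4)

// Builds a cold flow: the block below does not run until the result is collected.
fun responses(): Flow<String> = flow {
    notifications
        .filter { it % 2 == 0 }
        .collect { emit("got $it") } // emit() is what hands a value downstream
}

fun main(): Unit = runBlocking {
    val cold = responses()   // nothing has been collected yet
    println(cold.toList())   // terminal operator: prints [got 2, got 4]
}
```

Calling responses() on its own does no work; the inner collect only starts once a terminal operator such as toList() or collect runs on the returned flow.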

Interesting. It looks like Studio is right when it says I have to extend collect. I guess it only works for primitives. Thanks for the example. At least I know the approach that works.

No. For example, this works:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

data class Thingy(val value: Int)

fun main() {
  val characteristicChangedChannel = BroadcastChannel<Thingy>(1488)
  val notificationChannel = characteristicChangedChannel.asFlow().filter { it.value % 2 == 0 }
  
  GlobalScope.launch(Dispatchers.Main) {
    notificationChannel.collect { println(it) }
    println("That's all folks!")
  }
  
  GlobalScope.launch(Dispatchers.Main) {
    characteristicChangedChannel.send(Thingy(1))
    characteristicChangedChannel.send(Thingy(2))
  }

  println("...and we're off!")
}
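
If Android Studio still insists on a FlowCollector, one likely cause (an assumption, since the question does not show its imports) is a missing import. In the kotlinx.coroutines versions that pair with Kotlin 1.4, the lambda form collect { } is an extension function in kotlinx.coroutines.flow, and without that import only the member collect(FlowCollector) is visible. A small sketch, with collectEven() as a made-up helper and a list-backed flow in place of the channel:

```kotlin
import kotlinx.coroutines.flow.* // the wildcard also brings in the collect { } extension
import kotlinx.coroutines.runBlocking

data class Thingy(val value: Int)

// Hypothetical helper: collects the even-valued items from a list-backed flow.
suspend fun collectEven(things: List<Thingy>): List<Thingy> {
    val received = mutableListOf<Thingy>()
    things.asFlow()
        .filter { it.value % 2 == 0 }
        .collect { received += it } // lambda form of collect, no FlowCollector needed
    return received
}

fun main(): Unit = runBlocking {
    println(collectEven(listOf(Thingy(1), Thingy(2)))) // prints [Thingy(value=2)]
}
```

Both examples above already use the wildcard import, which is why collect { } resolves there for Int and for Thingy alike.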