Channels examples (#10225)

Co-authored-by: Harihar Das <harihar.das@revolut.com>
This commit is contained in:
Harihar Das 2020-11-03 19:13:36 +01:00 committed by GitHub
parent a0828bf3d1
commit f40ed2cef5
11 changed files with 312 additions and 0 deletions

View File

@ -0,0 +1,29 @@
package com.baeldung.channles
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val basket = Channel<String>(1)
launch { // coroutine1
val fruits = listOf("Apple", "Orange", "Banana")
for (fruit in fruits) {
println("coroutine1: Sending $fruit")
basket.send(fruit)
}
}
launch { // coroutine2
repeat(3) {
delay(100)
println("coroutine2: Received ${basket.receive()}")
}
}
delay(2000)
coroutineContext.cancelChildren()
}

View File

@ -0,0 +1,27 @@
package com.baeldung.channles
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val basket = Channel<String>(CONFLATED)
launch { // coroutine1
val fruits = listOf("Apple", "Orange", "Banana")
for (fruit in fruits) {
println("coroutine1: Sending $fruit")
basket.send(fruit)
}
}
launch { // coroutine2
println("coroutine2: Received ${basket.receive()}")
}
delay(2000)
coroutineContext.cancelChildren()
}

View File

@ -0,0 +1,52 @@
package com.baeldung.channles
import com.baeldung.channles.OrderStatus.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
enum class OrderStatus { ORDERED, BAKED, TOPPED, SERVED }
data class PizzaOrder(val orderNumber: Int, val orderStatus: OrderStatus = ORDERED)
@ExperimentalCoroutinesApi
fun CoroutineScope.baking(orders: ReceiveChannel<PizzaOrder>) = produce {
for (order in orders) {
delay(200)
println("Baking ${order.orderNumber}")
send(order.copy(orderStatus = BAKED))
}
}
@ExperimentalCoroutinesApi
fun CoroutineScope.topping(orders: ReceiveChannel<PizzaOrder>) = produce {
for (order in orders) {
delay(50)
println("Topping ${order.orderNumber}")
send(order.copy(orderStatus = TOPPED))
}
}
@ExperimentalCoroutinesApi
fun CoroutineScope.produceOrders(count: Int) = produce {
repeat(count) {
delay(50)
send(PizzaOrder(orderNumber = it + 1))
}
}
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() = runBlocking {
val orders = produceOrders(3)
val readyOrders = topping(baking(orders))
for (order in readyOrders) {
println("Serving ${order.orderNumber}")
}
delay(3000)
println("End!")
coroutineContext.cancelChildren()
}

View File

@ -0,0 +1,22 @@
package com.baeldung.channles
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking
@ExperimentalCoroutinesApi
fun CoroutineScope.produceFruits(): ReceiveChannel<String> = produce {
val fruits = listOf("Apple", "Orange", "Apple")
for (fruit in fruits) send(fruit)
}
@ExperimentalCoroutinesApi
fun main() = runBlocking {
val fruitChannel = produceFruits()
for (fruit in fruitChannel) {
println(fruit)
}
println("End!")
}

View File

@ -0,0 +1,26 @@
package com.baeldung.channles
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
fun main() = runBlocking {
val basket = Channel<String>()
launch { // coroutine1
val fruits = listOf("Apple", "Orange", "Banana")
for (fruit in fruits) {
println("coroutine1: Sending $fruit")
basket.send(fruit)
}
}
launch { // coroutine2
repeat(3) {
delay(100)
println("coroutine2: Received ${basket.receive()}")
}
}
delay(2000)
coroutineContext.cancelChildren()
}

View File

@ -0,0 +1,36 @@
package com.baeldung.channles
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
suspend fun fetchYoutubeVideos(channel: SendChannel<String>) {
val videos = listOf("cat video", "food video")
for (video in videos) {
delay(100)
channel.send(video)
}
}
suspend fun fetchTweets(channel: SendChannel<String>) {
val tweets = listOf("tweet: Earth is round", "tweet: Coroutines and channels are cool")
for (tweet in tweets) {
delay(100)
channel.send(tweet)
}
}
fun main() = runBlocking {
val aggregate = Channel<String>()
launch { fetchYoutubeVideos(aggregate) }
launch { fetchTweets(aggregate) }
repeat(4) {
println(aggregate.receive())
}
coroutineContext.cancelChildren()
}

View File

@ -0,0 +1,31 @@
package com.baeldung.channles
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
@ExperimentalCoroutinesApi
fun CoroutineScope.producePizzaOrders(): ReceiveChannel<String> = produce {
var x = 1
while (true) {
send("Pizza Order No. ${x++}")
delay(100)
}
}
fun CoroutineScope.pizzaOrderProcessor(id: Int, orders: ReceiveChannel<String>) = launch {
for (order in orders) {
println("Processor #$id is processing $order")
}
}
@ExperimentalCoroutinesApi
fun main() = runBlocking {
val pizzaOrders = producePizzaOrders()
repeat(3) {
pizzaOrderProcessor(it + 1, pizzaOrders)
}
delay(1000)
pizzaOrders.cancel()
}

View File

@ -0,0 +1,24 @@
package com.baeldung.channles
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.time.Duration
import kotlin.random.Random
fun stockPrice(stock: String): Double {
log("Fetching stock price of $stock")
return Random.nextDouble(2.0, 3.0)
}
fun main() = runBlocking {
val tickerChannel = ticker(Duration.ofSeconds(5).toMillis())
repeat(3) {
tickerChannel.receive()
log(stockPrice("TESLA"))
}
delay(Duration.ofSeconds(11).toMillis())
tickerChannel.cancel()
}

View File

@ -0,0 +1,28 @@
package com.baeldung.channles
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val channel = Channel<Int>(UNLIMITED)
launch { // coroutine1
repeat(100) {
println("coroutine1: Sending $it")
channel.send(it)
}
}
launch { // coroutine2
repeat(100) {
println("coroutine2: Received ${channel.receive()}")
}
}
delay(2000)
coroutineContext.cancelChildren()
}

View File

@ -0,0 +1,8 @@
package com.baeldung.channles
import java.text.SimpleDateFormat
import java.util.*
fun log(value: Any) {
println(SimpleDateFormat("HH:MM:ss").format(Date()) + " - $value")
}

View File

@ -0,0 +1,29 @@
package com.baeldung.channels
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
class ChannelsTest {
@Test
fun should_pass_data_from_one_coroutine_to_another() {
runBlocking {
// given
val channel = Channel<String>()
// when
launch { // coroutine1
channel.send("Hello World!")
}
val result = async { // coroutine 2
channel.receive()
}
// then
assertThat(result.await()).isEqualTo("Hello World!")
}
}
}