Code samples for advanced quasar article (#7271)

This commit is contained in:
Graham Cox 2019-07-09 05:36:57 +01:00 committed by Grzegorz Piwowarek
parent e5d3e34725
commit 4b27a2f33c
4 changed files with 613 additions and 0 deletions

View File

@ -45,6 +45,27 @@
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
</dependencies>
<build>
@ -116,5 +137,7 @@
<properties>
<quasar.version>0.8.0</quasar.version>
<kotlin.version>1.3.31</kotlin.version>
<org.slf4j.version>1.7.21</org.slf4j.version>
<logback.version>1.1.7</logback.version>
</properties>
</project>

View File

@ -0,0 +1,157 @@
package com.baeldung.quasar
import co.paralleluniverse.actors.Actor
import co.paralleluniverse.actors.ActorRef
import co.paralleluniverse.actors.behaviors.*
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.SuspendableCallable
import org.junit.Test
import org.slf4j.LoggerFactory
import java.lang.Exception
class ActorsBehaviorTest {
companion object {
private val LOG = LoggerFactory.getLogger(ActorsBehaviorTest::class.java)
}
@Test
fun requestReplyHelper() {
data class TestMessage(val input: Int) : RequestMessage<Int>()
val actor = object : Actor<TestMessage, Void>("requestReplyActor", null) {
@Suspendable
override fun doRun(): Void? {
while (true) {
val msg = receive()
LOG.info("Processing message: {}", msg)
RequestReplyHelper.reply(msg, msg.input * 100)
}
}
}
val actorRef = actor.spawn()
val result = RequestReplyHelper.call(actorRef, TestMessage(50))
LOG.info("Received reply: {}", result)
}
@Test
fun server() {
val actor = ServerActor(object : AbstractServerHandler<Int, String, Float>() {
@Suspendable
override fun handleCall(from: ActorRef<*>?, id: Any?, m: Int?): String {
LOG.info("Called with message: {} from {} with ID {}", m, from, id)
return m.toString() ?: "None"
}
@Suspendable
override fun handleCast(from: ActorRef<*>?, id: Any?, m: Float?) {
LOG.info("Cast message: {} from {} with ID {}", m, from, id)
}
})
val server = actor.spawn()
LOG.info("Call result: {}", server.call(5))
server.cast(2.5f)
server.shutdown()
}
interface Summer {
fun sum(a: Int, b: Int) : Int
}
@Test
fun proxyServer() {
val actor = ProxyServerActor(false, object : Summer {
@Synchronized
override fun sum(a: Int, b: Int): Int {
Exception().printStackTrace()
LOG.info("Adding together {} and {}", a, b)
return a + b
}
})
val summerActor = actor.spawn()
val result = (summerActor as Summer).sum(1, 2)
LOG.info("Result: {}", result)
summerActor.shutdown()
}
@Test
fun eventSource() {
val actor = EventSourceActor<String>()
val eventSource = actor.spawn()
eventSource.addHandler { msg ->
LOG.info("Sent message: {}", msg)
}
val name = "Outside Value"
eventSource.addHandler { msg ->
LOG.info("Also Sent message: {} {}", msg, name)
}
eventSource.send("Hello")
eventSource.shutdown()
}
@Test
fun finiteStateMachine() {
val actor = object : FiniteStateMachineActor() {
@Suspendable
override fun initialState(): SuspendableCallable<SuspendableCallable<*>> {
LOG.info("Starting")
return SuspendableCallable { lockedState() }
}
@Suspendable
fun lockedState() : SuspendableCallable<SuspendableCallable<*>> {
return receive {msg ->
when (msg) {
"PUSH" -> {
LOG.info("Still locked")
lockedState()
}
"COIN" -> {
LOG.info("Unlocking...")
unlockedState()
}
else -> TERMINATE
}
}
}
@Suspendable
fun unlockedState() : SuspendableCallable<SuspendableCallable<*>> {
return receive {msg ->
when (msg) {
"PUSH" -> {
LOG.info("Locking")
lockedState()
}
"COIN" -> {
LOG.info("Unlocked")
unlockedState()
}
else -> TERMINATE
}
}
}
}
val actorRef = actor.spawn()
listOf("PUSH", "COIN", "COIN", "PUSH", "PUSH").forEach {
LOG.info(it)
actorRef.sendSync(it)
}
actorRef.shutdown()
}
}

View File

@ -0,0 +1,298 @@
package com.baeldung.quasar
import co.paralleluniverse.actors.*
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.channels.Channels
import org.junit.Assert
import org.junit.Test
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
class ActorsTest {
companion object {
private val LOG = LoggerFactory.getLogger(ActorsTest::class.java)
}
@Test
fun createNoopActor() {
val actor = object : Actor<Int, String>("noopActor", MailboxConfig(5, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
return "Hello"
}
}
actor.spawn()
println("Noop Actor: ${actor.get()}")
}
@Test
fun registerActor() {
val actor = object : Actor<Int, String>("registerActor", null) {
@Suspendable
override fun doRun(): String {
return "Hello"
}
}
val actorRef = actor.spawn()
actor.register()
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("registerActor")
Assert.assertEquals(actorRef, retrievedRef)
actor.join()
}
@Test
fun registerActorNewName() {
val actor = object : Actor<Int, String>(null, null) {
@Suspendable
override fun doRun(): String {
return "Hello"
}
}
val actorRef = actor.spawn()
actor.register("renamedActor")
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("renamedActor")
Assert.assertEquals(actorRef, retrievedRef)
actor.join()
}
@Test
fun retrieveUnknownActor() {
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("unknownActor", 1, TimeUnit.SECONDS)
Assert.assertNull(retrievedRef)
}
@Test
fun createSimpleActor() {
val actor = object : Actor<Int, Void?>("simpleActor", null) {
@Suspendable
override fun doRun(): Void? {
val msg = receive()
LOG.info("SimpleActor Received Message: {}", msg)
return null
}
}
val actorRef = actor.spawn()
actorRef.send(1)
actor.join()
}
@Test
fun createLoopingActor() {
val actor = object : Actor<Int, Void?>("loopingActor", null) {
@Suspendable
override fun doRun(): Void? {
while (true) {
val msg = receive()
if (msg > 0) {
LOG.info("LoopingActor Received Message: {}", msg)
} else {
break
}
}
return null
}
}
val actorRef = actor.spawn()
actorRef.send(3)
actorRef.send(2)
actorRef.send(1)
actorRef.send(0)
actor.join()
}
@Test
fun actorBacklog() {
val actor = object : Actor<Int, String>("backlogActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
TimeUnit.MILLISECONDS.sleep(500);
LOG.info("Backlog Actor Received: {}", receive())
try {
receive()
} catch (e: Throwable) {
LOG.info("==== Exception throws by receive() ====")
e.printStackTrace()
}
return "No Exception"
}
}
val actorRef = actor.spawn()
actorRef.send(1)
actorRef.send(2)
try {
LOG.info("Backlog Actor: {}", actor.get())
} catch (e: Exception) {
// Expected
LOG.info("==== Exception throws by get() ====")
e.printStackTrace()
}
}
@Test
fun actorBacklogTrySend() {
val actor = object : Actor<Int, String>("backlogTrySendActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
TimeUnit.MILLISECONDS.sleep(500);
LOG.info("Backlog TrySend Actor Received: {}", receive())
return "No Exception"
}
}
val actorRef = actor.spawn()
LOG.info("Backlog TrySend 1: {}", actorRef.trySend(1))
LOG.info("Backlog TrySend 1: {}", actorRef.trySend(2))
actor.join()
}
@Test
fun actorTimeoutReceive() {
val actor = object : Actor<Int, String>("TimeoutReceiveActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
LOG.info("Timeout Actor Received: {}", receive(500, TimeUnit.MILLISECONDS))
return "Finished"
}
}
val actorRef = actor.spawn()
TimeUnit.MILLISECONDS.sleep(300)
actorRef.trySend(1)
actor.join()
}
@Test
fun actorNonBlockingReceive() {
val actor = object : Actor<Int, String>("NonBlockingReceiveActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
LOG.info("NonBlocking Actor Received #1: {}", tryReceive())
TimeUnit.MILLISECONDS.sleep(500)
LOG.info("NonBlocking Actor Received #2: {}", tryReceive())
return "Finished"
}
}
val actorRef = actor.spawn()
TimeUnit.MILLISECONDS.sleep(300)
actorRef.trySend(1)
actor.join()
}
@Test
fun evenActor() {
val actor = object : Actor<Int, Void?>("EvenActor", null) {
@Suspendable
override fun filterMessage(m: Any?): Int? {
return when (m) {
is Int -> {
if (m % 2 == 0) {
m * 10
} else {
null
}
}
else -> super.filterMessage(m)
}
}
@Suspendable
override fun doRun(): Void? {
while (true) {
val msg = receive()
if (msg > 0) {
LOG.info("EvenActor Received Message: {}", msg)
} else {
break
}
}
return null
}
}
val actorRef = actor.spawn()
actorRef.send(3)
actorRef.send(2)
actorRef.send(1)
actorRef.send(0)
actor.join()
}
@Test
fun watchingActors() {
val watched = object : Actor<Int, Void?>("WatchedActor", null) {
@Suspendable
override fun doRun(): Void? {
LOG.info("WatchedActor Starting")
receive(500, TimeUnit.MILLISECONDS)
LOG.info("WatchedActor Finishing")
return null
}
}
val watcher = object : Actor<Int, Void?>("WatcherActor", null) {
@Suspendable
override fun doRun(): Void? {
LOG.info("WatcherActor Listening")
try {
LOG.info("WatcherActor received Message: {}", receive(2, TimeUnit.SECONDS))
} catch (e: Exception) {
LOG.info("WatcherActor Received Exception", e)
}
return null
}
@Suspendable
override fun handleLifecycleMessage(m: LifecycleMessage?): Int? {
LOG.info("WatcherActor Received Lifecycle Message: {}", m)
return super.handleLifecycleMessage(m)
}
}
val watcherRef = watcher.spawn()
TimeUnit.MILLISECONDS.sleep(200)
val watchedRef = watched.spawn()
watcher.link(watchedRef)
watched.join()
watcher.join()
}
}

View File

@ -0,0 +1,135 @@
package com.baeldung.quasar
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.kotlin.fiber
import co.paralleluniverse.strands.channels.Channels
import co.paralleluniverse.strands.channels.Topic
import co.paralleluniverse.strands.channels.reactivestreams.ReactiveStreams
import org.junit.Test
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
class ReactiveStreamsTest {
companion object {
private val LOG = LoggerFactory.getLogger(ReactiveStreamsTest::class.java)
}
@Test
fun publisher() {
val inputChannel = Channels.newChannel<String>(1);
val publisher = ReactiveStreams.toPublisher(inputChannel)
publisher.subscribe(object : Subscriber<String> {
@Suspendable
override fun onComplete() {
LOG.info("onComplete")
}
@Suspendable
override fun onSubscribe(s: Subscription) {
LOG.info("onSubscribe: {}", s)
s.request(2)
}
@Suspendable
override fun onNext(t: String?) {
LOG.info("onNext: {}", t)
}
@Suspendable
override fun onError(t: Throwable?) {
LOG.info("onError: {}", t)
}
})
inputChannel.send("Hello")
inputChannel.send("World")
TimeUnit.SECONDS.sleep(1)
inputChannel.close()
}
@Test
fun publisherTopic() {
val inputTopic = Topic<String>()
val publisher = ReactiveStreams.toPublisher(inputTopic)
publisher.subscribe(object : Subscriber<String> {
@Suspendable
override fun onComplete() {
LOG.info("onComplete 1")
}
@Suspendable
override fun onSubscribe(s: Subscription) {
LOG.info("onSubscribe 1: {}", s)
s.request(2)
}
@Suspendable
override fun onNext(t: String?) {
LOG.info("onNext 1: {}", t)
}
@Suspendable
override fun onError(t: Throwable?) {
LOG.info("onError 1: {}", t)
}
})
publisher.subscribe(object : Subscriber<String> {
@Suspendable
override fun onComplete() {
LOG.info("onComplete 2")
}
@Suspendable
override fun onSubscribe(s: Subscription) {
LOG.info("onSubscribe 2: {}", s)
s.request(2)
}
@Suspendable
override fun onNext(t: String?) {
LOG.info("onNext 2: {}", t)
}
@Suspendable
override fun onError(t: Throwable?) {
LOG.info("onError 2: {}", t)
}
})
inputTopic.send("Hello")
inputTopic.send("World")
TimeUnit.SECONDS.sleep(1)
inputTopic.close()
}
@Test
fun subscribe() {
val inputChannel = Channels.newChannel<String>(10);
val publisher = ReactiveStreams.toPublisher(inputChannel)
val channel = ReactiveStreams.subscribe(10, Channels.OverflowPolicy.THROW, publisher)
fiber @Suspendable {
while (!channel.isClosed) {
val message = channel.receive()
LOG.info("Received: {}", message)
}
LOG.info("Stopped receiving messages")
}
inputChannel.send("Hello")
inputChannel.send("World")
TimeUnit.SECONDS.sleep(1)
inputChannel.close()
}
}