BAEL-1018 - Introduction to MBassador (#2551)
This commit is contained in:
parent
01529c3202
commit
3c4faf999d
@ -519,6 +519,11 @@
|
|||||||
<artifactId>yarg</artifactId>
|
<artifactId>yarg</artifactId>
|
||||||
<version>2.0.4</version>
|
<version>2.0.4</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>net.engio</groupId>
|
||||||
|
<artifactId>mbassador</artifactId>
|
||||||
|
<version>1.3.1</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<repositories>
|
<repositories>
|
||||||
<repository>
|
<repository>
|
||||||
|
@ -0,0 +1,5 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
public class AckMessage extends Message {
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
public class Message {
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,15 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
public class RejectMessage extends Message {
|
||||||
|
|
||||||
|
int code;
|
||||||
|
|
||||||
|
public int getCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCode(int code) {
|
||||||
|
this.code = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
import net.engio.mbassy.listener.Handler;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
public class MBassadorAsyncDispatchTest {
|
||||||
|
|
||||||
|
private MBassador dispatcher = new MBassador();
|
||||||
|
private String testString;
|
||||||
|
private AtomicBoolean ready = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareTests() {
|
||||||
|
dispatcher.subscribe(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenAsyncDispatched_thenMessageReceived() {
|
||||||
|
dispatcher.post("foobar").asynchronously();
|
||||||
|
await().untilAtomic(ready, equalTo(true));
|
||||||
|
assertNotNull(testString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleStringMessage(String message) {
|
||||||
|
this.testString = message;
|
||||||
|
ready.set(true);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,47 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
import net.engio.mbassy.listener.Handler;
|
||||||
|
import net.engio.mbassy.listener.Invoke;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
|
public class MBassadorAsyncInvocationTest {
|
||||||
|
|
||||||
|
private MBassador dispatcher = new MBassador();
|
||||||
|
|
||||||
|
private Integer testInteger;
|
||||||
|
private String invocationThreadName;
|
||||||
|
private AtomicBoolean ready = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareTests() {
|
||||||
|
dispatcher.subscribe(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenHandlerAsync_thenHandled() {
|
||||||
|
|
||||||
|
dispatcher.post(42).now();
|
||||||
|
|
||||||
|
await().untilAtomic(ready, equalTo(true));
|
||||||
|
assertNotNull(testInteger);
|
||||||
|
assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(delivery = Invoke.Asynchronously)
|
||||||
|
public void handleIntegerMessage(Integer message) {
|
||||||
|
this.invocationThreadName = Thread.currentThread().getName();
|
||||||
|
this.testInteger = message;
|
||||||
|
ready.set(true);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,69 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
import net.engio.mbassy.bus.common.DeadMessage;
|
||||||
|
import net.engio.mbassy.listener.Handler;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
public class MBassadorBasicTest {
|
||||||
|
|
||||||
|
private MBassador dispatcher = new MBassador();
|
||||||
|
|
||||||
|
private String messageString;
|
||||||
|
private Integer messageInteger;
|
||||||
|
private Object deadEvent;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareTests() {
|
||||||
|
dispatcher.subscribe(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenStringDispatched_thenHandleString() {
|
||||||
|
dispatcher.post("TestString").now();
|
||||||
|
assertNotNull(messageString);
|
||||||
|
assertEquals("TestString", messageString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenIntegerDispatched_thenHandleInteger() {
|
||||||
|
dispatcher.post(42).now();
|
||||||
|
assertNull(messageString);
|
||||||
|
assertNotNull(messageInteger);
|
||||||
|
assertTrue(42 == messageInteger);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenLongDispatched_thenDeadEvent() {
|
||||||
|
dispatcher.post(42L).now();
|
||||||
|
assertNull(messageString);
|
||||||
|
assertNull(messageInteger);
|
||||||
|
assertNotNull(deadEvent);
|
||||||
|
assertTrue(deadEvent instanceof Long);
|
||||||
|
assertTrue(42L == (Long) deadEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleString(String message) {
|
||||||
|
messageString = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleInteger(Integer message) {
|
||||||
|
messageInteger = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleDeadEvent(DeadMessage message) {
|
||||||
|
|
||||||
|
deadEvent = message.getMessage();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,90 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
||||||
|
import net.engio.mbassy.bus.error.PublicationError;
|
||||||
|
import net.engio.mbassy.listener.Handler;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class MBassadorConfigurationTest implements IPublicationErrorHandler {
|
||||||
|
|
||||||
|
private MBassador dispatcher;
|
||||||
|
private String messageString;
|
||||||
|
private Throwable errorCause;
|
||||||
|
|
||||||
|
private LinkedList<Integer> list = new LinkedList<>();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareTests() {
|
||||||
|
dispatcher = new MBassador(this);
|
||||||
|
dispatcher.subscribe(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenErrorOccurs_thenErrorHandler() {
|
||||||
|
dispatcher.post("Error").now();
|
||||||
|
assertNull(messageString);
|
||||||
|
assertNotNull(errorCause);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenNoErrorOccurs_thenStringHandler() {
|
||||||
|
dispatcher.post("Errol").now();
|
||||||
|
assertNull(errorCause);
|
||||||
|
assertNotNull(messageString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenRejectDispatched_thenPriorityHandled() {
|
||||||
|
dispatcher.post(new RejectMessage()).now();
|
||||||
|
|
||||||
|
// Items should pop() off in reverse priority order
|
||||||
|
assertTrue(1 == list.pop());
|
||||||
|
assertTrue(3 == list.pop());
|
||||||
|
assertTrue(5 == list.pop());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleString(String message) {
|
||||||
|
|
||||||
|
if ("Error".equals(message)) {
|
||||||
|
throw new Error("BOOM");
|
||||||
|
}
|
||||||
|
|
||||||
|
messageString = message;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleError(PublicationError error) {
|
||||||
|
errorCause = error.getCause().getCause();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(priority = 5)
|
||||||
|
public void handleRejectMessage5(RejectMessage rejectMessage) {
|
||||||
|
list.push(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(priority = 3)
|
||||||
|
public void handleRejectMessage3(RejectMessage rejectMessage) {
|
||||||
|
list.push(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(priority = 2, rejectSubtypes = true)
|
||||||
|
public void handleMessage(Message rejectMessage) {
|
||||||
|
list.push(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(priority = 0)
|
||||||
|
public void handleRejectMessage0(RejectMessage rejectMessage) {
|
||||||
|
list.push(1);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,103 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
import net.engio.mbassy.bus.common.DeadMessage;
|
||||||
|
import net.engio.mbassy.bus.common.FilteredMessage;
|
||||||
|
import net.engio.mbassy.listener.Filter;
|
||||||
|
import net.engio.mbassy.listener.Filters;
|
||||||
|
import net.engio.mbassy.listener.Handler;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
public class MBassadorFilterTest {
|
||||||
|
|
||||||
|
private MBassador dispatcher = new MBassador();
|
||||||
|
|
||||||
|
private Message baseMessage;
|
||||||
|
private Message subMessage;
|
||||||
|
private String testString;
|
||||||
|
private FilteredMessage filteredMessage;
|
||||||
|
private RejectMessage rejectMessage;
|
||||||
|
private DeadMessage deadMessage;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareTests() {
|
||||||
|
dispatcher.subscribe(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenMessageDispatched_thenMessageFiltered() {
|
||||||
|
dispatcher.post(new Message()).now();
|
||||||
|
assertNotNull(baseMessage);
|
||||||
|
assertNull(subMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenRejectDispatched_thenRejectFiltered() {
|
||||||
|
dispatcher.post(new RejectMessage()).now();
|
||||||
|
assertNotNull(subMessage);
|
||||||
|
assertNull(baseMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenShortStringDispatched_thenStringHandled() {
|
||||||
|
dispatcher.post("foobar").now();
|
||||||
|
assertNotNull(testString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenLongStringDispatched_thenStringFiltered() {
|
||||||
|
dispatcher.post("foobar!").now();
|
||||||
|
assertNull(testString);
|
||||||
|
// filtered only populated when messages does not pass any filters
|
||||||
|
assertNotNull(filteredMessage);
|
||||||
|
assertTrue(filteredMessage.getMessage() instanceof String);
|
||||||
|
assertNull(deadMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenWrongRejectDispatched_thenRejectFiltered() {
|
||||||
|
RejectMessage testReject = new RejectMessage();
|
||||||
|
testReject.setCode(-1);
|
||||||
|
dispatcher.post(testReject).now();
|
||||||
|
assertNull(rejectMessage);
|
||||||
|
assertNotNull(subMessage);
|
||||||
|
assertEquals(-1, ((RejectMessage) subMessage).getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
|
||||||
|
public void handleBaseMessage(Message message) {
|
||||||
|
this.baseMessage = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
|
||||||
|
public void handleSubMessage(Message message) {
|
||||||
|
this.subMessage = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(condition = "msg.length() < 7")
|
||||||
|
public void handleStringMessage(String message) {
|
||||||
|
this.testString = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(condition = "msg.getCode() != -1")
|
||||||
|
public void handleRejectMessage(RejectMessage rejectMessage) {
|
||||||
|
this.rejectMessage = rejectMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleFilterMessage(FilteredMessage message) {
|
||||||
|
this.filteredMessage = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleDeadMessage(DeadMessage deadMessage) {
|
||||||
|
this.deadMessage = deadMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
package com.baeldung.mbassador;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
import net.engio.mbassy.listener.Handler;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class MBassadorHierarchyTest {
|
||||||
|
|
||||||
|
private MBassador dispatcher = new MBassador();
|
||||||
|
|
||||||
|
private Message message;
|
||||||
|
private AckMessage ackMessage;
|
||||||
|
private RejectMessage rejectMessage;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareTests() {
|
||||||
|
dispatcher.subscribe(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenMessageDispatched_thenMessageHandled() {
|
||||||
|
dispatcher.post(new Message()).now();
|
||||||
|
assertNotNull(message);
|
||||||
|
assertNull(ackMessage);
|
||||||
|
assertNull(rejectMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenRejectDispatched_thenMessageAndRejectHandled() {
|
||||||
|
dispatcher.post(new RejectMessage()).now();
|
||||||
|
assertNotNull(message);
|
||||||
|
assertNotNull(rejectMessage);
|
||||||
|
assertNull(ackMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenAckDispatched_thenMessageAndAckHandled() {
|
||||||
|
dispatcher.post(new AckMessage()).now();
|
||||||
|
assertNotNull(message);
|
||||||
|
assertNotNull(ackMessage);
|
||||||
|
assertNull(rejectMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleMessage(Message message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleRejectMessage(RejectMessage message) {
|
||||||
|
rejectMessage = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler
|
||||||
|
public void handleAckMessage(AckMessage message) {
|
||||||
|
ackMessage = message;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user