ARTEMIS-697 Making JChannelManager a singleton, and fixing tests

This commit is contained in:
Clebert Suconic 2016-08-24 10:31:04 -04:00
parent bf4796c5d3
commit 858d7a1a02
7 changed files with 160 additions and 118 deletions

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.activemq.artemis.api.core; package org.apache.activemq.artemis.api.core;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.jgroups.JChannel; import org.jgroups.JChannel;
@ -38,32 +35,9 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
private final JChannelManager manager; private final JChannelManager manager;
private static final Map<JChannel, JChannelManager> managers = new ConcurrentHashMap<>(); private static final JChannelManager singletonManager = JChannelManager.getInstance();
private static final JChannelManager singletonManager = new JChannelManager();
// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly
//
// private static JChannelManager recoverManager(JChannel channel) {
// JChannelManager manager = managers.get(channel);
// if (manager == null) {
// if (logger.isTraceEnabled()) {
// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
// }
// manager = new JChannelManager();
// managers.put(channel, manager);
// }
// else {
// if (logger.isTraceEnabled()) {
// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace"));
// }
//
// }
//
// return manager;
// }
//
public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) { public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) {
// TODO: use recoverManager(channel)
this(singletonManager, channel, channelName); this(singletonManager, channel, channelName);
} }

View File

@ -25,7 +25,7 @@ public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFac
private String channelName; private String channelName;
private final JChannelManager manager = new JChannelManager(); private final JChannelManager manager = JChannelManager.getInstance();
@Override @Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception { public BroadcastEndpoint createBroadcastEndpoint() throws Exception {

View File

@ -24,7 +24,7 @@ public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpo
private String channelName; private String channelName;
private final JChannelManager manager = new JChannelManager(); private final JChannelManager manager = JChannelManager.getInstance();
@Override @Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception { public BroadcastEndpoint createBroadcastEndpoint() throws Exception {

View File

@ -32,15 +32,43 @@ import org.jboss.logging.Logger;
*/ */
public class JChannelManager { public class JChannelManager {
private static final Logger logger = Logger.getLogger(JChannelManager.class); private static final JChannelManager theInstance = new JChannelManager();
private static Map<String, JChannelWrapper> channels; public static JChannelManager getInstance() {
return theInstance;
}
private JChannelManager() {
}
public synchronized JChannelManager clear() {
for (JChannelWrapper wrapper : channels.values()) {
wrapper.closeChannel();
}
channels.clear();
setLoopbackMessages(false);
return this;
}
// if true, messages will be loopbacked
// this is useful for testcases using a single channel.
private boolean loopbackMessages = false;
private final Logger logger = Logger.getLogger(JChannelManager.class);
private static final Map<String, JChannelWrapper> channels = new HashMap<>();
public boolean isLoopbackMessages() {
return loopbackMessages;
}
public JChannelManager setLoopbackMessages(boolean loopbackMessages) {
this.loopbackMessages = loopbackMessages;
return this;
}
public synchronized JChannelWrapper getJChannel(String channelName, public synchronized JChannelWrapper getJChannel(String channelName,
JGroupsBroadcastEndpoint endpoint) throws Exception { JGroupsBroadcastEndpoint endpoint) throws Exception {
if (channels == null) {
channels = new HashMap<>();
}
JChannelWrapper wrapper = channels.get(channelName); JChannelWrapper wrapper = channels.get(channelName);
if (wrapper == null) { if (wrapper == null) {
wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel()); wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());

View File

@ -86,15 +86,21 @@ public class JChannelWrapper {
if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace")); if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
if (refCount == 0) { if (refCount == 0) {
if (closeWrappedChannel) { if (closeWrappedChannel) {
connected = false; closeChannel();
channel.setReceiver(null);
logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
channel.close();
manager.removeChannel(channelName);
} }
manager.removeChannel(channelName);
} }
} }
public synchronized void closeChannel() {
connected = false;
channel.setReceiver(null);
if (logger.isTraceEnabled()) {
logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
}
channel.close();
}
public void removeReceiver(JGroupsReceiver receiver) { public void removeReceiver(JGroupsReceiver receiver) {
if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace")); if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace"));
synchronized (receivers) { synchronized (receivers) {
@ -128,7 +134,9 @@ public class JChannelWrapper {
public void send(org.jgroups.Message msg) throws Exception { public void send(org.jgroups.Message msg) throws Exception {
if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg); if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); if (!manager.isLoopbackMessages()) {
msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
}
channel.send(msg); channel.send(msg);
} }

View File

@ -19,15 +19,29 @@ package org.apache.activemq.artemis.tests.integration.broadcast;
import org.apache.activemq.artemis.api.core.BroadcastEndpoint; import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
import org.jgroups.JChannel; import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator; import org.jgroups.conf.PlainConfigurator;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
public class JGroupsBroadcastTest { public class JGroupsBroadcastTest {
@After
public void cleanupJChannel() {
JChannelManager.getInstance().clear();
}
@Before
public void prepareJChannel() {
JChannelManager.getInstance().setLoopbackMessages(true);
}
@Rule @Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService; import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry; import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
@ -70,9 +72,15 @@ public class DiscoveryTest extends DiscoveryBaseTest {
BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null; BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null;
DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null; DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null;
@Before
public void prepareLoopback() {
JChannelManager.getInstance().setLoopbackMessages(true);
}
@Override @Override
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
JChannelManager.getInstance().clear().setLoopbackMessages(false);
/** This file path is defined at {@link #TEST_JGROUPS_CONF_FILE} */ /** This file path is defined at {@link #TEST_JGROUPS_CONF_FILE} */
deleteDirectory(new File("./target/tmp/amqtest.ping.dir")); deleteDirectory(new File("./target/tmp/amqtest.ping.dir"));
for (ActiveMQComponent component : new ActiveMQComponent[]{bg, bg1, bg2, bg3, dg, dg1, dg2, dg3}) { for (ActiveMQComponent component : new ActiveMQComponent[]{bg, bg1, bg2, bg3, dg, dg1, dg2, dg3}) {
@ -140,47 +148,52 @@ public class DiscoveryTest extends DiscoveryBaseTest {
BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint(); BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint();
broadcaster.openBroadcaster(); broadcaster.openBroadcaster();
int num = 100; try {
BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
for (int i = 0; i < num; i++) { int num = 100;
receivers[i] = factory.createBroadcastEndpoint(); BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
receivers[i].openClient(); for (int i = 0; i < num; i++) {
receivers[i] = factory.createBroadcastEndpoint();
receivers[i].openClient();
}
final byte[] data = new byte[]{1, 2, 3, 4, 5};
broadcaster.broadcast(data);
for (int i = 0; i < num; i++) {
byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
assertNotNull(received);
assertEquals(5, received.length);
assertEquals(1, received[0]);
assertEquals(2, received[1]);
assertEquals(3, received[2]);
assertEquals(4, received[3]);
assertEquals(5, received[4]);
}
for (int i = 0; i < num - 1; i++) {
receivers[i].close(false);
}
byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
assertNull(data1);
broadcaster.broadcast(data);
data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
assertNotNull(data1);
assertEquals(5, data1.length);
assertEquals(1, data1[0]);
assertEquals(2, data1[1]);
assertEquals(3, data1[2]);
assertEquals(4, data1[3]);
assertEquals(5, data1[4]);
receivers[num - 1].close(false);
} }
finally {
final byte[] data = new byte[]{1, 2, 3, 4, 5}; broadcaster.close(true);
broadcaster.broadcast(data);
for (int i = 0; i < num; i++) {
byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
assertNotNull(received);
assertEquals(5, received.length);
assertEquals(1, received[0]);
assertEquals(2, received[1]);
assertEquals(3, received[2]);
assertEquals(4, received[3]);
assertEquals(5, received[4]);
} }
for (int i = 0; i < num - 1; i++) {
receivers[i].close(false);
}
byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
assertNull(data1);
broadcaster.broadcast(data);
data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
assertNotNull(data1);
assertEquals(5, data1.length);
assertEquals(1, data1[0]);
assertEquals(2, data1[1]);
assertEquals(3, data1[2]);
assertEquals(4, data1[3]);
assertEquals(5, data1[4]);
receivers[num - 1].close(false);
broadcaster.close(true);
} }
/** /**
@ -195,7 +208,6 @@ public class DiscoveryTest extends DiscoveryBaseTest {
BroadcastEndpointFactory factory = new JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE); BroadcastEndpointFactory factory = new JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE);
BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint(); BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint();
broadcaster.openBroadcaster(); broadcaster.openBroadcaster();
int num = 50; int num = 50;
BroadcastEndpoint[] receivers = new BroadcastEndpoint[num]; BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
@ -203,47 +215,53 @@ public class DiscoveryTest extends DiscoveryBaseTest {
receivers[i].openClient(); receivers[i].openClient();
} }
final byte[] data = new byte[]{1, 2, 3, 4, 5};
broadcaster.broadcast(data);
for (int i = 0; i < num; i++) { try {
byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
assertNotNull(received); final byte[] data = new byte[]{1, 2, 3, 4, 5};
assertEquals(5, received.length); broadcaster.broadcast(data);
assertEquals(1, received[0]);
assertEquals(2, received[1]); for (int i = 0; i < num; i++) {
assertEquals(3, received[2]); byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
assertEquals(4, received[3]); assertNotNull(received);
assertEquals(5, received[4]); assertEquals(5, received.length);
assertEquals(1, received[0]);
assertEquals(2, received[1]);
assertEquals(3, received[2]);
assertEquals(4, received[3]);
assertEquals(5, received[4]);
}
for (int i = 0; i < num; i++) {
receivers[i].close(false);
}
//new ones
for (int i = 0; i < num; i++) {
receivers[i] = factory.createBroadcastEndpoint();
receivers[i].openClient();
}
broadcaster.broadcast(data);
for (int i = 0; i < num; i++) {
byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
assertNotNull(received);
assertEquals(5, received.length);
assertEquals(1, received[0]);
assertEquals(2, received[1]);
assertEquals(3, received[2]);
assertEquals(4, received[3]);
assertEquals(5, received[4]);
}
} }
finally {
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
receivers[i].close(false); receivers[i].close(false);
}
broadcaster.close(true);
} }
//new ones
for (int i = 0; i < num; i++) {
receivers[i] = factory.createBroadcastEndpoint();
receivers[i].openClient();
}
broadcaster.broadcast(data);
for (int i = 0; i < num; i++) {
byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
assertNotNull(received);
assertEquals(5, received.length);
assertEquals(1, received[0]);
assertEquals(2, received[1]);
assertEquals(3, received[2]);
assertEquals(4, received[3]);
assertEquals(5, received[4]);
}
for (int i = 0; i < num; i++) {
receivers[i].close(false);
}
broadcaster.close(true);
} }
/** /**