From 858d7a1a02ec18e253a9caba54e8d3cad9f04eed Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 24 Aug 2016 10:31:04 -0400 Subject: [PATCH] ARTEMIS-697 Making JChannelManager a singleton, and fixing tests --- .../core/ChannelBroadcastEndpointFactory.java | 28 +-- .../JGroupsFileBroadcastEndpointFactory.java | 2 +- ...upsPropertiesBroadcastEndpointFactory.java | 2 +- .../api/core/jgroups/JChannelManager.java | 38 +++- .../api/core/jgroups/JChannelWrapper.java | 20 +- .../broadcast/JGroupsBroadcastTest.java | 14 ++ .../integration/discovery/DiscoveryTest.java | 174 ++++++++++-------- 7 files changed, 160 insertions(+), 118 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java index af0df2e9b8..66b61d3153 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.artemis.api.core; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.jboss.logging.Logger; import org.jgroups.JChannel; @@ -38,32 +35,9 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory private final JChannelManager manager; - private static final Map managers = new ConcurrentHashMap<>(); + private static final JChannelManager singletonManager = JChannelManager.getInstance(); - private static final JChannelManager singletonManager = new JChannelManager(); -// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly -// -// private static JChannelManager recoverManager(JChannel channel) { -// JChannelManager manager = managers.get(channel); -// if (manager == null) { -// if (logger.isTraceEnabled()) { -// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace")); -// } -// manager = new JChannelManager(); -// managers.put(channel, manager); -// } -// else { -// if (logger.isTraceEnabled()) { -// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace")); -// } -// -// } -// -// return manager; -// } -// public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) { - // TODO: use recoverManager(channel) this(singletonManager, channel, channelName); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java index 9f783e7813..f560c71a58 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java @@ -25,7 +25,7 @@ public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFac private String channelName; - private final JChannelManager manager = new JChannelManager(); + private final JChannelManager manager = JChannelManager.getInstance(); @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java index 8ed03ab7d0..05867d72d0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java @@ -24,7 +24,7 @@ public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpo private String channelName; - private final JChannelManager manager = new JChannelManager(); + private final JChannelManager manager = JChannelManager.getInstance(); @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java index 1db4327cdf..682bf7645e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java @@ -32,15 +32,43 @@ import org.jboss.logging.Logger; */ public class JChannelManager { - private static final Logger logger = Logger.getLogger(JChannelManager.class); + private static final JChannelManager theInstance = new JChannelManager(); - private static Map channels; + public static JChannelManager getInstance() { + return theInstance; + } + + private JChannelManager() { + } + + public synchronized JChannelManager clear() { + for (JChannelWrapper wrapper : channels.values()) { + wrapper.closeChannel(); + } + channels.clear(); + setLoopbackMessages(false); + return this; + } + + // if true, messages will be loopbacked + // this is useful for testcases using a single channel. + private boolean loopbackMessages = false; + + private final Logger logger = Logger.getLogger(JChannelManager.class); + + private static final Map channels = new HashMap<>(); + + public boolean isLoopbackMessages() { + return loopbackMessages; + } + + public JChannelManager setLoopbackMessages(boolean loopbackMessages) { + this.loopbackMessages = loopbackMessages; + return this; + } public synchronized JChannelWrapper getJChannel(String channelName, JGroupsBroadcastEndpoint endpoint) throws Exception { - if (channels == null) { - channels = new HashMap<>(); - } JChannelWrapper wrapper = channels.get(channelName); if (wrapper == null) { wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel()); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java index eb61ffb384..e83a33dd8e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java @@ -86,15 +86,21 @@ public class JChannelWrapper { if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace")); if (refCount == 0) { if (closeWrappedChannel) { - connected = false; - channel.setReceiver(null); - logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace")); - channel.close(); - manager.removeChannel(channelName); + closeChannel(); } + manager.removeChannel(channelName); } } + public synchronized void closeChannel() { + connected = false; + channel.setReceiver(null); + if (logger.isTraceEnabled()) { + logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace")); + } + channel.close(); + } + public void removeReceiver(JGroupsReceiver receiver) { if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace")); synchronized (receivers) { @@ -128,7 +134,9 @@ public class JChannelWrapper { public void send(org.jgroups.Message msg) throws Exception { if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg); - msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); + if (!manager.isLoopbackMessages()) { + msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); + } channel.send(msg); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java index 53a678347b..5bf36e9ae6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java @@ -19,15 +19,29 @@ package org.apache.activemq.artemis.tests.integration.broadcast; import org.apache.activemq.artemis.api.core.BroadcastEndpoint; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.jgroups.JChannel; import org.jgroups.conf.PlainConfigurator; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; public class JGroupsBroadcastTest { + @After + public void cleanupJChannel() { + JChannelManager.getInstance().clear(); + } + + @Before + public void prepareJChannel() { + JChannelManager.getInstance().setLoopbackMessages(true); + } + + @Rule public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java index 0c667a8e9d..a1faedca0c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.tests.integration.SimpleNotificationService; import org.apache.activemq.artemis.core.cluster.DiscoveryEntry; @@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** @@ -70,9 +72,15 @@ public class DiscoveryTest extends DiscoveryBaseTest { BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null; DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null; + @Before + public void prepareLoopback() { + JChannelManager.getInstance().setLoopbackMessages(true); + } + @Override @After public void tearDown() throws Exception { + JChannelManager.getInstance().clear().setLoopbackMessages(false); /** This file path is defined at {@link #TEST_JGROUPS_CONF_FILE} */ deleteDirectory(new File("./target/tmp/amqtest.ping.dir")); for (ActiveMQComponent component : new ActiveMQComponent[]{bg, bg1, bg2, bg3, dg, dg1, dg2, dg3}) { @@ -140,47 +148,52 @@ public class DiscoveryTest extends DiscoveryBaseTest { BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint(); broadcaster.openBroadcaster(); - int num = 100; - BroadcastEndpoint[] receivers = new BroadcastEndpoint[num]; - for (int i = 0; i < num; i++) { - receivers[i] = factory.createBroadcastEndpoint(); - receivers[i].openClient(); + try { + + int num = 100; + BroadcastEndpoint[] receivers = new BroadcastEndpoint[num]; + for (int i = 0; i < num; i++) { + receivers[i] = factory.createBroadcastEndpoint(); + receivers[i].openClient(); + } + + final byte[] data = new byte[]{1, 2, 3, 4, 5}; + broadcaster.broadcast(data); + + for (int i = 0; i < num; i++) { + byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); + assertNotNull(received); + assertEquals(5, received.length); + assertEquals(1, received[0]); + assertEquals(2, received[1]); + assertEquals(3, received[2]); + assertEquals(4, received[3]); + assertEquals(5, received[4]); + } + + for (int i = 0; i < num - 1; i++) { + receivers[i].close(false); + } + + byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); + assertNull(data1); + + broadcaster.broadcast(data); + data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); + + assertNotNull(data1); + assertEquals(5, data1.length); + assertEquals(1, data1[0]); + assertEquals(2, data1[1]); + assertEquals(3, data1[2]); + assertEquals(4, data1[3]); + assertEquals(5, data1[4]); + + receivers[num - 1].close(false); } - - final byte[] data = new byte[]{1, 2, 3, 4, 5}; - broadcaster.broadcast(data); - - for (int i = 0; i < num; i++) { - byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); - assertNotNull(received); - assertEquals(5, received.length); - assertEquals(1, received[0]); - assertEquals(2, received[1]); - assertEquals(3, received[2]); - assertEquals(4, received[3]); - assertEquals(5, received[4]); + finally { + broadcaster.close(true); } - - for (int i = 0; i < num - 1; i++) { - receivers[i].close(false); - } - - byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); - assertNull(data1); - - broadcaster.broadcast(data); - data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); - - assertNotNull(data1); - assertEquals(5, data1.length); - assertEquals(1, data1[0]); - assertEquals(2, data1[1]); - assertEquals(3, data1[2]); - assertEquals(4, data1[3]); - assertEquals(5, data1[4]); - - receivers[num - 1].close(false); - broadcaster.close(true); } /** @@ -195,7 +208,6 @@ public class DiscoveryTest extends DiscoveryBaseTest { BroadcastEndpointFactory factory = new JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE); BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint(); broadcaster.openBroadcaster(); - int num = 50; BroadcastEndpoint[] receivers = new BroadcastEndpoint[num]; for (int i = 0; i < num; i++) { @@ -203,47 +215,53 @@ public class DiscoveryTest extends DiscoveryBaseTest { receivers[i].openClient(); } - final byte[] data = new byte[]{1, 2, 3, 4, 5}; - broadcaster.broadcast(data); - for (int i = 0; i < num; i++) { - byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); - assertNotNull(received); - assertEquals(5, received.length); - assertEquals(1, received[0]); - assertEquals(2, received[1]); - assertEquals(3, received[2]); - assertEquals(4, received[3]); - assertEquals(5, received[4]); + try { + + final byte[] data = new byte[]{1, 2, 3, 4, 5}; + broadcaster.broadcast(data); + + for (int i = 0; i < num; i++) { + byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); + assertNotNull(received); + assertEquals(5, received.length); + assertEquals(1, received[0]); + assertEquals(2, received[1]); + assertEquals(3, received[2]); + assertEquals(4, received[3]); + assertEquals(5, received[4]); + } + + for (int i = 0; i < num; i++) { + receivers[i].close(false); + } + + //new ones + for (int i = 0; i < num; i++) { + receivers[i] = factory.createBroadcastEndpoint(); + receivers[i].openClient(); + } + + broadcaster.broadcast(data); + + for (int i = 0; i < num; i++) { + byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); + assertNotNull(received); + assertEquals(5, received.length); + assertEquals(1, received[0]); + assertEquals(2, received[1]); + assertEquals(3, received[2]); + assertEquals(4, received[3]); + assertEquals(5, received[4]); + } + } - - for (int i = 0; i < num; i++) { - receivers[i].close(false); + finally { + for (int i = 0; i < num; i++) { + receivers[i].close(false); + } + broadcaster.close(true); } - - //new ones - for (int i = 0; i < num; i++) { - receivers[i] = factory.createBroadcastEndpoint(); - receivers[i].openClient(); - } - - broadcaster.broadcast(data); - - for (int i = 0; i < num; i++) { - byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); - assertNotNull(received); - assertEquals(5, received.length); - assertEquals(1, received[0]); - assertEquals(2, received[1]); - assertEquals(3, received[2]); - assertEquals(4, received[3]); - assertEquals(5, received[4]); - } - - for (int i = 0; i < num; i++) { - receivers[i].close(false); - } - broadcaster.close(true); } /**