diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java index 115b29b625..cbd565d3a8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java @@ -26,7 +26,16 @@ import org.apache.activemq.artemis.utils.DataConstants; public class CoreMessagePersister implements Persister { public static final byte ID = 1; - public static CoreMessagePersister theInstance; + private static CoreMessagePersister theInstance; + + /** This is a hook for testing */ + public static void registerPersister(CoreMessagePersister newPersister) { + theInstance = newPersister; + } + + public static void resetPersister() { + theInstance = null; + } public static CoreMessagePersister getInstance() { if (theInstance == null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 9364aa0b57..ab1932fe3a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -128,7 +128,7 @@ public class SendAckFailTest extends SpawnedTestBase { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); ServerLocator locator = factory.getServerLocator(); - locator.setConfirmationWindowSize(0).setInitialConnectAttempts(100).setRetryInterval(100).setBlockOnDurableSend(false).setReconnectAttempts(0); + locator.setConfirmationWindowSize(0).setInitialConnectAttempts(10000).setRetryInterval(10).setBlockOnDurableSend(false).setReconnectAttempts(0); ClientSessionFactory sf = locator.createSessionFactory(); @@ -213,14 +213,31 @@ public class SendAckFailTest extends SpawnedTestBase { public ActiveMQServer startServer(boolean fail) { try { - //ActiveMQServerImpl server = (ActiveMQServerImpl) createServer(true, true); - AtomicInteger count = new AtomicInteger(0); ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration()); Configuration configuration = createDefaultConfig(true); + + if (fail) { + new Thread() { + @Override + public void run() { + try { + + // this is a protection, if the process is left forgoten for any amount of time, + // this will kill it + // This is to avoid rogue processes on the CI + Thread.sleep(10000); + System.err.println("Halting process, protecting the CI from rogue processes"); + Runtime.getRuntime().halt(-1); + } catch (Throwable e) { + } + } + }.start(); + } + ActiveMQServer server = new ActiveMQServerImpl(configuration, ManagementFactory.getPlatformMBeanServer(), securityManager) { @Override public StorageManager createStorageManager() { @@ -249,6 +266,7 @@ public class SendAckFailTest extends SpawnedTestBase { System.out.println("Location::" + server.getConfiguration().getJournalLocation().getAbsolutePath()); + addServer(server); server.start(); return server; } catch (Exception e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java index 3997c1dad3..e6e997f3f8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -58,12 +59,12 @@ import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -public class SharedNothingReplicationTest { +public class SharedNothingReplicationTest extends ActiveMQTestBase { private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class); @@ -71,30 +72,32 @@ public class SharedNothingReplicationTest { public TemporaryFolder brokersFolder = new TemporaryFolder(); private SlowMessagePersister slowMessagePersister; + ExecutorService sendMessageExecutor; @Before + @Override public void setUp() throws Exception { - slowMessagePersister = new SlowMessagePersister(); - CoreMessagePersister.theInstance = slowMessagePersister; + super.setUp(); + sendMessageExecutor = Executors.newSingleThreadExecutor(); + CoreMessagePersister.registerPersister(SlowMessagePersister._getInstance()); } @After + @Override public void tearDown() throws Exception { - if (slowMessagePersister != null) { - CoreMessagePersister.theInstance = slowMessagePersister.persister; - } + CoreMessagePersister.resetPersister(); + sendMessageExecutor.shutdownNow(); + super.tearDown(); } @Test public void testReplicateFromSlowLive() throws Exception { // start live Configuration liveConfiguration = createLiveConfiguration(); - ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration); + ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration)); liveServer.start(); - Wait.waitFor(() -> liveServer.isStarted()); - - CoreMessagePersister.theInstance = SlowMessagePersister._getInstance(); + Wait.waitFor(liveServer::isStarted); final CountDownLatch replicated = new CountDownLatch(1); @@ -120,7 +123,6 @@ public class SharedNothingReplicationTest { ClientSession sess = csf.createSession(); sess.createQueue("slow", RoutingType.ANYCAST, "slow", true); sess.close(); - Executor sendMessageExecutor = Executors.newCachedThreadPool(); // let's write some messages int i = 0; @@ -150,7 +152,7 @@ public class SharedNothingReplicationTest { // start backup Configuration backupConfiguration = createBackupConfiguration(); - ActiveMQServer backupServer = ActiveMQServers.newActiveMQServer(backupConfiguration); + ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration)); backupServer.start(); Wait.waitFor(() -> backupServer.isStarted()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ActivationFailureListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ActivationFailureListenerTest.java index 597751b145..3d6d307df3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ActivationFailureListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ActivationFailureListenerTest.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.server; import java.net.InetSocketAddress; -import java.net.Socket; +import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -36,7 +36,7 @@ public class ActivationFailureListenerTest extends ActiveMQTestBase { @Test public void simpleTest() throws Exception { - Socket s = new Socket(); + ServerSocket s = new ServerSocket(); try { s.bind(new InetSocketAddress("127.0.0.1", 61616)); server = createServer(false, createDefaultNettyConfig()); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java index ec9c99530b..515a8935e8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java @@ -17,6 +17,10 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; +import java.net.URI; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; @@ -30,10 +34,6 @@ import org.apache.activemq.transport.netty.NettyTransportFactory; import org.apache.activemq.transport.netty.NettyTransportListener; import org.junit.Test; -import java.net.URI; -import java.util.HashMap; -import java.util.concurrent.TimeUnit; - public class NettyHandshakeTimeoutTest extends ActiveMQTestBase { protected ActiveMQServer server; @@ -43,8 +43,6 @@ public class NettyHandshakeTimeoutTest extends ActiveMQTestBase { public void testHandshakeTimeout() throws Exception { int handshakeTimeout = 3; - setUp(); - ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT); HashMap params = new HashMap<>(); params.put(TransportConstants.HANDSHAKE_TIMEOUT, handshakeTimeout); @@ -70,10 +68,12 @@ public class NettyHandshakeTimeoutTest extends ActiveMQTestBase { try { transport.connect(); - assertTrue("Connection should be closed now", Wait.waitFor(() -> !transport.isConnected(), TimeUnit.SECONDS.toMillis(handshakeTimeout + 1))); + assertTrue("Connection should be closed now", Wait.waitFor(() -> !transport.isConnected(), TimeUnit.SECONDS.toMillis(handshakeTimeout + 10))); } finally { transport.close(); - tearDown(); } + + server.stop(); + } }