NO-JIRA test improvements
This commit is contained in:
parent
b7047faea5
commit
a7e5e6d074
|
@ -26,7 +26,16 @@ import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
public class CoreMessagePersister implements Persister<Message> {
|
public class CoreMessagePersister implements Persister<Message> {
|
||||||
public static final byte ID = 1;
|
public static final byte ID = 1;
|
||||||
|
|
||||||
public static CoreMessagePersister theInstance;
|
private static CoreMessagePersister theInstance;
|
||||||
|
|
||||||
|
/** This is a hook for testing */
|
||||||
|
public static void registerPersister(CoreMessagePersister newPersister) {
|
||||||
|
theInstance = newPersister;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void resetPersister() {
|
||||||
|
theInstance = null;
|
||||||
|
}
|
||||||
|
|
||||||
public static CoreMessagePersister getInstance() {
|
public static CoreMessagePersister getInstance() {
|
||||||
if (theInstance == null) {
|
if (theInstance == null) {
|
||||||
|
|
|
@ -128,7 +128,7 @@ public class SendAckFailTest extends SpawnedTestBase {
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||||
ServerLocator locator = factory.getServerLocator();
|
ServerLocator locator = factory.getServerLocator();
|
||||||
|
|
||||||
locator.setConfirmationWindowSize(0).setInitialConnectAttempts(100).setRetryInterval(100).setBlockOnDurableSend(false).setReconnectAttempts(0);
|
locator.setConfirmationWindowSize(0).setInitialConnectAttempts(10000).setRetryInterval(10).setBlockOnDurableSend(false).setReconnectAttempts(0);
|
||||||
|
|
||||||
ClientSessionFactory sf = locator.createSessionFactory();
|
ClientSessionFactory sf = locator.createSessionFactory();
|
||||||
|
|
||||||
|
@ -213,14 +213,31 @@ public class SendAckFailTest extends SpawnedTestBase {
|
||||||
|
|
||||||
public ActiveMQServer startServer(boolean fail) {
|
public ActiveMQServer startServer(boolean fail) {
|
||||||
try {
|
try {
|
||||||
//ActiveMQServerImpl server = (ActiveMQServerImpl) createServer(true, true);
|
|
||||||
|
|
||||||
AtomicInteger count = new AtomicInteger(0);
|
AtomicInteger count = new AtomicInteger(0);
|
||||||
|
|
||||||
ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
|
ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
|
||||||
|
|
||||||
Configuration configuration = createDefaultConfig(true);
|
Configuration configuration = createDefaultConfig(true);
|
||||||
|
|
||||||
|
|
||||||
|
if (fail) {
|
||||||
|
new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
|
||||||
|
// this is a protection, if the process is left forgoten for any amount of time,
|
||||||
|
// this will kill it
|
||||||
|
// This is to avoid rogue processes on the CI
|
||||||
|
Thread.sleep(10000);
|
||||||
|
System.err.println("Halting process, protecting the CI from rogue processes");
|
||||||
|
Runtime.getRuntime().halt(-1);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.start();
|
||||||
|
}
|
||||||
|
|
||||||
ActiveMQServer server = new ActiveMQServerImpl(configuration, ManagementFactory.getPlatformMBeanServer(), securityManager) {
|
ActiveMQServer server = new ActiveMQServerImpl(configuration, ManagementFactory.getPlatformMBeanServer(), securityManager) {
|
||||||
@Override
|
@Override
|
||||||
public StorageManager createStorageManager() {
|
public StorageManager createStorageManager() {
|
||||||
|
@ -249,6 +266,7 @@ public class SendAckFailTest extends SpawnedTestBase {
|
||||||
|
|
||||||
|
|
||||||
System.out.println("Location::" + server.getConfiguration().getJournalLocation().getAbsolutePath());
|
System.out.println("Location::" + server.getConfiguration().getJournalLocation().getAbsolutePath());
|
||||||
|
addServer(server);
|
||||||
server.start();
|
server.start();
|
||||||
return server;
|
return server;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.junit.Wait;
|
import org.apache.activemq.artemis.junit.Wait;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -58,12 +59,12 @@ import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class SharedNothingReplicationTest {
|
public class SharedNothingReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class);
|
private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class);
|
||||||
|
|
||||||
|
@ -71,30 +72,32 @@ public class SharedNothingReplicationTest {
|
||||||
public TemporaryFolder brokersFolder = new TemporaryFolder();
|
public TemporaryFolder brokersFolder = new TemporaryFolder();
|
||||||
|
|
||||||
private SlowMessagePersister slowMessagePersister;
|
private SlowMessagePersister slowMessagePersister;
|
||||||
|
ExecutorService sendMessageExecutor;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
slowMessagePersister = new SlowMessagePersister();
|
super.setUp();
|
||||||
CoreMessagePersister.theInstance = slowMessagePersister;
|
sendMessageExecutor = Executors.newSingleThreadExecutor();
|
||||||
|
CoreMessagePersister.registerPersister(SlowMessagePersister._getInstance());
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@Override
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (slowMessagePersister != null) {
|
CoreMessagePersister.resetPersister();
|
||||||
CoreMessagePersister.theInstance = slowMessagePersister.persister;
|
sendMessageExecutor.shutdownNow();
|
||||||
}
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReplicateFromSlowLive() throws Exception {
|
public void testReplicateFromSlowLive() throws Exception {
|
||||||
// start live
|
// start live
|
||||||
Configuration liveConfiguration = createLiveConfiguration();
|
Configuration liveConfiguration = createLiveConfiguration();
|
||||||
ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration);
|
ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
|
||||||
liveServer.start();
|
liveServer.start();
|
||||||
|
|
||||||
Wait.waitFor(() -> liveServer.isStarted());
|
Wait.waitFor(liveServer::isStarted);
|
||||||
|
|
||||||
CoreMessagePersister.theInstance = SlowMessagePersister._getInstance();
|
|
||||||
|
|
||||||
final CountDownLatch replicated = new CountDownLatch(1);
|
final CountDownLatch replicated = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@ -120,7 +123,6 @@ public class SharedNothingReplicationTest {
|
||||||
ClientSession sess = csf.createSession();
|
ClientSession sess = csf.createSession();
|
||||||
sess.createQueue("slow", RoutingType.ANYCAST, "slow", true);
|
sess.createQueue("slow", RoutingType.ANYCAST, "slow", true);
|
||||||
sess.close();
|
sess.close();
|
||||||
Executor sendMessageExecutor = Executors.newCachedThreadPool();
|
|
||||||
|
|
||||||
// let's write some messages
|
// let's write some messages
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -150,7 +152,7 @@ public class SharedNothingReplicationTest {
|
||||||
|
|
||||||
// start backup
|
// start backup
|
||||||
Configuration backupConfiguration = createBackupConfiguration();
|
Configuration backupConfiguration = createBackupConfiguration();
|
||||||
ActiveMQServer backupServer = ActiveMQServers.newActiveMQServer(backupConfiguration);
|
ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
|
||||||
backupServer.start();
|
backupServer.start();
|
||||||
|
|
||||||
Wait.waitFor(() -> backupServer.isStarted());
|
Wait.waitFor(() -> backupServer.isStarted());
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
package org.apache.activemq.artemis.tests.integration.server;
|
package org.apache.activemq.artemis.tests.integration.server;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.ServerSocket;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ public class ActivationFailureListenerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void simpleTest() throws Exception {
|
public void simpleTest() throws Exception {
|
||||||
Socket s = new Socket();
|
ServerSocket s = new ServerSocket();
|
||||||
try {
|
try {
|
||||||
s.bind(new InetSocketAddress("127.0.0.1", 61616));
|
s.bind(new InetSocketAddress("127.0.0.1", 61616));
|
||||||
server = createServer(false, createDefaultNettyConfig());
|
server = createServer(false, createDefaultNettyConfig());
|
||||||
|
|
|
@ -17,6 +17,10 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
|
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
@ -30,10 +34,6 @@ import org.apache.activemq.transport.netty.NettyTransportFactory;
|
||||||
import org.apache.activemq.transport.netty.NettyTransportListener;
|
import org.apache.activemq.transport.netty.NettyTransportListener;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
public class NettyHandshakeTimeoutTest extends ActiveMQTestBase {
|
public class NettyHandshakeTimeoutTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
protected ActiveMQServer server;
|
protected ActiveMQServer server;
|
||||||
|
@ -43,8 +43,6 @@ public class NettyHandshakeTimeoutTest extends ActiveMQTestBase {
|
||||||
public void testHandshakeTimeout() throws Exception {
|
public void testHandshakeTimeout() throws Exception {
|
||||||
int handshakeTimeout = 3;
|
int handshakeTimeout = 3;
|
||||||
|
|
||||||
setUp();
|
|
||||||
ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT);
|
|
||||||
HashMap<String, Object> params = new HashMap<>();
|
HashMap<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.HANDSHAKE_TIMEOUT, handshakeTimeout);
|
params.put(TransportConstants.HANDSHAKE_TIMEOUT, handshakeTimeout);
|
||||||
|
|
||||||
|
@ -70,10 +68,12 @@ public class NettyHandshakeTimeoutTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
transport.connect();
|
transport.connect();
|
||||||
assertTrue("Connection should be closed now", Wait.waitFor(() -> !transport.isConnected(), TimeUnit.SECONDS.toMillis(handshakeTimeout + 1)));
|
assertTrue("Connection should be closed now", Wait.waitFor(() -> !transport.isConnected(), TimeUnit.SECONDS.toMillis(handshakeTimeout + 10)));
|
||||||
} finally {
|
} finally {
|
||||||
transport.close();
|
transport.close();
|
||||||
tearDown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue