This closes #3091
This commit is contained in:
commit
dbc7dca619
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils;
|
||||
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface RetryMethod {
|
||||
int retries();
|
||||
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.internal.AssumptionViolatedException;
|
||||
import org.junit.rules.MethodRule;
|
||||
import org.junit.runners.model.FrameworkMethod;
|
||||
import org.junit.runners.model.Statement;
|
||||
|
||||
/**
|
||||
* Please use this only if you have to.
|
||||
* Try to fix the test first.
|
||||
*/
|
||||
public class RetryRule implements MethodRule {
|
||||
|
||||
public static final String PROPERTY_NAME = "org.apache.activemq.artemis.utils.RetryRule.retry";
|
||||
|
||||
private static Logger logger = Logger.getLogger(RetryRule.class);
|
||||
|
||||
int defaultNumberOfRetries;
|
||||
|
||||
public RetryRule() {
|
||||
this(0);
|
||||
}
|
||||
|
||||
public RetryRule(int defaultNumberOfRetries) {
|
||||
this.defaultNumberOfRetries = defaultNumberOfRetries;
|
||||
}
|
||||
|
||||
private int getNumberOfRetries(final FrameworkMethod method) {
|
||||
|
||||
if (!Boolean.parseBoolean(System.getProperty(PROPERTY_NAME))) {
|
||||
return 0;
|
||||
}
|
||||
RetryMethod retry = method.getAnnotation(RetryMethod.class);
|
||||
if (retry != null) {
|
||||
return retry.retries();
|
||||
} else {
|
||||
return defaultNumberOfRetries;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Statement apply(final Statement base, final FrameworkMethod method, Object target) {
|
||||
return new Statement() {
|
||||
@Override
|
||||
public void evaluate() throws Throwable {
|
||||
Throwable currentException = null;
|
||||
try {
|
||||
base.evaluate();
|
||||
} catch (Throwable t) {
|
||||
|
||||
if (t instanceof AssumptionViolatedException) {
|
||||
throw t;
|
||||
}
|
||||
|
||||
currentException = t;
|
||||
|
||||
int retries = getNumberOfRetries(method);
|
||||
|
||||
for (int retryNr = 0; retryNr < retries; retryNr++) {
|
||||
logger.warn("RETRY " + (retryNr + 1) + " of " + retries + " on " + target.getClass() + "::" + method.getName(), currentException);
|
||||
currentException = null;
|
||||
try {
|
||||
base.evaluate();
|
||||
logger.info("RETRY " + (retryNr + 1) + " of " + retries + " on " + target.getClass() + "::" + method.getName() + " succeeded");
|
||||
break;
|
||||
} catch (Throwable t2) {
|
||||
logger.info("RETRY " + (retryNr + 1) + " of " + retries + " on " + target.getClass() + "::" + method.getName() + " failed ", t2);
|
||||
currentException = t2;
|
||||
}
|
||||
}
|
||||
if (currentException != null) {
|
||||
throw currentException;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -304,7 +304,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
|||
if (ids.size() > 0 && persist) {
|
||||
long tx = storageManager.generateID();
|
||||
for (Pair<ByteArrayHolder, Long> id : ids) {
|
||||
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
|
||||
if (id != null) {
|
||||
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
|
||||
}
|
||||
}
|
||||
storageManager.commit(tx);
|
||||
}
|
||||
|
|
|
@ -506,8 +506,14 @@ public class NettyAcceptor extends AbstractAcceptor {
|
|||
|
||||
@Override
|
||||
public void reload() {
|
||||
serverChannelGroup.disconnect();
|
||||
ChannelGroupFuture future = serverChannelGroup.disconnect();
|
||||
try {
|
||||
future.awaitUninterruptibly();
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
|
||||
serverChannelGroup.clear();
|
||||
|
||||
startServerChannels();
|
||||
}
|
||||
|
||||
|
|
11
pom.xml
11
pom.xml
|
@ -74,6 +74,8 @@
|
|||
<!-- base url for site deployment. See distribution management for full url. Override this in settings.xml for staging -->
|
||||
<staging.siteURL>scp://people.apache.org/x1/www/activemq.apache.org</staging.siteURL>
|
||||
|
||||
<retryTests>false</retryTests>
|
||||
|
||||
<activemq-artemis-native-version>1.0.1</activemq-artemis-native-version>
|
||||
<karaf.version>4.0.6</karaf.version>
|
||||
<pax.exam.version>4.9.1</pax.exam.version>
|
||||
|
@ -172,7 +174,7 @@
|
|||
|
||||
-->
|
||||
|
||||
<activemq-surefire-argline>-Dbrokerconfig.maxDiskUsage=100 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_QUIET_PERIOD=0 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT=0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager
|
||||
<activemq-surefire-argline>-Dorg.apache.activemq.artemis.utils.RetryRule.retry=${retryTests} -Dbrokerconfig.maxDiskUsage=100 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_QUIET_PERIOD=0 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT=0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager
|
||||
-Dlogging.configuration="file:${activemq.basedir}/tests/config/logging.properties"
|
||||
-Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
|
||||
-Djava.net.preferIPv4Stack=true -Dbasedir=${basedir}
|
||||
|
@ -1014,6 +1016,13 @@
|
|||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
<profile>
|
||||
<!-- this will activate the property required to play with tests retry -->
|
||||
<id>tests-retry</id>
|
||||
<properties>
|
||||
<retryTests>true</retryTests>
|
||||
</properties>
|
||||
</profile>
|
||||
<profile>
|
||||
<!-- tests is the profile we use to run the entire testsuite
|
||||
Running this entire build could take up to 2 hours -->
|
||||
|
|
|
@ -283,7 +283,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
|
|||
}
|
||||
|
||||
public void testNonDestructiveLVQTombstone(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
|
||||
int tombstoneTimeToLive = 50;
|
||||
int tombstoneTimeToLive = 500;
|
||||
|
||||
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME));
|
||||
LastValueQueue lastValueQueue = (LastValueQueue)queueBinding.getQueue();
|
||||
|
|
|
@ -332,15 +332,15 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
|
|||
|
||||
while (true) {
|
||||
|
||||
ClientMessage msg = consumer.receiveImmediate();
|
||||
if (msg == null) {
|
||||
if (received.incrementAndGet() > NUMBER_OF_MESSAGES) {
|
||||
received.decrementAndGet();
|
||||
break;
|
||||
}
|
||||
ClientMessage msg = consumer.receive(1000);
|
||||
msg.acknowledge();
|
||||
|
||||
session.commit();
|
||||
|
||||
received.incrementAndGet();
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -29,10 +29,16 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
|||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
|
||||
public abstract class BridgeTestBase extends ActiveMQTestBase {
|
||||
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(2);
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
|
|
|
@ -303,52 +303,56 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
private void logTopologyDiagram() {
|
||||
StringBuffer topologyDiagram = new StringBuffer();
|
||||
for (ActiveMQServer activeMQServer : servers) {
|
||||
if (activeMQServer != null) {
|
||||
topologyDiagram.append("\n").append(activeMQServer.getIdentity()).append("\n");
|
||||
if (activeMQServer.isStarted()) {
|
||||
Set<ClusterConnection> ccs = activeMQServer.getClusterManager().getClusterConnections();
|
||||
try {
|
||||
StringBuffer topologyDiagram = new StringBuffer();
|
||||
for (ActiveMQServer activeMQServer : servers) {
|
||||
if (activeMQServer != null) {
|
||||
topologyDiagram.append("\n").append(activeMQServer.getIdentity()).append("\n");
|
||||
if (activeMQServer.isStarted()) {
|
||||
Set<ClusterConnection> ccs = activeMQServer.getClusterManager().getClusterConnections();
|
||||
|
||||
if (ccs.size() >= 1) {
|
||||
ClusterConnectionImpl clusterConnection = (ClusterConnectionImpl) ccs.iterator().next();
|
||||
Collection<TopologyMemberImpl> members = clusterConnection.getTopology().getMembers();
|
||||
for (TopologyMemberImpl member : members) {
|
||||
String nodeId = member.getNodeId();
|
||||
String liveServer = null;
|
||||
String backupServer = null;
|
||||
for (ActiveMQServer server : servers) {
|
||||
if (server != null && server.getNodeID() != null && server.isActive() && server.getNodeID().toString().equals(nodeId)) {
|
||||
if (server.isActive()) {
|
||||
liveServer = server.getIdentity();
|
||||
if (member.getLive() != null) {
|
||||
liveServer += "(notified)";
|
||||
if (ccs.size() >= 1) {
|
||||
ClusterConnectionImpl clusterConnection = (ClusterConnectionImpl) ccs.iterator().next();
|
||||
Collection<TopologyMemberImpl> members = clusterConnection.getTopology().getMembers();
|
||||
for (TopologyMemberImpl member : members) {
|
||||
String nodeId = member.getNodeId();
|
||||
String liveServer = null;
|
||||
String backupServer = null;
|
||||
for (ActiveMQServer server : servers) {
|
||||
if (server != null && server.getNodeID() != null && server.isActive() && server.getNodeID().toString().equals(nodeId)) {
|
||||
if (server.isActive()) {
|
||||
liveServer = server.getIdentity();
|
||||
if (member.getLive() != null) {
|
||||
liveServer += "(notified)";
|
||||
} else {
|
||||
liveServer += "(not notified)";
|
||||
}
|
||||
} else {
|
||||
liveServer += "(not notified)";
|
||||
}
|
||||
} else {
|
||||
backupServer = server.getIdentity();
|
||||
if (member.getBackup() != null) {
|
||||
liveServer += "(notified)";
|
||||
} else {
|
||||
liveServer += "(not notified)";
|
||||
backupServer = server.getIdentity();
|
||||
if (member.getBackup() != null) {
|
||||
liveServer += "(notified)";
|
||||
} else {
|
||||
liveServer += "(not notified)";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
topologyDiagram.append("\t").append("|\n").append("\t->").append(liveServer).append("/").append(backupServer).append("\n");
|
||||
topologyDiagram.append("\t").append("|\n").append("\t->").append(liveServer).append("/").append(backupServer).append("\n");
|
||||
}
|
||||
} else {
|
||||
topologyDiagram.append("-> no cluster connections\n");
|
||||
}
|
||||
} else {
|
||||
topologyDiagram.append("-> no cluster connections\n");
|
||||
topologyDiagram.append("-> stopped\n");
|
||||
}
|
||||
} else {
|
||||
topologyDiagram.append("-> stopped\n");
|
||||
}
|
||||
}
|
||||
topologyDiagram.append("\n");
|
||||
log.info(topologyDiagram.toString());
|
||||
} catch (Throwable e) {
|
||||
log.warn("error printing the topology::" + e.getMessage(), e);
|
||||
}
|
||||
topologyDiagram.append("\n");
|
||||
log.info(topologyDiagram.toString());
|
||||
}
|
||||
|
||||
protected void waitForMessages(final int node, final String address, final int count) throws Exception {
|
||||
|
|
|
@ -50,10 +50,15 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ClusteredGroupingTest extends ClusterTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(2);
|
||||
|
||||
@Test
|
||||
public void testGroupingGroupTimeoutWithUnproposal() throws Exception {
|
||||
setupServer(0, isFileStorage(), isNetty());
|
||||
|
|
|
@ -64,13 +64,18 @@ import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer
|
|||
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
|
||||
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class FailoverTest extends FailoverTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(2);
|
||||
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
protected static final int NUM_MESSAGES = 100;
|
||||
|
@ -112,7 +117,7 @@ public class FailoverTest extends FailoverTestBase {
|
|||
// https://issues.jboss.org/browse/HORNETQ-685
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutOnFailover() throws Exception {
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(10);
|
||||
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 500L;
|
||||
|
@ -249,10 +254,10 @@ public class FailoverTest extends FailoverTestBase {
|
|||
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0).setRetryInterval(10);
|
||||
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 2000L;
|
||||
((InVMNodeManager) nodeManager).failoverPause = 200L;
|
||||
}
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
|
@ -344,7 +349,7 @@ public class FailoverTest extends FailoverTestBase {
|
|||
// https://issues.jboss.org/browse/HORNETQ-685
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(10);
|
||||
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 2000L;
|
||||
|
@ -413,7 +418,7 @@ public class FailoverTest extends FailoverTestBase {
|
|||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(50);
|
||||
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 2000L;
|
||||
|
@ -491,7 +496,7 @@ public class FailoverTest extends FailoverTestBase {
|
|||
// https://issues.jboss.org/browse/HORNETQ-685
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutOnFailoverTransactionRollback() throws Exception {
|
||||
locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
|
||||
locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(10);
|
||||
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 1000L;
|
||||
|
@ -546,7 +551,7 @@ public class FailoverTest extends FailoverTestBase {
|
|||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testNonTransactedWithZeroConsumerWindowSize() throws Exception {
|
||||
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
|
||||
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(10);
|
||||
|
||||
createClientSessionFactory();
|
||||
|
||||
|
@ -661,7 +666,7 @@ public class FailoverTest extends FailoverTestBase {
|
|||
Assert.assertEquals(0, sf.numConnections());
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@Test
|
||||
public void testFailLiveTooSoon() throws Exception {
|
||||
ServerLocator locator = getServerLocator();
|
||||
|
||||
|
|
|
@ -0,0 +1,226 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||
import org.hamcrest.core.Is;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class FileLockNodeManagerTest extends FailoverTestBase {
|
||||
|
||||
public enum NodeManagerType {
|
||||
InVM, Jdbc, File
|
||||
}
|
||||
|
||||
protected ClientSession createSession(ClientSessionFactory sf1,
|
||||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks,
|
||||
int ackBatchSize) throws Exception {
|
||||
return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks, ackBatchSize));
|
||||
}
|
||||
|
||||
protected ClientSession createSession(ClientSessionFactory sf1,
|
||||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks) throws Exception {
|
||||
return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
|
||||
}
|
||||
|
||||
protected ClientSession createSession(ClientSessionFactory sf1) throws Exception {
|
||||
return addClientSession(sf1.createSession());
|
||||
}
|
||||
|
||||
protected ClientSession createSession(ClientSessionFactory sf1,
|
||||
boolean xa,
|
||||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks) throws Exception {
|
||||
return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "{0} Node Manager, Use Separate Lock Folder = {1}")
|
||||
public static Iterable<? extends Object> nodeManagerTypes() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{NodeManagerType.File, false},
|
||||
{NodeManagerType.File, true}});
|
||||
}
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public NodeManagerType nodeManagerType;
|
||||
@Parameterized.Parameter(1)
|
||||
public boolean useSeparateLockFolder;
|
||||
|
||||
@Override
|
||||
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
|
||||
return getNettyAcceptorTransportConfiguration(live);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
|
||||
return getNettyConnectorTransportConfiguration(live);
|
||||
}
|
||||
|
||||
private List<ScheduledExecutorService> scheduledExecutorServices = new ArrayList<>();
|
||||
private List<ExecutorService> executors = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
|
||||
Assume.assumeThat("Replicated backup is supported only by " + NodeManagerType.InVM + " Node Manager", nodeManagerType, Is.is(NodeManagerType.InVM));
|
||||
return super.createReplicatedBackupNodeManager(backupConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createDefaultInVMConfig() throws Exception {
|
||||
final Configuration config = super.createDefaultInVMConfig();
|
||||
if (useSeparateLockFolder) {
|
||||
config.setNodeManagerLockDirectory(getTestDir() + "/nm_lock");
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeManager createNodeManager() throws Exception {
|
||||
|
||||
switch (nodeManagerType) {
|
||||
|
||||
case InVM:
|
||||
return new InVMNodeManager(false);
|
||||
case Jdbc:
|
||||
final ThreadFactory daemonThreadFactory = t -> {
|
||||
final Thread th = new Thread(t);
|
||||
th.setDaemon(true);
|
||||
return th;
|
||||
};
|
||||
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
|
||||
scheduledExecutorServices.add(scheduledExecutorService);
|
||||
final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
|
||||
executors.add(executor);
|
||||
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
|
||||
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
|
||||
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
|
||||
code.printStackTrace();
|
||||
Assert.fail(message);
|
||||
});
|
||||
case File:
|
||||
final Configuration config = createDefaultInVMConfig();
|
||||
if (useSeparateLockFolder) {
|
||||
config.getNodeManagerLockLocation().mkdirs();
|
||||
}
|
||||
return new FileLockNodeManager(config.getNodeManagerLockLocation(), false);
|
||||
|
||||
default:
|
||||
throw new AssertionError("enum type not supported!");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected TestableServer createTestableServer(Configuration config) throws Exception {
|
||||
final boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
|
||||
NodeManager nodeManager = this.nodeManager;
|
||||
//create a separate NodeManager for the backup
|
||||
if (isBackup && (nodeManagerType == NodeManagerType.Jdbc || nodeManagerType == NodeManagerType.File)) {
|
||||
nodeManager = createNodeManager();
|
||||
}
|
||||
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
|
||||
}
|
||||
|
||||
|
||||
@After
|
||||
public void shutDownExecutors() {
|
||||
if (!scheduledExecutorServices.isEmpty()) {
|
||||
ThreadLeakCheckRule.addKownThread("oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser");
|
||||
executors.forEach(ExecutorService::shutdown);
|
||||
scheduledExecutorServices.forEach(ExecutorService::shutdown);
|
||||
executors.clear();
|
||||
scheduledExecutorServices.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testSimpleFailover() throws Exception {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
|
||||
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
||||
|
||||
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(150).setRetryInterval(10);
|
||||
|
||||
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
|
||||
|
||||
ClientSession session = createSession(sf, true, true, 0);
|
||||
|
||||
session.createQueue(new QueueConfiguration(ADDRESS));
|
||||
|
||||
ClientProducer producer = session.createProducer(ADDRESS);
|
||||
|
||||
final int numMessages = 10;
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
|
||||
session.start();
|
||||
|
||||
crash(session);
|
||||
|
||||
sendMessages(session, producer, numMessages);
|
||||
receiveMessages(consumer, 0, numMessages, true);
|
||||
|
||||
session.close();
|
||||
|
||||
sf.close();
|
||||
|
||||
Assert.assertEquals(0, sf.numSessions());
|
||||
|
||||
Assert.assertEquals(0, sf.numConnections());
|
||||
}
|
||||
|
||||
}
|
|
@ -40,7 +40,6 @@ import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfigur
|
|||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
|
||||
|
@ -60,22 +59,18 @@ import org.junit.runners.Parameterized;
|
|||
public class NettyFailoverTest extends FailoverTest {
|
||||
|
||||
public enum NodeManagerType {
|
||||
InVM, Jdbc, File
|
||||
InVM, Jdbc
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "{0} Node Manager, Use Separate Lock Folder = {1}")
|
||||
@Parameterized.Parameters(name = "{0} Node Manager")
|
||||
public static Iterable<? extends Object> nodeManagerTypes() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{NodeManagerType.Jdbc, false},
|
||||
{NodeManagerType.InVM, false},
|
||||
{NodeManagerType.File, false},
|
||||
{NodeManagerType.File, true}});
|
||||
{NodeManagerType.Jdbc},
|
||||
{NodeManagerType.InVM}});
|
||||
}
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public NodeManagerType nodeManagerType;
|
||||
@Parameterized.Parameter(1)
|
||||
public boolean useSeparateLockFolder;
|
||||
|
||||
@Override
|
||||
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
|
||||
|
@ -99,9 +94,6 @@ public class NettyFailoverTest extends FailoverTest {
|
|||
@Override
|
||||
protected Configuration createDefaultInVMConfig() throws Exception {
|
||||
final Configuration config = super.createDefaultInVMConfig();
|
||||
if (useSeparateLockFolder) {
|
||||
config.setNodeManagerLockDirectory(getTestDir() + "/nm_lock");
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
|
@ -128,13 +120,6 @@ public class NettyFailoverTest extends FailoverTest {
|
|||
code.printStackTrace();
|
||||
Assert.fail(message);
|
||||
});
|
||||
case File:
|
||||
final Configuration config = createDefaultInVMConfig();
|
||||
if (useSeparateLockFolder) {
|
||||
config.getNodeManagerLockLocation().mkdirs();
|
||||
}
|
||||
return new FileLockNodeManager(config.getNodeManagerLockLocation(), false);
|
||||
|
||||
default:
|
||||
throw new AssertionError("enum type not supported!");
|
||||
}
|
||||
|
@ -146,7 +131,7 @@ public class NettyFailoverTest extends FailoverTest {
|
|||
final boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
|
||||
NodeManager nodeManager = this.nodeManager;
|
||||
//create a separate NodeManager for the backup
|
||||
if (isBackup && (nodeManagerType == NodeManagerType.Jdbc || nodeManagerType == NodeManagerType.File)) {
|
||||
if (isBackup && (nodeManagerType == NodeManagerType.Jdbc)) {
|
||||
nodeManager = createNodeManager();
|
||||
}
|
||||
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
|
||||
|
|
|
@ -29,10 +29,16 @@ import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
|
||||
import org.apache.activemq.artemis.utils.RetryMethod;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(0);
|
||||
|
||||
@Override
|
||||
protected void setupServers() throws Exception {
|
||||
super.setupServers();
|
||||
|
@ -50,6 +56,7 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest {
|
|||
|
||||
}
|
||||
|
||||
@RetryMethod(retries = 2)
|
||||
@Test
|
||||
public void testQuorumVoting() throws Exception {
|
||||
int[] liveServerIDs = new int[]{0, 1, 2};
|
||||
|
|
|
@ -20,10 +20,16 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
|||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
|
||||
import org.apache.activemq.artemis.utils.RetryMethod;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class QuorumResultWaitTest extends StaticClusterWithBackupFailoverTest {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(0);
|
||||
|
||||
public static final int QUORUM_VOTE_WAIT_CONFIGURED_TIME_SEC = 12;
|
||||
@Override
|
||||
protected void setupServers() throws Exception {
|
||||
|
@ -41,6 +47,7 @@ public class QuorumResultWaitTest extends StaticClusterWithBackupFailoverTest {
|
|||
servers[3].getConfiguration().setHAPolicyConfiguration(replicatedPolicyConf);
|
||||
}
|
||||
|
||||
@RetryMethod(retries = 2)
|
||||
@Test
|
||||
public void testQuorumVotingResultWait() throws Exception {
|
||||
setupCluster();
|
||||
|
|
|
@ -32,15 +32,22 @@ import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||
import org.apache.activemq.artemis.utils.RetryMethod;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ReplicatedMultipleServerFailoverExtraBackupsTest extends ReplicatedMultipleServerFailoverTest {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule();
|
||||
|
||||
private void waitForSync(ActiveMQServer server) throws Exception {
|
||||
Wait.waitFor(server::isReplicaSync);
|
||||
}
|
||||
|
||||
@RetryMethod(retries = 1)
|
||||
@Override
|
||||
@Test
|
||||
public void testStartLiveFirst() throws Exception {
|
||||
|
|
|
@ -29,12 +29,18 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.utils.RetryMethod;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(0);
|
||||
|
||||
@Override
|
||||
protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
|
||||
final Configuration configuration,
|
||||
|
@ -59,6 +65,20 @@ public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest {
|
|||
internalBrowser(2);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RetryMethod(retries = 2)
|
||||
@Test(timeout = 120000)
|
||||
public void testReplicatedFailback() throws Exception {
|
||||
super.testReplicatedFailback();
|
||||
}
|
||||
|
||||
@Override
|
||||
@RetryMethod(retries = 2)
|
||||
@Test(timeout = 120000)
|
||||
public void testFailoverOnInitialConnection() throws Exception {
|
||||
super.testFailoverOnInitialConnection();
|
||||
}
|
||||
|
||||
//
|
||||
// 0 - no tamper
|
||||
// 1 - close files
|
||||
|
|
|
@ -492,14 +492,7 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
|
||||
final ReusableLatch latch = new ReusableLatch(1);
|
||||
|
||||
Runnable tick = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
|
||||
|
||||
try {
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
|
||||
|
@ -514,8 +507,8 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
|
||||
latch.setCount(1);
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
|
||||
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
|
||||
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
|
||||
|
|
|
@ -32,14 +32,19 @@ import org.apache.activemq.artemis.core.server.Queue;
|
|||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(1);
|
||||
|
||||
public static final String WORD_START = "&*STARTED&*";
|
||||
|
||||
// We will add a random factor on the wait time
|
||||
|
|
|
@ -105,12 +105,15 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl
|
|||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.RetryMethod;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -118,6 +121,9 @@ import org.junit.runners.Parameterized;
|
|||
@RunWith(Parameterized.class)
|
||||
public class PagingTest extends ActiveMQTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryMethod = new RetryRule(0);
|
||||
|
||||
private static final Logger logger = Logger.getLogger(PagingTest.class);
|
||||
|
||||
protected ServerLocator locator;
|
||||
|
@ -1751,6 +1757,7 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
@RetryMethod(retries = 1)
|
||||
@Test
|
||||
public void testInabilityToCreateDirectoryDuringPaging() throws Exception {
|
||||
// this test only applies to file-based stores
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
|
@ -47,12 +48,17 @@ import org.apache.activemq.artemis.core.server.ServerSession;
|
|||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ReconnectTest extends ActiveMQTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(2);
|
||||
|
||||
@Test
|
||||
public void testReconnectNetty() throws Exception {
|
||||
internalTestReconnect(true);
|
||||
|
@ -383,14 +389,14 @@ public class ReconnectTest extends ActiveMQTestBase {
|
|||
ActiveMQServer server = createServer(true, true);
|
||||
server.start();
|
||||
|
||||
final AtomicBoolean consumerClosed = new AtomicBoolean(false);
|
||||
// imitate consumer close timeout
|
||||
Interceptor reattachInterceptor = new Interceptor() {
|
||||
boolean consumerClosed;
|
||||
|
||||
@Override
|
||||
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
|
||||
if (!consumerClosed && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) {
|
||||
consumerClosed = true;
|
||||
if (!consumerClosed.get() && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) {
|
||||
consumerClosed.set(true);
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
|
@ -403,7 +409,7 @@ public class ReconnectTest extends ActiveMQTestBase {
|
|||
final long retryInterval = 500;
|
||||
final double retryMultiplier = 1d;
|
||||
final int reconnectAttempts = 10;
|
||||
ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
|
||||
ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
|
||||
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
|
||||
ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true);
|
||||
|
@ -416,6 +422,8 @@ public class ReconnectTest extends ActiveMQTestBase {
|
|||
ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
|
||||
clientConsumer1.close();
|
||||
|
||||
Wait.assertTrue(consumerClosed::get);
|
||||
|
||||
Wait.assertEquals(1, () -> getConsumerCount(server, session));
|
||||
|
||||
Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
|
||||
|
|
|
@ -25,14 +25,19 @@ import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImp
|
|||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(3);
|
||||
|
||||
public ScaleDownRemoveSFTest() {
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
|||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
|
||||
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
|
||||
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
||||
|
@ -65,6 +64,7 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
|
|||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -75,7 +75,8 @@ import org.junit.runners.Parameterized;
|
|||
@RunWith(Parameterized.class)
|
||||
public class StompTest extends StompTestBase {
|
||||
|
||||
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
private static final Logger log = Logger.getLogger(StompTest.class);
|
||||
|
||||
protected StompClientConnection conn;
|
||||
|
||||
@Override
|
||||
|
@ -968,7 +969,7 @@ public class StompTest extends StompTestBase {
|
|||
ClientStompFrame frame = conn.receiveFrame(10000);
|
||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||
|
||||
log.info("Reconnecting!");
|
||||
log.debug("Reconnecting!");
|
||||
|
||||
if (sendDisconnect) {
|
||||
conn.disconnect();
|
||||
|
@ -1025,7 +1026,7 @@ public class StompTest extends StompTestBase {
|
|||
sendJmsMessage("second message");
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
}
|
||||
|
||||
|
@ -1048,7 +1049,7 @@ public class StompTest extends StompTestBase {
|
|||
sendJmsMessage("second message");
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
}
|
||||
|
@ -1138,7 +1139,7 @@ public class StompTest extends StompTestBase {
|
|||
if (length - baselineQueueCount == 1) {
|
||||
return true;
|
||||
} else {
|
||||
log.info("Queue count: " + (length - baselineQueueCount));
|
||||
log.debug("Queue count: " + (length - baselineQueueCount));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -1156,7 +1157,7 @@ public class StompTest extends StompTestBase {
|
|||
sendJmsMessage(getName(), topic);
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServerControl().getQueueNames().length - baselineQueueCount);
|
||||
|
@ -1194,7 +1195,7 @@ public class StompTest extends StompTestBase {
|
|||
sendJmsMessage(getName(), queue);
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServerControl().getQueueNames().length);
|
||||
|
@ -1237,7 +1238,7 @@ public class StompTest extends StompTestBase {
|
|||
sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
conn.disconnect();
|
||||
|
@ -1408,7 +1409,7 @@ public class StompTest extends StompTestBase {
|
|||
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(2000);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
// send message on another JMS connection => it should be received
|
||||
|
@ -1443,7 +1444,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// ...and nothing else
|
||||
ClientStompFrame frame = conn.receiveFrame(2000);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
conn.disconnect();
|
||||
|
@ -1510,7 +1511,7 @@ public class StompTest extends StompTestBase {
|
|||
sendJmsMessage(getName(), topic);
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(TIME_OUT);
|
||||
log.info("Received frame: " + frame);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
conn.disconnect();
|
||||
|
@ -1872,7 +1873,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
frame = conn.receiveFrame(10000);
|
||||
|
||||
IntegrationTestLogger.LOGGER.info("Received: " + frame);
|
||||
log.debug("Received: " + frame);
|
||||
|
||||
Assert.assertEquals(Boolean.TRUE.toString(), frame.getHeader(ManagementHelper.HDR_OPERATION_SUCCEEDED.toString()));
|
||||
// the address will be returned in the message body in a JSON array
|
||||
|
@ -1900,7 +1901,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
frame = conn.receiveFrame(10000);
|
||||
|
||||
IntegrationTestLogger.LOGGER.info("Received: " + frame);
|
||||
log.debug("Received: " + frame);
|
||||
|
||||
Assert.assertEquals(Boolean.TRUE.toString(), frame.getHeader(ManagementHelper.HDR_OPERATION_SUCCEEDED.toString()));
|
||||
// there is no such messages => 0 returned in a JSON array
|
||||
|
|
|
@ -55,8 +55,10 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompCli
|
|||
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
|
@ -66,6 +68,9 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
|||
@Parameterized.Parameter
|
||||
public String scheme;
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(2);
|
||||
|
||||
protected URI uri;
|
||||
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
|
|
|
@ -40,9 +40,11 @@ import org.apache.activemq.artemis.core.server.ServerSession;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -50,6 +52,9 @@ import org.junit.runners.Parameterized;
|
|||
@RunWith(Parameterized.class)
|
||||
public class SessionFailureXATest extends ActiveMQTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(1);
|
||||
|
||||
private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
private final Map<String, AddressSettings> addressSettings = new HashMap<>();
|
||||
|
|
|
@ -25,6 +25,7 @@ under the License.
|
|||
<core xmlns="urn:activemq:core">
|
||||
<security-enabled>false</security-enabled>
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
|
|
|
@ -25,6 +25,7 @@ under the License.
|
|||
<core xmlns="urn:activemq:core">
|
||||
<security-enabled>false</security-enabled>
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
|
|
|
@ -30,10 +30,12 @@ import java.util.Collection;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -41,6 +43,9 @@ import org.junit.runners.Parameterized;
|
|||
@RunWith(Parameterized.class)
|
||||
public class SoakPagingTest extends SmokeTestBase {
|
||||
|
||||
@Rule
|
||||
public RetryRule retryRule = new RetryRule(1);
|
||||
|
||||
public static final int LAG_CONSUMER_TIME = 1000;
|
||||
public static final int TIME_RUNNING = 4000;
|
||||
public static final int CLIENT_KILLS = 2;
|
||||
|
|
Loading…
Reference in New Issue