mirror of https://github.com/apache/activemq.git
Moving some test code into a shared support class
This commit is contained in:
parent
82295fd2c6
commit
b1c4b1871f
|
@ -20,12 +20,10 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
|
||||
|
@ -34,21 +32,16 @@ import org.apache.activemq.advisory.AdvisoryBroker;
|
|||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.CommandTypes;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
import org.apache.activemq.util.SubscriptionKey;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.activemq.util.Wait.Condition;
|
||||
import org.junit.After;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
@ -58,21 +51,11 @@ import org.slf4j.LoggerFactory;
|
|||
import com.google.common.collect.Lists;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class DurableSyncNetworkBridgeTest {
|
||||
public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
|
||||
|
||||
protected Connection localConnection;
|
||||
protected Connection remoteConnection;
|
||||
protected BrokerService localBroker;
|
||||
protected BrokerService remoteBroker;
|
||||
protected Session localSession;
|
||||
protected Session remoteSession;
|
||||
protected ActiveMQTopic included;
|
||||
protected ActiveMQTopic excluded;
|
||||
protected String includeTopicName = "include.test.bar";
|
||||
protected String includeTopicName2 = "include.test.bar2";
|
||||
protected String excludeTopicName = "exclude.test.bar";
|
||||
protected String clientId = "clientId";
|
||||
protected String testTopicName2 = "include.test.bar2";
|
||||
private boolean dynamicOnly = false;
|
||||
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
|
||||
public static enum FLOW {FORWARD, REVERSE};
|
||||
|
@ -82,9 +65,6 @@ public class DurableSyncNetworkBridgeTest {
|
|||
private Session session1;
|
||||
private final FLOW flow;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
|
@ -110,35 +90,10 @@ public class DurableSyncNetworkBridgeTest {
|
|||
doTearDown();
|
||||
}
|
||||
|
||||
protected void doTearDown() throws Exception {
|
||||
stopLocalBroker();
|
||||
stopRemoteBroker();
|
||||
}
|
||||
|
||||
protected void stopLocalBroker() throws Exception {
|
||||
if (localConnection != null) {
|
||||
localConnection.close();
|
||||
}
|
||||
if (localBroker != null) {
|
||||
localBroker.stop();
|
||||
localBroker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
protected void stopRemoteBroker() throws Exception {
|
||||
if (remoteConnection != null) {
|
||||
remoteConnection.close();
|
||||
}
|
||||
if (remoteBroker != null) {
|
||||
remoteBroker.stop();
|
||||
remoteBroker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveSubscriptionPropagate() throws Exception {
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final String subName = "sub1";
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
|
||||
sub1.close();
|
||||
|
||||
|
@ -154,8 +109,7 @@ public class DurableSyncNetworkBridgeTest {
|
|||
|
||||
@Test
|
||||
public void testRemoveSubscriptionPropegateAfterRestart() throws Exception {
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final String subName = "sub1";
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
|
||||
sub1.close();
|
||||
|
||||
|
@ -176,8 +130,7 @@ public class DurableSyncNetworkBridgeTest {
|
|||
|
||||
@Test
|
||||
public void testRemoveSubscriptionWithBridgeOffline() throws Exception {
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final String subName = "sub1";
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
|
||||
sub1.close();
|
||||
|
||||
|
@ -201,9 +154,8 @@ public class DurableSyncNetworkBridgeTest {
|
|||
|
||||
@Test
|
||||
public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Exception {
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final ActiveMQTopic topic2 = new ActiveMQTopic(includeTopicName2);
|
||||
final String subName = "sub1";
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
final ActiveMQTopic topic2 = new ActiveMQTopic(testTopicName2);
|
||||
MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
|
||||
sub1.close();
|
||||
|
||||
|
@ -236,10 +188,9 @@ public class DurableSyncNetworkBridgeTest {
|
|||
|
||||
@Test
|
||||
public void testAddSubscriptionsWithBridgeOffline() throws Exception {
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final ActiveMQTopic topic2 = new ActiveMQTopic(includeTopicName2);
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
final ActiveMQTopic topic2 = new ActiveMQTopic(testTopicName2);
|
||||
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
|
||||
final String subName = "sub1";
|
||||
|
||||
assertSubscriptionsCount(broker1, topic, 0);
|
||||
assertNCSubscriptionsCount(broker2, topic, 0);
|
||||
|
@ -269,8 +220,7 @@ public class DurableSyncNetworkBridgeTest {
|
|||
@Test
|
||||
public void testAddSubscriptionsWithBridgeOfflineOpenWire11() throws Exception {
|
||||
this.remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC - 1;
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final String subName = "sub1";
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
|
||||
assertSubscriptionsCount(broker1, topic, 0);
|
||||
assertNCSubscriptionsCount(broker2, topic, 0);
|
||||
|
@ -293,8 +243,7 @@ public class DurableSyncNetworkBridgeTest {
|
|||
public void testAddOfflineSubscriptionWithBridgeOfflineDynamicTrue() throws Exception {
|
||||
//set dynamicOnly to true
|
||||
this.dynamicOnly = true;
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final String subName = "sub1";
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
|
||||
assertSubscriptionsCount(broker1, topic, 0);
|
||||
assertNCSubscriptionsCount(broker2, topic, 0);
|
||||
|
@ -314,8 +263,7 @@ public class DurableSyncNetworkBridgeTest {
|
|||
public void testAddOnlineSubscriptionWithBridgeOfflineDynamicTrue() throws Exception {
|
||||
//set dynamicOnly to true
|
||||
this.dynamicOnly = true;
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final String subName = "sub1";
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
|
||||
assertSubscriptionsCount(broker1, topic, 0);
|
||||
assertNCSubscriptionsCount(broker2, topic, 0);
|
||||
|
@ -337,9 +285,8 @@ public class DurableSyncNetworkBridgeTest {
|
|||
|
||||
@Test
|
||||
public void testAddAndRemoveSubscriptionsWithBridgeOffline() throws Exception {
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
|
||||
final String subName = "sub1";
|
||||
|
||||
session1.createDurableSubscriber(topic, subName).close();
|
||||
assertSubscriptionsCount(broker1, topic, 1);
|
||||
|
@ -363,9 +310,8 @@ public class DurableSyncNetworkBridgeTest {
|
|||
public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception {
|
||||
Assume.assumeTrue(flow == FLOW.FORWARD);
|
||||
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(includeTopicName);
|
||||
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
|
||||
final String subName = "sub1";
|
||||
|
||||
assertSubscriptionsCount(broker1, topic, 0);
|
||||
assertNCSubscriptionsCount(broker2, topic, 0);
|
||||
|
@ -424,46 +370,6 @@ public class DurableSyncNetworkBridgeTest {
|
|||
}, 10000, 500));
|
||||
}
|
||||
|
||||
protected void assertNCSubscriptionsCount(final BrokerService brokerService,
|
||||
final ActiveMQTopic dest, final int count) throws Exception {
|
||||
assertTrue(Wait.waitFor(new Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return count == getNCSubscriptions(brokerService, dest).size();
|
||||
}
|
||||
}, 10000, 500));
|
||||
}
|
||||
|
||||
protected List<DurableTopicSubscription> getSubscriptions(final BrokerService brokerService,
|
||||
final ActiveMQTopic dest) throws Exception {
|
||||
List<DurableTopicSubscription> subs = new ArrayList<>();
|
||||
Topic destination = (Topic) brokerService.getDestination(dest);
|
||||
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
|
||||
if (!key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
|
||||
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
|
||||
if (sub != null) {
|
||||
subs.add(sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
return subs;
|
||||
}
|
||||
|
||||
protected List<DurableTopicSubscription> getNCSubscriptions(final BrokerService brokerService,
|
||||
final ActiveMQTopic dest) throws Exception {
|
||||
List<DurableTopicSubscription> subs = new ArrayList<>();
|
||||
Topic destination = (Topic) brokerService.getDestination(dest);
|
||||
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
|
||||
if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
|
||||
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
|
||||
if (sub != null) {
|
||||
subs.add(sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
return subs;
|
||||
}
|
||||
|
||||
protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
|
||||
if (broker.getBrokerName().equals("localBroker")) {
|
||||
restartLocalBroker(startNetworkConnector);
|
||||
|
@ -480,7 +386,7 @@ public class DurableSyncNetworkBridgeTest {
|
|||
|
||||
protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir,
|
||||
File remoteDataDir) throws Exception {
|
||||
included = new ActiveMQTopic(includeTopicName);
|
||||
included = new ActiveMQTopic(testTopicName);
|
||||
doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
|
||||
doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir);
|
||||
//Give time for advisories to propagate
|
||||
|
|
|
@ -17,8 +17,11 @@
|
|||
package org.apache.activemq.network;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -28,9 +31,13 @@ import javax.jms.Session;
|
|||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
import org.apache.activemq.util.SubscriptionKey;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.activemq.util.Wait.Condition;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
|
@ -53,6 +60,31 @@ public abstract class DynamicNetworkTestSupport {
|
|||
@Rule
|
||||
public TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
|
||||
|
||||
protected void doTearDown() throws Exception {
|
||||
stopLocalBroker();
|
||||
stopRemoteBroker();
|
||||
}
|
||||
|
||||
protected void stopLocalBroker() throws Exception {
|
||||
if (localConnection != null) {
|
||||
localConnection.close();
|
||||
}
|
||||
if (localBroker != null) {
|
||||
localBroker.stop();
|
||||
localBroker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
protected void stopRemoteBroker() throws Exception {
|
||||
if (remoteConnection != null) {
|
||||
remoteConnection.close();
|
||||
}
|
||||
if (remoteBroker != null) {
|
||||
remoteBroker.stop();
|
||||
remoteBroker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context,
|
||||
final BrokerService brokerService) throws Exception {
|
||||
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
|
||||
|
@ -94,4 +126,44 @@ public abstract class DynamicNetworkTestSupport {
|
|||
MessageConsumer createConsumer() throws JMSException;
|
||||
}
|
||||
|
||||
protected void assertNCSubscriptionsCount(final BrokerService brokerService,
|
||||
final ActiveMQTopic dest, final int count) throws Exception {
|
||||
assertTrue(Wait.waitFor(new Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return count == getNCSubscriptions(brokerService, dest).size();
|
||||
}
|
||||
}, 10000, 500));
|
||||
}
|
||||
|
||||
protected List<DurableTopicSubscription> getSubscriptions(final BrokerService brokerService,
|
||||
final ActiveMQTopic dest) throws Exception {
|
||||
List<DurableTopicSubscription> subs = new ArrayList<>();
|
||||
Topic destination = (Topic) brokerService.getDestination(dest);
|
||||
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
|
||||
if (!key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
|
||||
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
|
||||
if (sub != null) {
|
||||
subs.add(sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
return subs;
|
||||
}
|
||||
|
||||
protected List<DurableTopicSubscription> getNCSubscriptions(final BrokerService brokerService,
|
||||
final ActiveMQTopic dest) throws Exception {
|
||||
List<DurableTopicSubscription> subs = new ArrayList<>();
|
||||
Topic destination = (Topic) brokerService.getDestination(dest);
|
||||
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
|
||||
if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
|
||||
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
|
||||
if (sub != null) {
|
||||
subs.add(sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
return subs;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1289,22 +1289,6 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
|
|||
doTearDown();
|
||||
}
|
||||
|
||||
protected void doTearDown() throws Exception {
|
||||
if (localConnection != null) {
|
||||
localConnection.close();
|
||||
}
|
||||
if (remoteConnection != null) {
|
||||
remoteConnection.close();
|
||||
}
|
||||
if (localBroker != null) {
|
||||
localBroker.stop();
|
||||
}
|
||||
if (remoteBroker != null) {
|
||||
remoteBroker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void doSetUp(boolean deleteAllMessages,
|
||||
VirtualDestination[] remoteVirtualDests) throws Exception {
|
||||
doSetUp(deleteAllMessages, remoteVirtualDests, true);
|
||||
|
|
Loading…
Reference in New Issue