mirror of https://github.com/apache/activemq.git
Added a test that shows using virtual topics in a network of brokers to resolve some of the pain that comes up with durable subscriptions across a network. This test demonstrates the use of Virtual Topics and ConditionalNetworkBridgeFilterFactory with replayWhenNoConsumers=true
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1445772 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eef380df62
commit
aa32dbde83
|
@ -0,0 +1,176 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.JmsMultipleBrokersTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
|
||||
|
||||
import javax.jms.*;
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
* @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
|
||||
*/
|
||||
public class VirtualTopicNetworkClusterReactivationTest extends JmsMultipleBrokersTestSupport {
|
||||
|
||||
private static final String BROKER_A = "brokerA";
|
||||
private static final String BROKER_B = "brokerB";
|
||||
private static final String BROKER_A_TRANSPORT_URL = "tcp://localhost:61616";
|
||||
private static final String BROKER_B_TRANSPORT_URL = "tcp://localhost:61617";
|
||||
private static final long DEFAULT_SLEEP_MS = 1000;
|
||||
|
||||
private ActiveMQTopic topic = new ActiveMQTopic("VirtualTopic.FOO.TEST");
|
||||
private ActiveMQQueue queue = new ActiveMQQueue("Consumer.FOO.VirtualTopic.FOO.TEST");
|
||||
|
||||
|
||||
/**
|
||||
* This test shows how to use pub/sub to mimic durable subscribers in a network of brokers.
|
||||
*
|
||||
* When using durable subscribers in a broker cluster, you can encounter a situation where a
|
||||
* subscription gets orphaned on a broker when the client disconnects and reattaches to another
|
||||
* broker in the cluster. Since the clientID/durableName only need to be unique within a single
|
||||
* broker, it's possible to have a durable sub on multiple brokers in a cluster.
|
||||
*
|
||||
* FOR EXAMPLE:
|
||||
* Broker A and B are networked together in both directions to form a full mesh. If durable
|
||||
* subscriber 'foo' subscribes to failover(A,B) and ends up on B, and a producer on A, messages
|
||||
* will be demand forwarded from A to B. But if the durable sub 'foo' disconnects from B,
|
||||
* then reconnects to failover(A,B) but this time gets connected to A, the subscription on
|
||||
* B will still be there are continue to receive messages (and possibly have missed messages
|
||||
* sent there while gone)
|
||||
*
|
||||
* We can avoid all of this mess with virtual topics as seen below:
|
||||
*
|
||||
*
|
||||
* @throws JMSException
|
||||
*/
|
||||
public void testDurableSubReconnectFromAtoB() throws JMSException {
|
||||
// create consumer on broker B
|
||||
ActiveMQConnectionFactory bConnFactory = new ActiveMQConnectionFactory(BROKER_B_TRANSPORT_URL+ "?jms.prefetchPolicy.queuePrefetch=0");
|
||||
Connection bConn = bConnFactory.createConnection();
|
||||
bConn.start();
|
||||
Session bSession = bConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer bSessionConsumer = bSession.createConsumer(queue);
|
||||
|
||||
|
||||
// create producer on A
|
||||
ActiveMQConnectionFactory aConnFactory = new ActiveMQConnectionFactory(BROKER_A_TRANSPORT_URL);
|
||||
Connection aProducerConn = aConnFactory.createConnection();
|
||||
aProducerConn.start();
|
||||
|
||||
Session aProducerSession = aProducerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = aProducerSession.createProducer(topic);
|
||||
produce(producer, aProducerSession, 5);
|
||||
|
||||
// sleep for a sec to let the messages get bridged over to broker B
|
||||
sleep();
|
||||
|
||||
// consumer on B has not consumed any messages, and for some reason goes away:
|
||||
bSessionConsumer.close();
|
||||
bSession.close();
|
||||
bConn.close();
|
||||
|
||||
// let the bridge catch up
|
||||
sleep();
|
||||
|
||||
// and now consumer reattaches to A and wants the messages that were sent to B
|
||||
Connection aConsumerConn = aConnFactory.createConnection();
|
||||
aConsumerConn.start();
|
||||
Session aConsumerSession = aConsumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer aSessionConsumer = aConsumerSession.createConsumer(queue);
|
||||
|
||||
sleep();
|
||||
|
||||
// they should all be there
|
||||
consume(aSessionConsumer, 5);
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void consume(MessageConsumer durable, int numMessagesExpected) throws JMSException {
|
||||
for (int i = 0; i < numMessagesExpected; i++) {
|
||||
Message message = durable.receive(1000);
|
||||
assertNotNull(message);
|
||||
TextMessage textMessage = (TextMessage) message;
|
||||
System.out.println("received: " + textMessage.getText());
|
||||
assertEquals("message: " +i, textMessage.getText());
|
||||
}
|
||||
}
|
||||
|
||||
private void produce(MessageProducer producer, Session sess, int numMessages) throws JMSException {
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
producer.send(sess.createTextMessage("message: " + i));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
maxSetupTime = 1000;
|
||||
super.setAutoFail(true);
|
||||
super.setUp();
|
||||
final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
|
||||
|
||||
BrokerService brokerServiceA = createBroker(new URI(String.format("broker:(%s)/%s%s", BROKER_A_TRANSPORT_URL, BROKER_A, options)));
|
||||
brokerServiceA.setDestinationPolicy(buildPolicyMap());
|
||||
brokerServiceA.setDestinations(new ActiveMQDestination[]{queue});
|
||||
|
||||
BrokerService brokerServiceB = createBroker(new URI(String.format("broker:(%s)/%s%s", BROKER_B_TRANSPORT_URL, BROKER_B, options)));
|
||||
brokerServiceB.setDestinationPolicy(buildPolicyMap());
|
||||
brokerServiceB.setDestinations(new ActiveMQDestination[]{queue});
|
||||
|
||||
|
||||
|
||||
// bridge brokers to each other statically (static: discovery)
|
||||
bridgeBrokers(BROKER_A, BROKER_B);
|
||||
bridgeBrokers(BROKER_B, BROKER_A);
|
||||
|
||||
startAllBrokers();
|
||||
}
|
||||
|
||||
private PolicyMap buildPolicyMap() {
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry policyEntry = new PolicyEntry();
|
||||
policyEntry.setOptimizedDispatch(true);
|
||||
ConditionalNetworkBridgeFilterFactory networkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
|
||||
networkBridgeFilterFactory.setReplayWhenNoConsumers(true);
|
||||
policyEntry.setNetworkBridgeFilterFactory(networkBridgeFilterFactory);
|
||||
policyEntry.setEnableAudit(false);
|
||||
policyMap.put(new ActiveMQQueue("Consumer.*.VirtualTopic.>"), policyEntry);
|
||||
return policyMap;
|
||||
}
|
||||
|
||||
private void sleep() {
|
||||
try {
|
||||
Thread.sleep(DEFAULT_SLEEP_MS);
|
||||
} catch (InterruptedException igonred) {
|
||||
}
|
||||
}
|
||||
|
||||
private void sleep(int milliSecondTime) {
|
||||
try {
|
||||
Thread.sleep(milliSecondTime);
|
||||
} catch (InterruptedException igonred) {
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue