https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations in a network without advisories. Allow the connection id generator prefix to be set via a connection factory such that temp identies can be configured such that they are suitable for inclusion in a network list of statically included destintions. Allow auto recreation of temp destinations by a new producer and tie lifecycle to the producers connection. This allows configurable support for request reply with temps in a network with advisory support disabled

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1087330 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-31 15:11:09 +00:00
parent 3df7404d3e
commit b9045dba98
15 changed files with 310 additions and 17 deletions

View File

@ -30,8 +30,9 @@ public class CamelConnection extends ActiveMQConnection implements CamelContextA
private CamelContext camelContext;
protected CamelConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
super(transport, clientIdGenerator, factoryStats);
protected CamelConnection(Transport transport, IdGenerator clientIdGenerator,
IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
}
public CamelContext getCamelContext() {

View File

@ -45,7 +45,7 @@ public class CamelConnectionFactory extends ActiveMQConnectionFactory implements
// Implementation methods
//-----------------------------------------------------------------------
protected CamelConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(), stats);
CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
CamelContext context = getCamelContext();
if (context != null) {
connection.setCamelContext(context);

View File

@ -200,7 +200,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @param factoryStats
* @throws Exception
*/
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
@ -216,7 +216,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = CONNECTION_ID_GENERATOR.generateId();
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo(new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant());

View File

@ -83,6 +83,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private IdGenerator clientIdGenerator;
private String clientIDPrefix;
private IdGenerator connectionIdGenerator;
private String connectionIDPrefix;
// client policies
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
@ -288,7 +290,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
}
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
getConnectionIdGenerator(), stats);
return connection;
}
@ -843,6 +846,29 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.clientIdGenerator = clientIdGenerator;
}
/**
* Sets the prefix used by connection id generator
* @param connectionIDPrefix
*/
public void setConnectionIDPrefix(String connectionIDPrefix) {
this.connectionIDPrefix = connectionIDPrefix;
}
protected synchronized IdGenerator getConnectionIdGenerator() {
if (connectionIdGenerator == null) {
if (connectionIDPrefix != null) {
connectionIdGenerator = new IdGenerator(connectionIDPrefix);
} else {
connectionIdGenerator = new IdGenerator();
}
}
return connectionIdGenerator;
}
protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
this.connectionIdGenerator = connectionIdGenerator;
}
/**
* @return the statsEnabled
*/

View File

@ -50,8 +50,9 @@ import org.apache.activemq.util.IdGenerator;
*/
public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
super(transport, clientIdGenerator, factoryStats);
protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator,
IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
}
public XASession createXASession() throws JMSException {

View File

@ -80,7 +80,7 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple
}
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), stats);
ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
return connection;
}
}

View File

@ -297,10 +297,6 @@ public class BrokerService implements Service {
* @throws Exception
*/
public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
if (!isAdvisorySupport()) {
throw new javax.jms.IllegalStateException(
"Networks require advisory messages to function - advisories are currently disabled");
}
NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
return addNetworkConnector(connector);
}

View File

@ -28,10 +28,12 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
@ -130,6 +132,18 @@ public abstract class AbstractRegion implements Region {
destinations.put(destination, dest);
destinationMap.put(destination, dest);
addSubscriptionsForDestination(context, dest);
if (destination.isTemporary()) {
// need to associate with the connection so it can get removed
if (context.getConnection() instanceof TransportConnection) {
TransportConnection transportConnection = (TransportConnection) context.getConnection();
DestinationInfo info = new DestinationInfo(context.getConnectionId(),
DestinationInfo.ADD_OPERATION_TYPE,
destination);
transportConnection.processAddDestination(info);
LOG.debug("assigning ownership of auto created temp : " + destination + " to connection:"
+ context.getConnectionId());
}
}
}
if (dest == null) {
throw new JMSException("The destination " + destination + " does not exist.");

View File

@ -834,7 +834,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}finally {
messagesLock.readLock().unlock();
}
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
+ ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
+ messageGroupOwners;
}

View File

@ -393,7 +393,7 @@ public class RegionBroker extends EmptyBroker {
// This seems to cause the destination to be added but without
// advisories firing...
context.getBroker().addDestination(context, destination, false);
context.getBroker().addDestination(context, destination, true);
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);

View File

@ -20,6 +20,7 @@ import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
/**
* @org.apache.xbean.XBean element="tempQueue" description="An ActiveMQ Temporary Queue Destination"
* @openwire:marshaller code="102"
*
*/

View File

@ -20,6 +20,7 @@ import javax.jms.JMSException;
import javax.jms.TemporaryTopic;
/**
* @org.apache.xbean.XBean element="tempTopic" description="An ActiveMQ Temporary Topic Destination"
* @openwire:marshaller code="103"
*
*/

View File

@ -127,19 +127,21 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
event.connectFailures++;
if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
return;
}
synchronized (sleepMutex) {
try {
if (!running.get()) {
LOG.debug("Reconnecting disabled: stopped");
return;
}
LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
sleepMutex.wait(event.reconnectDelay);
} catch (InterruptedException ie) {
LOG.debug("Reconnecting disabled: " + ie);
Thread.currentThread().interrupt();
return;
}
@ -161,6 +163,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
}
if (!running.get()) {
LOG.debug("Reconnecting disabled: stopped");
return;
}

View File

@ -52,11 +52,14 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
/**
@ -66,6 +69,7 @@ import org.springframework.core.io.Resource;
*
*/
public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class);
public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
public static int maxSetupTime = 5000;
@ -170,7 +174,14 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
if (!broker.getNetworkConnectors().isEmpty()) {
result = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return (broker.getNetworkConnectors().get(bridgeIndex).activeBridges().size() >= min);
int activeCount = 0;
for (NetworkBridge bridge : broker.getNetworkConnectors().get(bridgeIndex).activeBridges()) {
if (bridge.getRemoteBrokerName() != null) {
LOG.info("found bridge to " + bridge.getRemoteBrokerName() + " on broker :" + broker.getBrokerName());
activeCount++;
}
}
return activeCount >= min;
}}, wait);
}
return result;

View File

@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.Map;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSupport {
private static final transient Logger LOG = LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class);
BrokerService a, b;
ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");
static final String connectionIdMarker = "ID:marker.";
ActiveMQTempQueue replyQWildcard = new ActiveMQTempQueue(connectionIdMarker + ">");
private long receiveTimeout = 30000;
public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception {
final String xmlConfigString = new String(
"<beans" +
" xmlns=\"http://www.springframework.org/schema/beans\"" +
" xmlns:amq=\"http://activemq.apache.org/schema/core\"" +
" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"" +
" xsi:schemaLocation=\"http://www.springframework.org/schema/beans" +
" http://www.springframework.org/schema/beans/spring-beans-2.0.xsd" +
" http://activemq.apache.org/schema/core" +
" http://activemq.apache.org/schema/core/activemq-core.xsd\">" +
" <broker xmlns=\"http://activemq.apache.org/schema/core\" id=\"broker\"" +
" brokerName=\"%HOST%\" persistent=\"false\" advisorySupport=\"false\" useJmx=\"false\" >" +
" <destinationPolicy>" +
" <policyMap>" +
" <policyEntries>" +
" <policyEntry optimizedDispatch=\"true\">"+
" <destination>"+
" <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName() + "\"/>" +
" </destination>" +
" </policyEntry>" +
" </policyEntries>" +
" </policyMap>" +
" </destinationPolicy>" +
" <networkConnectors>" +
" <networkConnector uri=\"multicast://default\">" +
" <staticallyIncludedDestinations>" +
" <queue physicalName=\"" + sendQ.getPhysicalName() + "\"/>" +
" <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName() + "\"/>" +
" </staticallyIncludedDestinations>" +
" </networkConnector>" +
" </networkConnectors>" +
" <transportConnectors>" +
" <transportConnector uri=\"tcp://0.0.0.0:0\" discoveryUri=\"multicast://default\" />" +
" </transportConnectors>" +
" </broker>" +
"</beans>");
final String localProtocolScheme = "inline";
URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {
@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
if (localProtocolScheme.equalsIgnoreCase(protocol)) {
return new URLStreamHandler() {
@Override
protected URLConnection openConnection(URL u) throws IOException {
return new URLConnection(u) {
@Override
public void connect() throws IOException {
}
@Override
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream(xmlConfigString.replace("%HOST%", url.getFile()).getBytes("UTF-8"));
}
};
}
};
}
return null;
}
});
a = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":A"));
b = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":B"));
doTestNonAdvisoryNetworkRequestReply();
}
public void testNonAdvisoryNetworkRequestReply() throws Exception {
createBridgeAndStartBrokers();
doTestNonAdvisoryNetworkRequestReply();
}
public void doTestNonAdvisoryNetworkRequestReply() throws Exception {
waitForBridgeFormation(a, 1, 0);
waitForBridgeFormation(b, 1, 0);
ActiveMQConnectionFactory sendFactory = createConnectionFactory(a);
ActiveMQConnection sendConnection = createConnection(sendFactory);
ActiveMQSession sendSession = (ActiveMQSession)sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(sendQ);
ActiveMQTempQueue realReplyQ = (ActiveMQTempQueue) sendSession.createTemporaryQueue();
TextMessage message = sendSession.createTextMessage("1");
message.setJMSReplyTo(realReplyQ);
producer.send(message);
// responder
ActiveMQConnectionFactory consumerFactory = createConnectionFactory(b);
ActiveMQConnection consumerConnection = createConnection(consumerFactory);
ActiveMQSession consumerSession = (ActiveMQSession)consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(sendQ);
TextMessage received = (TextMessage) consumer.receive(receiveTimeout);
assertNotNull(received);
LOG.info("got request, sending reply");
MessageProducer consumerProducer = consumerSession.createProducer(received.getJMSReplyTo());
consumerProducer.send(consumerSession.createTextMessage("got " + received.getText()));
// temp dest on reply broker tied to this connection, setOptimizedDispatch=true ensures
// message gets delivered before destination is removed
consumerConnection.close();
// reply consumer
MessageConsumer replyConsumer = sendSession.createConsumer(realReplyQ);
TextMessage reply = (TextMessage) replyConsumer.receive(receiveTimeout);
assertNotNull("expected reply message", reply);
assertEquals("text is as expected", "got 1", reply.getText());
sendConnection.close();
verifyAllTempQueuesAreGone();
}
private void verifyAllTempQueuesAreGone() throws Exception {
for (BrokerService brokerService : new BrokerService[]{a, b}) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
Map temps = regionBroker.getTempTopicRegion().getDestinationMap();
assertTrue("no temp topics on " + brokerService + ", " + temps, temps.isEmpty());
temps = regionBroker.getTempQueueRegion().getDestinationMap();
assertTrue("no temp queues on " + brokerService + ", " + temps, temps.isEmpty());
}
}
private ActiveMQConnection createConnection(ActiveMQConnectionFactory factory) throws Exception {
ActiveMQConnection c =(ActiveMQConnection) factory.createConnection();
c.start();
return c;
}
private ActiveMQConnectionFactory createConnectionFactory(BrokerService brokerService) throws Exception {
String target = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(target);
factory.setWatchTopicAdvisories(false);
factory.setConnectionIDPrefix(connectionIdMarker + brokerService.getBrokerName());
return factory;
}
public void createBridgeAndStartBrokers() throws Exception {
a = configureBroker("A");
b = configureBroker("B");
bridge(a, b);
bridge(b, a);
a.start();
b.start();
}
public void tearDown() throws Exception {
stop(a);
stop(b);
}
private void stop(BrokerService broker) throws Exception {
if (broker != null) {
broker.stop();
}
}
private void bridge(BrokerService from, BrokerService to) throws Exception {
TransportConnector toConnector = to.addConnector("tcp://localhost:0");
NetworkConnector bridge =
from.addNetworkConnector("static://" + toConnector.getPublishableConnectString());
bridge.addStaticallyIncludedDestination(sendQ);
bridge.addStaticallyIncludedDestination(replyQWildcard);
}
private BrokerService configureBroker(String brokerName) throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName(brokerName);
broker.setAdvisorySupport(false);
broker.setPersistent(false);
broker.setUseJmx(false);
PolicyMap map = new PolicyMap();
PolicyEntry tempReplyQPolicy = new PolicyEntry();
tempReplyQPolicy.setOptimizedDispatch(true);
map.put(replyQWildcard, tempReplyQPolicy);
broker.setDestinationPolicy(map);
return broker;
}
}