resolve https://issues.apache.org/activemq/browse/AMQ-2764 - add inactivity monitor to http transport, enabled by default with 30 second default idle timeout, configure via transport url params as there is no negeotion via wire format info over http. Now a duplex return connection can expire when there is no inactivity. Also processed shutdown so in the normal case a duplex connector can be disposed when the remote broker stops, the inactivity monitor deal with the abortive closure case

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@990107 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-08-27 11:34:04 +00:00
parent fafaf7d6ea
commit ee4c8eeaba
15 changed files with 461 additions and 42 deletions

View File

@ -91,7 +91,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision$
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
private static final Log LOG = LogFactory.getLog(DemandForwardingBridgeSupport.class);
private static final ThreadPoolExecutor ASYNC_TASKS;
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker;
@ -527,6 +527,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
}
break;
case ShutdownInfo.DATA_STRUCTURE_TYPE:
// initiator is shutting down, controlled case
// abortive close dealt with by inactivity monitor
LOG.info("Stopping network bridge on shutdown of remote broker");
serviceRemoteException(new IOException(command.toString()));
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring remote command: " + command);

View File

@ -44,6 +44,7 @@ public class InactivityMonitor extends TransportFilter {
private static final ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER;
private static Timer WRITE_CHECK_TIMER;
@ -63,9 +64,11 @@ public class InactivityMonitor extends TransportFilter {
private SchedulerTimerTask readCheckerTask;
private boolean ignoreRemoteWireFormat = false;
private long readCheckTime;
private long writeCheckTime;
private long initialDelayTime;
private boolean ignoreAllWireFormatInfo = false;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
private boolean useKeepAlive = true;
private boolean keepAliveResponseRequired;
private WireFormat wireFormat;
@ -115,6 +118,14 @@ public class InactivityMonitor extends TransportFilter {
public InactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
this.wireFormat = wireFormat;
if (this.wireFormat == null) {
this.ignoreAllWireFormatInfo = true;
}
}
public void start() throws Exception {
next.start();
startMonitorThreads();
}
public void stop() throws Exception {
@ -268,23 +279,29 @@ public class InactivityMonitor extends TransportFilter {
ignoreRemoteWireFormat = val;
}
public long getReadCheckTime() {
return readCheckTime;
}
public void setReadCheckTime(long readCheckTime) {
this.readCheckTime = readCheckTime;
}
public long getInitialDelayTime() {
return initialDelayTime;
}
public void setInitialDelayTime(long initialDelayTime) {
this.initialDelayTime = initialDelayTime;
}
private synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {
return;
}
if (localWireFormatInfo == null) {
return;
}
if (remoteWireFormatInfo == null) {
return;
}
if (!ignoreRemoteWireFormat) {
readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
} else {
readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
if (!configuredOk()) {
return;
}
if (readCheckTime > 0) {
@ -304,6 +321,23 @@ public class InactivityMonitor extends TransportFilter {
}
}
private boolean configuredOk() throws IOException {
boolean configured = false;
if (ignoreAllWireFormatInfo) {
configured = true;
} else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
if (!ignoreRemoteWireFormat) {
readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
} else {
readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
}
configured = true;
}
return configured;
}
/**
*
*/

View File

@ -294,4 +294,11 @@ public abstract class TransportFactory {
return transport;
}
protected String getOption(Map options, String key, String def) {
String rc = (String) options.remove(key);
if( rc == null ) {
rc = def;
}
return rc;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.util.ServiceSupport;
@ -30,6 +31,7 @@ public abstract class TransportServerSupport extends ServiceSupport implements T
private URI connectURI;
private URI bindLocation;
private TransportAcceptListener acceptListener;
protected Map<String, Object> transportOptions;
public TransportServerSupport() {
}
@ -82,4 +84,8 @@ public abstract class TransportServerSupport extends ServiceSupport implements T
public void setBindLocation(URI bindLocation) {
this.bindLocation = bindLocation;
}
public void setTransportOption(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
}

View File

@ -111,14 +111,6 @@ public class TcpTransportFactory extends TransportFactory {
return super.compositeConfigure(transport, format, options);
}
private String getOption(Map options, String key, String def) {
String rc = (String) options.remove(key);
if( rc == null ) {
rc = def;
}
return rc;
}
/**
* Returns true if the inactivity monitor should be used on the transport
*/

View File

@ -106,7 +106,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
* This parameter is most probably set in Connection or TransportConnector URIs.
*/
protected boolean startLogging = true;
protected Map<String, Object> transportOptions;
protected final ServerSocketFactory serverSocketFactory;
protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
protected Thread socketHandlerThread;
@ -387,10 +386,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
return (InetSocketAddress)serverSocket.getLocalSocketAddress();
}
public void setTransportOption(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
protected final void handleSocket(Socket socket) {
try {
if (this.currentTransportCount >= this.maximumConnections) {

View File

@ -43,7 +43,8 @@ public class HttpEmbeddedTunnelServlet extends HttpTunnelServlet {
// Add the servlet connector
String url = getConnectorURL();
transportConnector = new HttpTransportServer(new URI(url));
HttpTransportFactory factory = new HttpTransportFactory();
transportConnector = (HttpTransportServer) factory.doBind(new URI(url));
broker.addConnector(transportConnector);
String brokerURL = getServletContext().getInitParameter("org.apache.activemq.brokerURL");

View File

@ -18,14 +18,22 @@ package org.apache.activemq.transport.http;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ThreadNameFilter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,7 +47,15 @@ public class HttpTransportFactory extends TransportFactory {
private static final Log LOG = LogFactory.getLog(HttpTransportFactory.class);
public TransportServer doBind(URI location) throws IOException {
return new HttpTransportServer(location);
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
HttpTransportServer result = new HttpTransportServer(location, this);
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
result.setTransportOption(transportOptions);
return result;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
protected TextWireFormat asTextWireFormat(WireFormat wireFormat) {
@ -59,16 +75,26 @@ public class HttpTransportFactory extends TransportFactory {
return new HttpClientTransport(textWireFormat, location);
}
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
return compositeConfigure(transport, format, options);
}
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
HttpClientTransport httpTransport = (HttpClientTransport) super.compositeConfigure(transport, format, options);
transport = httpTransport;
if( httpTransport.isTrace() ) {
transport = super.compositeConfigure(transport, format, options);
HttpClientTransport httpTransport = (HttpClientTransport)transport.narrow(HttpClientTransport.class);
if(httpTransport != null && httpTransport.isTrace() ) {
try {
transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
} catch (Throwable e) {
LOG.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
}
}
boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
if (useInactivityMonitor) {
transport = new InactivityMonitor(transport, null /* ignore wire format as no negotiation over http */);
IntrospectionSupport.setProperties(transport, options);
}
return transport;
}

View File

@ -41,10 +41,12 @@ public class HttpTransportServer extends TransportServerSupport {
private TextWireFormat wireFormat;
private Server server;
private Connector connector;
private HttpTransportFactory transportFactory;
public HttpTransportServer(URI uri) {
public HttpTransportServer(URI uri, HttpTransportFactory factory) {
super(uri);
this.bindAddress = uri;
this.transportFactory = factory;
}
public void setBrokerInfo(BrokerInfo brokerInfo) {
@ -112,6 +114,8 @@ public class HttpTransportServer extends TransportServerSupport {
contextHandler.setAttribute("acceptListener", getAcceptListener());
contextHandler.setAttribute("wireFormat", getWireFormat());
contextHandler.setAttribute("transportFactory", transportFactory);
contextHandler.setAttribute("transportOptions", transportOptions);
server.start();
}

View File

@ -31,9 +31,12 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -49,9 +52,11 @@ public class HttpTunnelServlet extends HttpServlet {
private static final Log LOG = LogFactory.getLog(HttpTunnelServlet.class);
private TransportAcceptListener listener;
private HttpTransportFactory transportFactory;
private TextWireFormat wireFormat;
private final Map<String, BlockingQueueTransport> clients = new HashMap<String, BlockingQueueTransport>();
private final long requestTimeout = 30000L;
private HashMap transportOptions;
@Override
public void init() throws ServletException {
@ -60,6 +65,11 @@ public class HttpTunnelServlet extends HttpServlet {
if (listener == null) {
throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
}
transportFactory = (HttpTransportFactory)getServletContext().getAttribute("transportFactory");
if (transportFactory == null) {
throw new ServletException("No such attribute 'transportFactory' available in the ServletContext");
}
transportOptions = (HashMap)getServletContext().getAttribute("transportOptions");
wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat");
if (wireFormat == null) {
wireFormat = createWireFormat();
@ -174,14 +184,20 @@ public class HttpTunnelServlet extends HttpServlet {
synchronized (this) {
BlockingQueueTransport answer = clients.get(clientID);
if (answer != null) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has allready been established");
LOG.warn("A session for clientID '" + clientID + "' has allready been established");
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
LOG.warn("A session for clientID '" + clientID + "' has already been established");
return null;
}
answer = createTransportChannel();
clients.put(clientID, answer);
listener.onAccept(answer);
Transport transport = answer;
try {
transport = transportFactory.serverConfigure(answer, null, transportOptions);
} catch (Exception e) {
IOExceptionSupport.create(e);
}
listener.onAccept(transport);
//wait for the transport to connect
while (!answer.isConnected()) {
try {

View File

@ -33,11 +33,11 @@ import org.apache.activemq.wireformat.WireFormat;
public class HttpsTransportFactory extends HttpTransportFactory {
public TransportServer doBind(String brokerId, URI location) throws IOException {
return new HttpsTransportServer(location);
return new HttpsTransportServer(location, this);
}
public TransportServer doBind(URI location) throws IOException {
return new HttpsTransportServer(location);
return new HttpsTransportServer(location, this);
}
protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException {

View File

@ -32,8 +32,8 @@ public class HttpsTransportServer extends HttpTransportServer {
private String keyCertificateAlgorithm;
private String protocol;
public HttpsTransportServer(URI uri) {
super(uri);
public HttpsTransportServer(URI uri, HttpsTransportFactory factory) {
super(uri, factory);
}
public void doStart() throws Exception {

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class AMQ2764Test extends TestCase {
private static final Log LOG = LogFactory.getLog(AMQ2764Test.class);
private BrokerService brokerOne;
private BrokerService brokerTwo;
private Destination destination;
private ArrayList<Connection> connections = new ArrayList<Connection>();
public void testBrokerRestart() throws Exception {
startBrokerOne();
Thread.sleep(5000);
startBrokerTwo();
Thread.sleep(5000);
ActiveMQConnectionFactory producerConnectionFactory = createBrokerOneConnectionFactory();
ActiveMQConnectionFactory secondProducerConnectionFactory = createBrokerTwoConnectionFactory();
ActiveMQConnectionFactory consumerConnectionFactory = createBrokerOneConnectionFactory();
MessageConsumer consumer = createConsumer(consumerConnectionFactory);
AtomicInteger counter = createConsumerCounter(consumerConnectionFactory);
waitForConsumerToArrive(counter);
final int expectedMessagesReceived = 25;
int actualMessagesReceived = doSendMessage(expectedMessagesReceived, consumer, producerConnectionFactory);
assertEquals("Didn't receive the right amount of messages directly connected", expectedMessagesReceived, actualMessagesReceived);
assertNull( "Had extra messages", consumer.receiveNoWait());
actualMessagesReceived = doSendMessage(expectedMessagesReceived, consumer, secondProducerConnectionFactory);
assertEquals("Didn't receive the right amount of messages via network", expectedMessagesReceived, actualMessagesReceived);
assertNull( "Had extra messages", consumer.receiveNoWait());
LOG.info("Stopping broker one");
stopBrokerOne();
TimeUnit.SECONDS.sleep(1);
LOG.info("Restarting broker");
startBrokerOne();
consumer = createConsumer(consumerConnectionFactory);
counter = createConsumerCounter(consumerConnectionFactory);
waitForConsumerToArrive(counter);
actualMessagesReceived = doSendMessage(expectedMessagesReceived, consumer, secondProducerConnectionFactory);
assertEquals("Didn't receive the right amount of messages via network after restart", expectedMessagesReceived, actualMessagesReceived);
assertNull( "Had extra messages", consumer.receiveNoWait());
stopBrokerOne();
stopBrokerTwo();
}
protected int doSendMessage(int expectedMessagesReceived, MessageConsumer consumer, ActiveMQConnectionFactory connectionFactory) throws Exception {
int messagesReceived = 0;
for (int i=0; i<expectedMessagesReceived; i++) {
String messageId = sendMessage(connectionFactory);
Message message = consumer.receive(5000);
if ( message!=null ) {
messagesReceived++;
}
}
return messagesReceived;
}
protected String sendMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException {
Connection connection = null;
try {
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
Message message = session.createMessage();
producer.send(message);
return message.getJMSMessageID();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected BrokerService createFirstBroker() throws Exception {
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/bugs/amq2764/reconnect-broker1.xml"));
}
protected BrokerService createSecondBroker() throws Exception {
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/bugs/amq2764/reconnect-broker2.xml"));
}
protected ActiveMQConnectionFactory createBrokerOneConnectionFactory() {
return new ActiveMQConnectionFactory("vm://broker1");
}
protected ActiveMQConnectionFactory createBrokerTwoConnectionFactory() {
return new ActiveMQConnectionFactory("vm://broker2");
}
protected void setUp() throws Exception {
LOG.info("===============================================================================");
LOG.info("Running Test Case: " + getName());
LOG.info("===============================================================================");
destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
}
protected void tearDown() throws Exception {
disposeConsumerConnections();
try {
stopBrokerOne();
} catch (Throwable e) {
}
try {
stopBrokerTwo();
} catch (Throwable e) {
}
}
protected void disposeConsumerConnections() {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
Connection connection = iter.next();
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void startBrokerOne() throws Exception {
if (brokerOne == null) {
brokerOne = createFirstBroker();
brokerOne.start();
}
}
protected void stopBrokerOne() throws Exception {
if (brokerOne != null) {
brokerOne.stop();
brokerOne = null;
}
}
protected void startBrokerTwo() throws Exception {
if (brokerTwo == null) {
brokerTwo = createSecondBroker();
brokerTwo.start();
}
}
protected void stopBrokerTwo() throws Exception {
if (brokerTwo != null) {
brokerTwo.stop();
brokerTwo = null;
}
}
protected MessageConsumer createConsumer(ActiveMQConnectionFactory consumerConnectionFactory) throws JMSException {
Connection connection = consumerConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(destination);
}
protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception {
final AtomicInteger rc = new AtomicInteger(0);
Connection connection = cf.createConnection();
connections.add(connection);
connection.start();
ConsumerEventSource source = new ConsumerEventSource(connection, destination);
source.setConsumerListener(new ConsumerListener() {
public void onConsumerEvent(ConsumerEvent event) {
rc.set(event.getConsumerCount());
}
});
source.start();
return rc;
}
protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException {
for (int i = 0; i < 100; i++) {
if (consumerCounter.get() > 0) {
return;
}
Thread.sleep(100);
}
fail("The consumer did not arrive.");
}
protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException {
for (int i = 0; i < 100; i++) {
if (consumerCounter.get() == 0) {
return;
}
Thread.sleep(100);
}
fail("The consumer did not leave.");
}
}

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
<destinations>
<queue physicalName="RECONNECT.TEST.QUEUE"/>
</destinations>
<networkConnectors>
<networkConnector uri="static:(http://localhost:61617)" duplex="true">
<staticallyIncludedDestinations>
<queue physicalName="RECONNECT.TEST.QUEUE"/>
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector uri="http://localhost:61616"/>
</transportConnectors>
</broker>
</beans>

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<!-- configure a low inactivity monitor check time to ensure all messages are received quickly -->
<transportConnector uri="http://localhost:61617?transport.readCheckTime=4000&amp;transport.initialDelayTime=4000"/>
</transportConnectors>
</broker>
</beans>