diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java new file mode 100644 index 0000000000..15be77cb63 --- /dev/null +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * A simple request / reply test + */ +public class JmsSimpleRequestReplyTest extends CamelTestSupport { + + protected String componentName = "activemq"; + + @Test + public void testRequestReply2Messages() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + + template.requestBody("activemq:queue:hello", "Hello World"); + + result.assertIsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = createConnectionFactory(null); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + public static ConnectionFactory createConnectionFactory(String options) { + String url = "vm://test-broker?broker.persistent=false&broker.useJmx=false"; + if (options != null) { + url = url + "&" + options; + } + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); + // use a pooled connection factory + PooledConnectionFactory pooled = new PooledConnectionFactory(connectionFactory); + pooled.setMaxConnections(8); + return pooled; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("activemq:queue:hello").process(new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Bye World"); + assertNotNull(exchange.getIn().getHeader("JMSReplyTo")); + } + }).to("mock:result"); + } + }; + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 43568dcc98..542eb000c3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -2425,7 +2425,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon while(entries.hasNext()) { ConcurrentHashMap.Entry entry = entries.next(); try { - this.deleteTempDestination(entry.getValue()); + // Only delete this temp destination if it was created from this connection. The connection used + // for the advisory consumer may also have a reference to this temp destination. + ActiveMQTempDestination dest = entry.getValue(); + String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString(); + if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) { + this.deleteTempDestination(entry.getValue()); + } } catch (Exception ex) { // the temp dest is in use so it can not be deleted. // it is ok to leave it to connection tear down phase