diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 5ac95b2d19..17185a0d79 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -449,19 +449,9 @@ public class AmqpSender extends AmqpAbstractLink { ActiveMQMessage temp = null; if (md.getMessage() != null) { - - // Topics can dispatch the same Message to more than one consumer - // so we must copy to prevent concurrent read / write to the same - // message object. - if (md.getDestination().isTopic()) { - synchronized (md.getMessage()) { - temp = (ActiveMQMessage) md.getMessage().copy(); - } - } else { - temp = (ActiveMQMessage) md.getMessage(); - } - + temp = (ActiveMQMessage) md.getMessage(); if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) { + temp = (ActiveMQMessage) md.getMessage().copy(); temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/OpenWireToAmqpConcurrentStoreAndDispatchTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/OpenWireToAmqpConcurrentStoreAndDispatchTest.java new file mode 100644 index 0000000000..f8bf560921 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/OpenWireToAmqpConcurrentStoreAndDispatchTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.activemq.store.kahadb.KahaDBStore.PROPERTY_CANCELED_TASK_MOD_METRIC; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +@RunWith(Parameterized.class) +public class OpenWireToAmqpConcurrentStoreAndDispatchTest extends AmqpClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(OpenWireToAmqpConcurrentStoreAndDispatchTest.class); + + private final String transformer; + + @Parameters(name="Transformer->{0}") + public static Collection data() { + System.setProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "100"); + return Arrays.asList(new Object[][] { + {"jms"} + }); + } + + public OpenWireToAmqpConcurrentStoreAndDispatchTest(String transformer) { + this.transformer = transformer; + } + + @Override + protected String getAmqpTransformer() { + return transformer; + } + + @Override + protected boolean isPersistent() { + return true; + } + + @Override + protected boolean isUseOpenWireConnector() { + return true; + } + + @Test(timeout = 6000000) + @Ignore("takes more than 6 mins to complete but fails earlier without fix") + public void testNoErrorOnSend() throws Exception { + + final int numIterations = 100; + int numConsumers = 3; + final int numProducers = 10; + final int numMessages = 2000; + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicInteger sent = new AtomicInteger(); + final AtomicInteger received = new AtomicInteger(); + final AtomicBoolean errorOnSend = new AtomicBoolean(false); + + final AtomicInteger toSend = new AtomicInteger(numMessages); + + final Random random = new Random(); + for (int i=0; i= 0) { + producer.send(bytesMessage); + sent.incrementAndGet(); + } + } catch (Exception e) { + e.printStackTrace(); + errorOnSend.set(true); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (JMSException ignored) {} + } + } + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.SECONDS); + + done.set(true); + assertEquals("[" + i + "] sent all requested", numMessages, sent.get()); + assertEquals("[" + i + "] got all sent", numMessages, received.get()); + assertFalse("[" + i + "] no error on send", errorOnSend.get()); + } + } +}