diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 3348b0fa9e..136f7b936c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.command.ActiveMQBytesMessage; @@ -87,8 +88,6 @@ public final class OpenWireMessageConverter { private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER"); private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID"); private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE"); - private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID; - private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE; private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION"); private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID"); @@ -131,7 +130,7 @@ public final class OpenWireMessageConverter { } else if (contents != null) { final boolean messageCompressed = messageSend.isCompressed(); if (messageCompressed) { - coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed); + coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, true); } switch (coreType) { @@ -468,7 +467,7 @@ public final class OpenWireMessageConverter { SimpleString key = new SimpleString(entry.getKey()); Object value = entry.getValue(); if (value instanceof UTF8Buffer) { - value = ((UTF8Buffer) value).toString(); + value = value.toString(); } TypedProperties.setObjectProperty(key, value, props); } @@ -498,8 +497,8 @@ public final class OpenWireMessageConverter { public static MessageDispatch createMessageDispatch(MessageReference reference, ICoreMessage message, WireFormat marshaller, - AMQConsumer consumer) throws IOException { - ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer); + AMQConsumer consumer, UUID serverNodeUUID) throws IOException { + ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer, serverNodeUUID); //we can use core message id for sequenceId amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); @@ -526,11 +525,11 @@ public final class OpenWireMessageConverter { private static ActiveMQMessage toAMQMessage(MessageReference reference, ICoreMessage coreMessage, WireFormat marshaller, - AMQConsumer consumer) throws IOException { + AMQConsumer consumer, UUID serverNodeUUID) throws IOException { final ActiveMQMessage amqMsg; final byte coreType = coreMessage.getType(); final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); - final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); + final boolean isCompressed = compressProp != null && compressProp; final byte[] bytes; final ActiveMQBuffer buffer = coreMessage.getDataBuffer(); buffer.resetReaderIndex(); @@ -591,12 +590,12 @@ public final class OpenWireMessageConverter { amqMsg.setArrival(arrival); final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); - if (brokerPath != null && brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) { + if (brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) { setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString()); } final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); - if (clusterPath != null && clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) { + if (clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) { setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString()); } @@ -626,20 +625,15 @@ public final class OpenWireMessageConverter { amqMsg.setGroupSequence(coreMessage.getGroupSequence()); + final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID); final MessageId mid; - final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID); if (midBytes != null) { ByteSequence midSeq = new ByteSequence(midBytes); mid = (MessageId) marshaller.unmarshal(midSeq); } else { - final SimpleString connectionId = (SimpleString) coreMessage.getObjectProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); - if (connectionId != null) { - mid = new MessageId("ID:" + connectionId.toString() + ":-1:-1:-1", coreMessage.getMessageID()); - } else { - //JMSMessageID should be started with "ID:" - String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1"; - mid = new MessageId(midd); - } + //JMSMessageID should be started with "ID:" and needs to be globally unique (node + journal id) + String midd = "ID:" + serverNodeUUID + ":-1:-1:-1"; + mid = new MessageId(midd, coreMessage.getMessageID()); } amqMsg.setMessageId(mid); @@ -673,7 +667,7 @@ public final class OpenWireMessageConverter { } final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID); - if (userId != null && userId instanceof SimpleString && ((SimpleString)userId).length() > 0) { + if (userId instanceof SimpleString && ((SimpleString)userId).length() > 0) { amqMsg.setUserID(((SimpleString)userId).toString()); } @@ -694,7 +688,7 @@ public final class OpenWireMessageConverter { final Set props = coreMessage.getPropertyNames(); if (props != null) { - setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer); + setAMQMsgObjectProperties(amqMsg, coreMessage, props); } if (bytes != null) { @@ -945,8 +939,7 @@ public final class OpenWireMessageConverter { private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg, final ICoreMessage coreMessage, - final Set props, - final AMQConsumer consumer) throws IOException { + final Set props) throws IOException { for (SimpleString s : props) { final String keyStr = s.toString(); if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 89eafe7a47..d68fa91683 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; -import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -60,10 +59,8 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; public class AMQConsumer { - private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications"; private final AMQSession session; private final org.apache.activemq.command.ActiveMQDestination openwireDestination; - private final boolean hasNotificationDestination; private final ConsumerInfo info; private final ScheduledExecutorService scheduledPool; private ServerConsumer serverConsumer; @@ -85,7 +82,6 @@ public class AMQConsumer { boolean internalAddress) { this.session = amqSession; this.openwireDestination = d; - this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION); this.info = info; this.scheduledPool = scheduledPool; this.prefetchSize = info.getPrefetchSize(); @@ -132,7 +128,7 @@ public class AMQConsumer { preAck = true; } String id = info.getClientId() != null ? info.getClientId() : this.getId().getConnectionId(); - String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + id + "'"; + String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME + "<>'" + id + "'"; if (selector == null) { selector = new SimpleString(noLocalSelector); } else { @@ -250,7 +246,7 @@ public class AMQConsumer { } } - public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) { + public int handleDeliver(MessageReference reference, ICoreMessage message) { MessageDispatch dispatch; try { MessagePullHandler pullHandler = messagePullHandler.get(); @@ -264,15 +260,12 @@ public class AMQConsumer { message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); } //handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat() - dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this); + dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID()); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); currentWindow.decrementAndGet(); return size; - } catch (IOException e) { - ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", e); - return 0; } catch (Throwable t) { ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", t); return 0; @@ -399,9 +392,6 @@ public class AMQConsumer { serverConsumer.close(false); } - public boolean hasNotificationDestination() { - return hasNotificationDestination; - } public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() { return openwireDestination; @@ -488,12 +478,11 @@ public class AMQConsumer { } } - public boolean removeRolledback(MessageReference messageReference) { + public void removeRolledback(MessageReference messageReference) { final Set rolledbackMessageRefs = getRolledbackMessageRefs(); - if (rolledbackMessageRefs == null) { - return false; + if (rolledbackMessageRefs != null) { + rolledbackMessageRefs.remove(messageReference); } - return rolledbackMessageRefs.remove(messageReference); } public void addRolledback(MessageReference messageReference) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index d4de2ee4f2..f96b6c9c45 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -311,7 +311,7 @@ public class AMQSession implements SessionCallback { AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); //clear up possible rolledback ids. theConsumer.removeRolledback(reference); - return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); + return theConsumer.handleDeliver(reference, message.toCore()); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java new file mode 100644 index 0000000000..d29676f0de --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.openwire; + +import org.apache.activemq.ActiveMQMessageAuditNoSync; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; +import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.utils.UUID; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.wireformat.WireFormat; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertFalse; + +public class OpenWireMessageConverterTest { + + final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory(); + final WireFormat openWireFormat = formatFactory.createWireFormat(); + final byte[] content = new byte[] {'a','a'}; + final String address = "Q"; + final ActiveMQDestination destination = new ActiveMQQueue(address); + final UUID nodeUUID = UUIDGenerator.getInstance().generateUUID(); + + @Test + public void createMessageDispatch() throws Exception { + + ActiveMQMessageAuditNoSync mqMessageAuditNoSync = new ActiveMQMessageAuditNoSync(); + + for (int i = 0; i < 10; i++) { + + ICoreMessage msg = new CoreMessage().initBuffer(100); + msg.setMessageID(i); + msg.getBodyBuffer().writeBytes(content); + msg.setAddress(address); + + MessageReference messageReference = new MessageReferenceImpl(msg, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + + MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID); + + MessageId messageId = dispatch.getMessage().getMessageId(); + assertFalse(mqMessageAuditNoSync.isDuplicate(messageId)); + } + + + for (int i = 10; i < 20; i++) { + + CoreMessage msg = new CoreMessage().initBuffer(100); + msg.setMessageID(i); + msg.getBodyBuffer().writeBytes(content); + msg.setAddress(address); + + // share a connection id + msg.getProperties().putProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, "MyClient"); + + + MessageReference messageReference = new MessageReferenceImpl(msg, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + + MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID); + + MessageId messageId = dispatch.getMessage().getMessageId(); + assertFalse(mqMessageAuditNoSync.isDuplicate(messageId)); + } + + } +} \ No newline at end of file diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java index 5c1a879114..b20f209e28 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java @@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; +import org.apache.activemq.artemis.utils.UUID; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerInfo; @@ -55,6 +58,12 @@ public class AMQConsumerTest { } private AMQConsumer getConsumer(int prefetchSize) throws Exception { + UUID nodeId = UUIDGenerator.getInstance().generateUUID(); + ActiveMQServer coreServer = Mockito.mock(ActiveMQServer.class); + NodeManager nodeManager = Mockito.mock(NodeManager.class); + Mockito.when(coreServer.getNodeManager()).thenReturn(nodeManager); + Mockito.when(nodeManager.getUUID()).thenReturn(nodeId); + ServerSession coreSession = Mockito.mock(ServerSession.class); Mockito.when(coreSession.createConsumer(ArgumentMatchers.anyLong(), ArgumentMatchers.nullable(SimpleString.class), ArgumentMatchers.nullable(SimpleString.class), ArgumentMatchers.anyInt(), @@ -62,7 +71,7 @@ public class AMQConsumerTest { ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class)); AMQSession session = Mockito.mock(AMQSession.class); Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class)); - Mockito.when(session.getCoreServer()).thenReturn(Mockito.mock(ActiveMQServer.class)); + Mockito.when(session.getCoreServer()).thenReturn(coreServer); Mockito.when(session.getCoreSession()).thenReturn(coreSession); Mockito.when(session.convertWildcard(ArgumentMatchers.any(ActiveMQDestination.class))).thenReturn(""); @@ -81,7 +90,7 @@ public class AMQConsumerTest { Assert.assertTrue(consumer.hasCredits()); - consumer.handleDeliver(reference, message, 0); + consumer.handleDeliver(reference, message); Assert.assertFalse(consumer.hasCredits()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 4a88fccf76..a6aeeaa564 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -130,6 +130,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { protected ServerLocator[] locators; + protected boolean isForceUniqueStorageManagerIds() { + return true; + } + @Override @Before public void setUp() throws Exception { @@ -1934,7 +1938,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { log.debug("started server " + servers[node]); waitForServerToStart(servers[node]); - if (servers[node].getStorageManager() != null) { + if (servers[node].getStorageManager() != null && isForceUniqueStorageManagerIds()) { for (int i = 0; i < node * 1000; i++) { // it is common to have messages landing with similar IDs on separate nodes, which could hide a few issues. // so we make them unequal diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java index d83491a5e4..ceaad7b2e2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java @@ -16,9 +16,11 @@ */ package org.apache.activemq.artemis.tests.integration.openwire.cluster; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -35,7 +37,9 @@ import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import java.util.Collection; @@ -44,6 +48,12 @@ import java.util.concurrent.TimeUnit; public class MessageRedistributionTest extends ClusterTestBase { + @Override + protected boolean isForceUniqueStorageManagerIds() { + // we want to verify messageId uniqueness across brokers + return false; + } + @Test public void testRemoteConsumerClose() throws Exception { @@ -76,6 +86,98 @@ public class MessageRedistributionTest extends ClusterTestBase { } } + @Test + public void testFailoverNonClusteredBrokersInteropWithCoreProducer() throws Exception { + + setupServer(0, true, true); + setupServer(1, true, true); + + startServers(0, 1); + + servers[0].getAddressSettingsRepository().getMatch("#").setRedeliveryDelay(0).setRedistributionDelay(0); + servers[1].getAddressSettingsRepository().getMatch("#").setRedeliveryDelay(0).setRedistributionDelay(0); + + setupSessionFactory(0, true); + setupSessionFactory(1, true); + + createAddressInfo(0, "q", RoutingType.ANYCAST, -1, false); + createAddressInfo(1, "q", RoutingType.ANYCAST, -1, false); + createQueue(0, "q", "q", null, true, RoutingType.ANYCAST); + createQueue(1, "q", "q", null, true, RoutingType.ANYCAST); + + + final int numMessagesPerNode = 1000; + produceWithCoreTo(0, numMessagesPerNode); + produceWithCoreTo(1, numMessagesPerNode); + + // consume with openwire from both brokers which both start with journal id = 0, should be in lock step + + String zero = getServerUri(0); + String one = getServerUri(1); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + zero + "," + one + ")?jms.prefetchPolicy.all=10&randomize=false&timeout=400&reconnectDelay=500&useExponentialBackOff=false&initialReconnectDelay=500&nested.wireFormat.maxInactivityDuration=500&nested.wireFormat.maxInactivityDurationInitalDelay=500&nested.ignoreRemoteWireFormat=true&nested.soTimeout=500&nested.connectionTimeout=400&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400"); + factory.setWatchTopicAdvisories(false); + + CountDownLatch continueLatch = new CountDownLatch(1); + CountDownLatch received = new CountDownLatch(numMessagesPerNode * 2); + final Connection conn = factory.createConnection(); + conn.start(); + + ((ActiveMQConnection)conn).setClientInternalExceptionListener(Throwable::printStackTrace); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = ActiveMQDestination.createDestination("q", ActiveMQDestination.QUEUE_TYPE); + session.createConsumer(dest).setMessageListener(message -> { + try { + received.countDown(); + } catch (Exception exception) { + exception.printStackTrace(); + } + }); + + + assertTrue(Wait.waitFor(new org.apache.activemq.artemis.utils.Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return received.getCount() <= numMessagesPerNode; + } + })); + + // force a failover to the other broker + servers[0].stop(false, true); + + // get all the messages, our openwire audit does not detect any duplicate + assertTrue(Wait.waitFor(() -> { + return received.await(1, TimeUnit.SECONDS); + })); + + conn.close(); + } + + private void produceWithCoreTo(int serveId, final int numMessagesPerNode) throws Exception { + + String targetUrl = getServerUri(serveId); + Connection jmsConn = null; + try { + org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory coreCf = ActiveMQJMSClient.createConnectionFactory(targetUrl, "cf" + serveId); + jmsConn = coreCf.createConnection(); + jmsConn.setClientID("theProducer"); + Session coreSession = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + TextMessage msg = coreSession.createTextMessage("TEXT"); + Queue queue = coreSession.createQueue("q"); + MessageProducer producer = coreSession.createProducer(queue); + for (int i = 0; i < numMessagesPerNode; i++) { + msg.setIntProperty("MM", i); + msg.setIntProperty("SN", serveId); + producer.send(msg); + } + } finally { + if (jmsConn != null) { + jmsConn.close(); + } + } + } + @Test public void testAdvisoriesNotClustered() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java index 869f6f2bda..28715b1744 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java @@ -250,10 +250,51 @@ public class GeneralInteropTest extends BasicOpenWireTest { } } + + @Test + public void testReceiveTwiceTheSameCoreMessage() throws Exception { + + final String text = "HelloAgain"; + sendMultipleTextMessagesUsingCoreJms(queueName, text, 1); + + String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.MaxInactivityDuration=5000)"; + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(urlString); + Connection connection = connectionFactory.createConnection(); + try { + connection.setClientID("clientId"); + connection.start(); + + Message message = null; + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + message = consumer.receive(4000); + assertNotNull(message); + + String id1 = message.getJMSMessageID(); + consumer.close(); + + // consume again! + consumer = session.createConsumer(queue); + message = consumer.receive(4000); + assertNotNull(message); + + String id2 = message.getJMSMessageID(); + + assertEquals(id1, id2); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + private void sendMultipleTextMessagesUsingCoreJms(String queueName, String text, int num) throws Exception { Connection jmsConn = null; try { jmsConn = coreCf.createConnection(); + jmsConn.setClientID("PROD"); Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); MessageProducer producer = session.createProducer(queue); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java index 2eaa41962f..1ffdfea67c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java @@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Assert; import org.junit.Test; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; public class UUIDGeneratorTest extends ActiveMQTestBase { @@ -42,6 +44,20 @@ public class UUIDGeneratorTest extends ActiveMQTestBase { assertEquals(javaId.toString(), nativeId.toString()); } + + @Test + public void testDifferentInTightLoop() throws Exception { + UUIDGenerator gen = UUIDGenerator.getInstance(); + + final int numIterations = 10000; + Set uuidSet = new HashSet<>(); + for (int i = 0; i < numIterations; i++) { + org.apache.activemq.artemis.utils.UUID nativeId = gen.generateUUID(); + uuidSet.add(nativeId); + } + assertEquals("All there", numIterations, uuidSet.size()); + } + @Test public void testGetHardwareAddress() throws Exception { byte[] bytes = UUIDGenerator.getHardwareAddress();