ARTEMIS-3323 - ensure openwire message id is unique and consistent for the life of a broker when converted from core

This commit is contained in:
gtully 2021-06-01 12:16:29 +01:00 committed by Gary Tully
parent 58e59ef679
commit 7ce9030e9f
9 changed files with 292 additions and 44 deletions

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQBytesMessage;
@ -87,8 +88,6 @@ public final class OpenWireMessageConverter {
private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER"); private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID"); private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE"); private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE;
private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION"); private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID"); private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
@ -131,7 +130,7 @@ public final class OpenWireMessageConverter {
} else if (contents != null) { } else if (contents != null) {
final boolean messageCompressed = messageSend.isCompressed(); final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) { if (messageCompressed) {
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed); coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, true);
} }
switch (coreType) { switch (coreType) {
@ -468,7 +467,7 @@ public final class OpenWireMessageConverter {
SimpleString key = new SimpleString(entry.getKey()); SimpleString key = new SimpleString(entry.getKey());
Object value = entry.getValue(); Object value = entry.getValue();
if (value instanceof UTF8Buffer) { if (value instanceof UTF8Buffer) {
value = ((UTF8Buffer) value).toString(); value = value.toString();
} }
TypedProperties.setObjectProperty(key, value, props); TypedProperties.setObjectProperty(key, value, props);
} }
@ -498,8 +497,8 @@ public final class OpenWireMessageConverter {
public static MessageDispatch createMessageDispatch(MessageReference reference, public static MessageDispatch createMessageDispatch(MessageReference reference,
ICoreMessage message, ICoreMessage message,
WireFormat marshaller, WireFormat marshaller,
AMQConsumer consumer) throws IOException { AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer); ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer, serverNodeUUID);
//we can use core message id for sequenceId //we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@ -526,11 +525,11 @@ public final class OpenWireMessageConverter {
private static ActiveMQMessage toAMQMessage(MessageReference reference, private static ActiveMQMessage toAMQMessage(MessageReference reference,
ICoreMessage coreMessage, ICoreMessage coreMessage,
WireFormat marshaller, WireFormat marshaller,
AMQConsumer consumer) throws IOException { AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
final ActiveMQMessage amqMsg; final ActiveMQMessage amqMsg;
final byte coreType = coreMessage.getType(); final byte coreType = coreMessage.getType();
final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); final boolean isCompressed = compressProp != null && compressProp;
final byte[] bytes; final byte[] bytes;
final ActiveMQBuffer buffer = coreMessage.getDataBuffer(); final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
buffer.resetReaderIndex(); buffer.resetReaderIndex();
@ -591,12 +590,12 @@ public final class OpenWireMessageConverter {
amqMsg.setArrival(arrival); amqMsg.setArrival(arrival);
final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
if (brokerPath != null && brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) { if (brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) {
setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString()); setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString());
} }
final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
if (clusterPath != null && clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) { if (clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) {
setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString()); setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString());
} }
@ -626,20 +625,15 @@ public final class OpenWireMessageConverter {
amqMsg.setGroupSequence(coreMessage.getGroupSequence()); amqMsg.setGroupSequence(coreMessage.getGroupSequence());
final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID);
final MessageId mid; final MessageId mid;
final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
if (midBytes != null) { if (midBytes != null) {
ByteSequence midSeq = new ByteSequence(midBytes); ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq); mid = (MessageId) marshaller.unmarshal(midSeq);
} else { } else {
final SimpleString connectionId = (SimpleString) coreMessage.getObjectProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); //JMSMessageID should be started with "ID:" and needs to be globally unique (node + journal id)
if (connectionId != null) { String midd = "ID:" + serverNodeUUID + ":-1:-1:-1";
mid = new MessageId("ID:" + connectionId.toString() + ":-1:-1:-1", coreMessage.getMessageID()); mid = new MessageId(midd, coreMessage.getMessageID());
} else {
//JMSMessageID should be started with "ID:"
String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
mid = new MessageId(midd);
}
} }
amqMsg.setMessageId(mid); amqMsg.setMessageId(mid);
@ -673,7 +667,7 @@ public final class OpenWireMessageConverter {
} }
final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID); final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
if (userId != null && userId instanceof SimpleString && ((SimpleString)userId).length() > 0) { if (userId instanceof SimpleString && ((SimpleString)userId).length() > 0) {
amqMsg.setUserID(((SimpleString)userId).toString()); amqMsg.setUserID(((SimpleString)userId).toString());
} }
@ -694,7 +688,7 @@ public final class OpenWireMessageConverter {
final Set<SimpleString> props = coreMessage.getPropertyNames(); final Set<SimpleString> props = coreMessage.getPropertyNames();
if (props != null) { if (props != null) {
setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer); setAMQMsgObjectProperties(amqMsg, coreMessage, props);
} }
if (bytes != null) { if (bytes != null) {
@ -945,8 +939,7 @@ public final class OpenWireMessageConverter {
private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg, private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
final ICoreMessage coreMessage, final ICoreMessage coreMessage,
final Set<SimpleString> props, final Set<SimpleString> props) throws IOException {
final AMQConsumer consumer) throws IOException {
for (SimpleString s : props) { for (SimpleString s : props) {
final String keyStr = s.toString(); final String keyStr = s.toString();
if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.protocol.openwire.amq; package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -60,10 +59,8 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveInfo;
public class AMQConsumer { public class AMQConsumer {
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private final AMQSession session; private final AMQSession session;
private final org.apache.activemq.command.ActiveMQDestination openwireDestination; private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
private final boolean hasNotificationDestination;
private final ConsumerInfo info; private final ConsumerInfo info;
private final ScheduledExecutorService scheduledPool; private final ScheduledExecutorService scheduledPool;
private ServerConsumer serverConsumer; private ServerConsumer serverConsumer;
@ -85,7 +82,6 @@ public class AMQConsumer {
boolean internalAddress) { boolean internalAddress) {
this.session = amqSession; this.session = amqSession;
this.openwireDestination = d; this.openwireDestination = d;
this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
this.info = info; this.info = info;
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize(); this.prefetchSize = info.getPrefetchSize();
@ -132,7 +128,7 @@ public class AMQConsumer {
preAck = true; preAck = true;
} }
String id = info.getClientId() != null ? info.getClientId() : this.getId().getConnectionId(); String id = info.getClientId() != null ? info.getClientId() : this.getId().getConnectionId();
String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + id + "'"; String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME + "<>'" + id + "'";
if (selector == null) { if (selector == null) {
selector = new SimpleString(noLocalSelector); selector = new SimpleString(noLocalSelector);
} else { } else {
@ -250,7 +246,7 @@ public class AMQConsumer {
} }
} }
public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) { public int handleDeliver(MessageReference reference, ICoreMessage message) {
MessageDispatch dispatch; MessageDispatch dispatch;
try { try {
MessagePullHandler pullHandler = messagePullHandler.get(); MessagePullHandler pullHandler = messagePullHandler.get();
@ -264,15 +260,12 @@ public class AMQConsumer {
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
} }
//handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat() //handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this); dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID());
int size = dispatch.getMessage().getSize(); int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId()); reference.setProtocolData(dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch); session.deliverMessage(dispatch);
currentWindow.decrementAndGet(); currentWindow.decrementAndGet();
return size; return size;
} catch (IOException e) {
ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", e);
return 0;
} catch (Throwable t) { } catch (Throwable t) {
ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", t); ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", t);
return 0; return 0;
@ -399,9 +392,6 @@ public class AMQConsumer {
serverConsumer.close(false); serverConsumer.close(false);
} }
public boolean hasNotificationDestination() {
return hasNotificationDestination;
}
public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() { public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
return openwireDestination; return openwireDestination;
@ -488,12 +478,11 @@ public class AMQConsumer {
} }
} }
public boolean removeRolledback(MessageReference messageReference) { public void removeRolledback(MessageReference messageReference) {
final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs(); final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs();
if (rolledbackMessageRefs == null) { if (rolledbackMessageRefs != null) {
return false; rolledbackMessageRefs.remove(messageReference);
} }
return rolledbackMessageRefs.remove(messageReference);
} }
public void addRolledback(MessageReference messageReference) { public void addRolledback(MessageReference messageReference) {

View File

@ -311,7 +311,7 @@ public class AMQSession implements SessionCallback {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
//clear up possible rolledback ids. //clear up possible rolledback ids.
theConsumer.removeRolledback(reference); theConsumer.removeRolledback(reference);
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); return theConsumer.handleDeliver(reference, message.toCore());
} }
@Override @Override

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertFalse;
public class OpenWireMessageConverterTest {
final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory();
final WireFormat openWireFormat = formatFactory.createWireFormat();
final byte[] content = new byte[] {'a','a'};
final String address = "Q";
final ActiveMQDestination destination = new ActiveMQQueue(address);
final UUID nodeUUID = UUIDGenerator.getInstance().generateUUID();
@Test
public void createMessageDispatch() throws Exception {
ActiveMQMessageAuditNoSync mqMessageAuditNoSync = new ActiveMQMessageAuditNoSync();
for (int i = 0; i < 10; i++) {
ICoreMessage msg = new CoreMessage().initBuffer(100);
msg.setMessageID(i);
msg.getBodyBuffer().writeBytes(content);
msg.setAddress(address);
MessageReference messageReference = new MessageReferenceImpl(msg, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID);
MessageId messageId = dispatch.getMessage().getMessageId();
assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
}
for (int i = 10; i < 20; i++) {
CoreMessage msg = new CoreMessage().initBuffer(100);
msg.setMessageID(i);
msg.getBodyBuffer().writeBytes(content);
msg.setAddress(address);
// share a connection id
msg.getProperties().putProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, "MyClient");
MessageReference messageReference = new MessageReferenceImpl(msg, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID);
MessageId messageId = dispatch.getMessage().getMessageId();
assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
}
}
}

View File

@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -55,6 +58,12 @@ public class AMQConsumerTest {
} }
private AMQConsumer getConsumer(int prefetchSize) throws Exception { private AMQConsumer getConsumer(int prefetchSize) throws Exception {
UUID nodeId = UUIDGenerator.getInstance().generateUUID();
ActiveMQServer coreServer = Mockito.mock(ActiveMQServer.class);
NodeManager nodeManager = Mockito.mock(NodeManager.class);
Mockito.when(coreServer.getNodeManager()).thenReturn(nodeManager);
Mockito.when(nodeManager.getUUID()).thenReturn(nodeId);
ServerSession coreSession = Mockito.mock(ServerSession.class); ServerSession coreSession = Mockito.mock(ServerSession.class);
Mockito.when(coreSession.createConsumer(ArgumentMatchers.anyLong(), ArgumentMatchers.nullable(SimpleString.class), Mockito.when(coreSession.createConsumer(ArgumentMatchers.anyLong(), ArgumentMatchers.nullable(SimpleString.class),
ArgumentMatchers.nullable(SimpleString.class), ArgumentMatchers.anyInt(), ArgumentMatchers.nullable(SimpleString.class), ArgumentMatchers.anyInt(),
@ -62,7 +71,7 @@ public class AMQConsumerTest {
ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class)); ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class));
AMQSession session = Mockito.mock(AMQSession.class); AMQSession session = Mockito.mock(AMQSession.class);
Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class)); Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
Mockito.when(session.getCoreServer()).thenReturn(Mockito.mock(ActiveMQServer.class)); Mockito.when(session.getCoreServer()).thenReturn(coreServer);
Mockito.when(session.getCoreSession()).thenReturn(coreSession); Mockito.when(session.getCoreSession()).thenReturn(coreSession);
Mockito.when(session.convertWildcard(ArgumentMatchers.any(ActiveMQDestination.class))).thenReturn(""); Mockito.when(session.convertWildcard(ArgumentMatchers.any(ActiveMQDestination.class))).thenReturn("");
@ -81,7 +90,7 @@ public class AMQConsumerTest {
Assert.assertTrue(consumer.hasCredits()); Assert.assertTrue(consumer.hasCredits());
consumer.handleDeliver(reference, message, 0); consumer.handleDeliver(reference, message);
Assert.assertFalse(consumer.hasCredits()); Assert.assertFalse(consumer.hasCredits());

View File

@ -130,6 +130,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
protected ServerLocator[] locators; protected ServerLocator[] locators;
protected boolean isForceUniqueStorageManagerIds() {
return true;
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -1934,7 +1938,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
log.debug("started server " + servers[node]); log.debug("started server " + servers[node]);
waitForServerToStart(servers[node]); waitForServerToStart(servers[node]);
if (servers[node].getStorageManager() != null) { if (servers[node].getStorageManager() != null && isForceUniqueStorageManagerIds()) {
for (int i = 0; i < node * 1000; i++) { for (int i = 0; i < node * 1000; i++) {
// it is common to have messages landing with similar IDs on separate nodes, which could hide a few issues. // it is common to have messages landing with similar IDs on separate nodes, which could hide a few issues.
// so we make them unequal // so we make them unequal

View File

@ -16,9 +16,11 @@
*/ */
package org.apache.activemq.artemis.tests.integration.openwire.cluster; package org.apache.activemq.artemis.tests.integration.openwire.cluster;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -35,7 +37,9 @@ import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import java.util.Collection; import java.util.Collection;
@ -44,6 +48,12 @@ import java.util.concurrent.TimeUnit;
public class MessageRedistributionTest extends ClusterTestBase { public class MessageRedistributionTest extends ClusterTestBase {
@Override
protected boolean isForceUniqueStorageManagerIds() {
// we want to verify messageId uniqueness across brokers
return false;
}
@Test @Test
public void testRemoteConsumerClose() throws Exception { public void testRemoteConsumerClose() throws Exception {
@ -76,6 +86,98 @@ public class MessageRedistributionTest extends ClusterTestBase {
} }
} }
@Test
public void testFailoverNonClusteredBrokersInteropWithCoreProducer() throws Exception {
setupServer(0, true, true);
setupServer(1, true, true);
startServers(0, 1);
servers[0].getAddressSettingsRepository().getMatch("#").setRedeliveryDelay(0).setRedistributionDelay(0);
servers[1].getAddressSettingsRepository().getMatch("#").setRedeliveryDelay(0).setRedistributionDelay(0);
setupSessionFactory(0, true);
setupSessionFactory(1, true);
createAddressInfo(0, "q", RoutingType.ANYCAST, -1, false);
createAddressInfo(1, "q", RoutingType.ANYCAST, -1, false);
createQueue(0, "q", "q", null, true, RoutingType.ANYCAST);
createQueue(1, "q", "q", null, true, RoutingType.ANYCAST);
final int numMessagesPerNode = 1000;
produceWithCoreTo(0, numMessagesPerNode);
produceWithCoreTo(1, numMessagesPerNode);
// consume with openwire from both brokers which both start with journal id = 0, should be in lock step
String zero = getServerUri(0);
String one = getServerUri(1);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + zero + "," + one + ")?jms.prefetchPolicy.all=10&randomize=false&timeout=400&reconnectDelay=500&useExponentialBackOff=false&initialReconnectDelay=500&nested.wireFormat.maxInactivityDuration=500&nested.wireFormat.maxInactivityDurationInitalDelay=500&nested.ignoreRemoteWireFormat=true&nested.soTimeout=500&nested.connectionTimeout=400&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400");
factory.setWatchTopicAdvisories(false);
CountDownLatch continueLatch = new CountDownLatch(1);
CountDownLatch received = new CountDownLatch(numMessagesPerNode * 2);
final Connection conn = factory.createConnection();
conn.start();
((ActiveMQConnection)conn).setClientInternalExceptionListener(Throwable::printStackTrace);
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = ActiveMQDestination.createDestination("q", ActiveMQDestination.QUEUE_TYPE);
session.createConsumer(dest).setMessageListener(message -> {
try {
received.countDown();
} catch (Exception exception) {
exception.printStackTrace();
}
});
assertTrue(Wait.waitFor(new org.apache.activemq.artemis.utils.Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return received.getCount() <= numMessagesPerNode;
}
}));
// force a failover to the other broker
servers[0].stop(false, true);
// get all the messages, our openwire audit does not detect any duplicate
assertTrue(Wait.waitFor(() -> {
return received.await(1, TimeUnit.SECONDS);
}));
conn.close();
}
private void produceWithCoreTo(int serveId, final int numMessagesPerNode) throws Exception {
String targetUrl = getServerUri(serveId);
Connection jmsConn = null;
try {
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory coreCf = ActiveMQJMSClient.createConnectionFactory(targetUrl, "cf" + serveId);
jmsConn = coreCf.createConnection();
jmsConn.setClientID("theProducer");
Session coreSession = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TextMessage msg = coreSession.createTextMessage("TEXT");
Queue queue = coreSession.createQueue("q");
MessageProducer producer = coreSession.createProducer(queue);
for (int i = 0; i < numMessagesPerNode; i++) {
msg.setIntProperty("MM", i);
msg.setIntProperty("SN", serveId);
producer.send(msg);
}
} finally {
if (jmsConn != null) {
jmsConn.close();
}
}
}
@Test @Test
public void testAdvisoriesNotClustered() throws Exception { public void testAdvisoriesNotClustered() throws Exception {

View File

@ -250,10 +250,51 @@ public class GeneralInteropTest extends BasicOpenWireTest {
} }
} }
@Test
public void testReceiveTwiceTheSameCoreMessage() throws Exception {
final String text = "HelloAgain";
sendMultipleTextMessagesUsingCoreJms(queueName, text, 1);
String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.MaxInactivityDuration=5000)";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(urlString);
Connection connection = connectionFactory.createConnection();
try {
connection.setClientID("clientId");
connection.start();
Message message = null;
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
message = consumer.receive(4000);
assertNotNull(message);
String id1 = message.getJMSMessageID();
consumer.close();
// consume again!
consumer = session.createConsumer(queue);
message = consumer.receive(4000);
assertNotNull(message);
String id2 = message.getJMSMessageID();
assertEquals(id1, id2);
} finally {
if (connection != null) {
connection.close();
}
}
}
private void sendMultipleTextMessagesUsingCoreJms(String queueName, String text, int num) throws Exception { private void sendMultipleTextMessagesUsingCoreJms(String queueName, String text, int num) throws Exception {
Connection jmsConn = null; Connection jmsConn = null;
try { try {
jmsConn = coreCf.createConnection(); jmsConn = coreCf.createConnection();
jmsConn.setClientID("PROD");
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName); Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);

View File

@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
public class UUIDGeneratorTest extends ActiveMQTestBase { public class UUIDGeneratorTest extends ActiveMQTestBase {
@ -42,6 +44,20 @@ public class UUIDGeneratorTest extends ActiveMQTestBase {
assertEquals(javaId.toString(), nativeId.toString()); assertEquals(javaId.toString(), nativeId.toString());
} }
@Test
public void testDifferentInTightLoop() throws Exception {
UUIDGenerator gen = UUIDGenerator.getInstance();
final int numIterations = 10000;
Set<org.apache.activemq.artemis.utils.UUID> uuidSet = new HashSet<>();
for (int i = 0; i < numIterations; i++) {
org.apache.activemq.artemis.utils.UUID nativeId = gen.generateUUID();
uuidSet.add(nativeId);
}
assertEquals("All there", numIterations, uuidSet.size());
}
@Test @Test
public void testGetHardwareAddress() throws Exception { public void testGetHardwareAddress() throws Exception {
byte[] bytes = UUIDGenerator.getHardwareAddress(); byte[] bytes = UUIDGenerator.getHardwareAddress();