This commit is contained in:
Clebert Suconic 2019-03-08 12:12:09 -05:00
commit 7fceaeb6c0
9 changed files with 207 additions and 13 deletions

View File

@ -266,6 +266,15 @@ public interface Message {
return this;
}
default Object getCorrelationID() {
return null;
}
default Message setCorrelationID(Object correlationID) {
return this;
}
SimpleString getReplyTo();
Message setReplyTo(SimpleString address);

View File

@ -297,6 +297,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this.putIntProperty(Message.HDR_GROUP_SEQUENCE, sequence);
}
@Override
public Object getCorrelationID() {
return getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME);
}
@Override
public Message setCorrelationID(final Object correlationID) {
putObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME, correlationID);
return this;
}
/**
* @param sendBuffer
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core

View File

@ -1147,6 +1147,21 @@ public class AMQPMessage extends RefCountMessage {
}
}
@Override
public Object getCorrelationID() {
return properties != null ? properties.getCorrelationId() : null;
}
@Override
public org.apache.activemq.artemis.api.core.Message setCorrelationID(final Object correlationID) {
if (properties == null) {
properties = new Properties();
}
properties.setCorrelationId(correlationID);
return this;
}
@Override
public Long getScheduledDeliveryTime() {
if (scheduledTime < 0) {

View File

@ -325,8 +325,13 @@ public class AmqpCoreConverter {
if (properties.getReplyTo() != null) {
jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
}
if (properties.getCorrelationId() != null) {
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()));
Object correlationID = properties.getCorrelationId();
if (correlationID != null) {
try {
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
} catch (IllegalArgumentException e) {
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
}
}
if (properties.getContentType() != null) {
jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());

View File

@ -72,6 +72,7 @@ import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@ -177,14 +178,19 @@ public class CoreAmqpConverter {
properties.setReplyTo(toAddress(replyTo));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
}
String correlationId = message.getJMSCorrelationID();
if (correlationId != null) {
Object correlationID = message.getInnerMessage().getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) {
String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString();
try {
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(c));
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setCorrelationId(correlationId);
properties.setCorrelationId(correlationID);
}
} else {
properties.setCorrelationId(correlationID);
}
long expiration = message.getJMSExpiration();
if (expiration != 0) {
long ttl = expiration - System.currentTimeMillis();

View File

@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Enumeration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@ -118,21 +117,29 @@ public class ServerJMSMessage implements Message {
@Override
public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException {
try {
MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
} catch (ActiveMQException e) {
throw new JMSException(e.getMessage());
if (correlationID == null || correlationID.length == 0) {
throw new JMSException("Please specify a non-zero length byte[]");
}
message.setCorrelationID(correlationID);
}
@Override
public final String getJMSCorrelationID() throws JMSException {
return MessageUtil.getJMSCorrelationID(message);
Object correlationID = message.getCorrelationID();
if (correlationID instanceof String) {
return ((String) correlationID);
} else if (correlationID != null) {
return String.valueOf(correlationID);
} else {
return null;
}
}
@Override
public final void setJMSCorrelationID(String correlationID) throws JMSException {
MessageUtil.setJMSCorrelationID(message, correlationID);
message.setCorrelationID(correlationID);
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.management.impl;
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
@ -390,6 +392,11 @@ public class ManagementServiceImpl implements ManagementService {
reply.setType(Message.TEXT_TYPE);
reply.setReplyTo(message.getReplyTo());
Object correlationID = getCorrelationIdentity(message);
if (correlationID != null) {
reply.setCorrelationID(correlationID);
}
String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
if (logger.isDebugEnabled()) {
logger.debug("handling management message for " + resourceName);
@ -781,5 +788,24 @@ public class ManagementServiceImpl implements ManagementService {
return result;
}
/**
* Correlate management responses using the Correlation ID Pattern, if the request supplied a correlation id,
* or fallback to the Message ID Pattern providing the request had a message id.
* @param request
* @return correlation identify
*/
private Object getCorrelationIdentity(final Message request) {
Object correlationId = request.getCorrelationID();
if (correlationId == null) {
// CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the
// change is a nice to have for my point of view. I suggested it for completeness. The application could
// always supply unique correl ids on the request and achieve the same effect. I'd be happy to drop this part.
Object underlying = request.getUserID() != null ? request.getUserID() : request.getStringProperty(NATIVE_MESSAGE_ID);
correlationId = underlying == null ? null : String.valueOf(underlying);
}
return correlationId;
}
// Inner classes -------------------------------------------------
}

View File

@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -27,6 +29,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
@ -39,6 +43,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
public class AmqpManagementTest extends AmqpClientTestSupport {
private static final Binary BINARY_CORRELATION_ID = new Binary("mystring".getBytes(StandardCharsets.UTF_8));
@Test(timeout = 60000)
public void testManagementQueryOverAMQP() throws Throwable {
AmqpClient client = createAmqpClient();
@ -101,4 +107,67 @@ public class AmqpManagementTest extends AmqpClientTestSupport {
msg = createMapMessage(1, map, null);
assertEquals(msg.getByte("sequence"), sequence);
}
@Test(timeout = 60000)
public void testCorrelationByMessageIDUUID() throws Throwable {
doTestReplyCorrelation(UUID.randomUUID(), false);
}
@Test(timeout = 60000)
public void testCorrelationByMessageIDString() throws Throwable {
doTestReplyCorrelation("mystring", false);
}
@Test(timeout = 60000)
public void testCorrelationByMessageIDBinary() throws Throwable {
doTestReplyCorrelation(BINARY_CORRELATION_ID, false);
}
@Test(timeout = 60000)
public void testCorrelationByCorrelationIDUUID() throws Throwable {
doTestReplyCorrelation(UUID.randomUUID(), true);
}
@Test(timeout = 60000)
public void testCorrelationByCorrelationIDString() throws Throwable {
doTestReplyCorrelation("mystring", true);
}
@Test(timeout = 60000)
public void testCorrelationByCorrelationIDBinary() throws Throwable {
doTestReplyCorrelation(BINARY_CORRELATION_ID, true);
}
private void doTestReplyCorrelation(final Object correlationId, final boolean sendCorrelAsCorrelation) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
String destinationAddress = getQueueName(1);
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("activemq.management");
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(10);
// Create request message for getQueueNames query
AmqpMessage request = new AmqpMessage();
request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER);
request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
request.setReplyToAddress(destinationAddress);
if (sendCorrelAsCorrelation) {
request.setRawCorrelationId(correlationId);
} else {
request.setRawMessageId(correlationId);
}
request.setText("[]");
sender.send(request);
AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(correlationId, response.getRawCorrelationId());
response.accept();
} finally {
connection.close();
}
}
}

View File

@ -31,10 +31,13 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Test;
@ -153,6 +156,49 @@ public class ManagementServiceImplTest extends ActiveMQTestBase {
Assert.assertEquals(queue.getName().toString(), queueControl.getName());
}
@Test
public void testCorrelateResponseByCorrelationID() throws Exception {
String queue = RandomUtil.randomString();
String address = RandomUtil.randomString();
String correlationID = UUIDGenerator.getInstance().generateStringUUID();
Configuration config = createBasicConfig().setJMXManagementEnabled(false);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
server.start();
// invoke attribute and operation on the server
CoreMessage message = new CoreMessage(1, 100);
MessageUtil.setJMSCorrelationID(message, correlationID);
ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "createQueue", queue, address);
Message reply = server.getManagementService().handleMessage(message);
Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
Assert.assertEquals(correlationID, MessageUtil.getJMSCorrelationID(reply));
}
@Test
public void testCorrelateResponseByMessageID() throws Exception {
String queue = RandomUtil.randomString();
String address = RandomUtil.randomString();
UUID messageId = UUIDGenerator.getInstance().generateUUID();
Configuration config = createBasicConfig().setJMXManagementEnabled(false);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
server.start();
// invoke attribute and operation on the server
CoreMessage message = new CoreMessage(1, 100);
message.setUserID(messageId);
ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "createQueue", queue, address);
Message reply = server.getManagementService().handleMessage(message);
Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
Assert.assertEquals(messageId.toString(), MessageUtil.getJMSCorrelationID(reply));
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------