diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index ffe9ccc33a..67d034447f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import javax.jms.JMSException; @@ -67,7 +68,11 @@ import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQStreamMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.transport.amqp.AmqpProtocolException; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.TypeConversionSupport; @@ -333,6 +338,15 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { apMap = new HashMap<>(); } apMap.put(key, value); + + int messageType = message.getDataStructureType(); + if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) { + // Type of command to recognize advisory message + Object data = message.getDataStructure(); + if(data != null) { + apMap.put("ActiveMqDataStructureType", data.getClass().getSimpleName()); + } + } } final AmqpWritableBuffer buffer = new AmqpWritableBuffer(); @@ -376,7 +390,39 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { int messageType = message.getDataStructureType(); - if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) { + if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) { + Object data = message.getDataStructure(); + if (data instanceof ConnectionInfo) { + ConnectionInfo connectionInfo = (ConnectionInfo)data; + final HashMap connectionMap = new LinkedHashMap(); + + connectionMap.put("ConnectionId", connectionInfo.getConnectionId().getValue()); + connectionMap.put("ClientId", connectionInfo.getClientId()); + connectionMap.put("ClientIp", connectionInfo.getClientIp()); + connectionMap.put("UserName", connectionInfo.getUserName()); + connectionMap.put("BrokerMasterConnector", connectionInfo.isBrokerMasterConnector()); + connectionMap.put("Manageable", connectionInfo.isManageable()); + connectionMap.put("ClientMaster", connectionInfo.isClientMaster()); + connectionMap.put("FaultTolerant", connectionInfo.isFaultTolerant()); + connectionMap.put("FailoverReconnect", connectionInfo.isFailoverReconnect()); + + body = new AmqpValue(connectionMap); + } else if (data instanceof RemoveInfo) { + RemoveInfo removeInfo = (RemoveInfo)message.getDataStructure(); + final HashMap removeMap = new LinkedHashMap(); + + if (removeInfo.isConnectionRemove()) { + removeMap.put(ConnectionId.class.getSimpleName(), ((ConnectionId)removeInfo.getObjectId()).getValue()); + } else if (removeInfo.isConsumerRemove()) { + removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue()); + removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId()); + removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId()); + removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId()); + } + + body = new AmqpValue(removeMap); + } + } else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) { Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message); if (payload == null) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java index e9da2613e4..1d3adea1bf 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java @@ -39,6 +39,7 @@ import java.io.ObjectInputStream; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -57,6 +58,9 @@ import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -771,6 +775,82 @@ public class JMSMappingOutboundTransformerTest { String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8); assertEquals(contentString, contents); } + + @Test + public void testConvertConnectionInfo() throws Exception { + String connectionId = "myConnectionId"; + String clientId = "myClientId"; + + ConnectionInfo dataStructure = new ConnectionInfo(); + dataStructure.setConnectionId(new ConnectionId(connectionId)); + dataStructure.setClientId(clientId); + + ActiveMQMessage outbound = createMessage(); + Map properties = new HashMap(); + properties.put("originUrl", "localhost"); + outbound.setProperties(properties); + outbound.setDataStructure(dataStructure); + outbound.onSend(); + outbound.storeContent(); + + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); + + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getApplicationProperties()); + + Map apMap = amqp.getApplicationProperties().getValue(); + assertEquals(ConnectionInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType")); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map amqpMap = (Map) ((AmqpValue) amqp.getBody()).getValue(); + + assertTrue(connectionId.equals(amqpMap.get("ConnectionId"))); + assertTrue(clientId.equals(amqpMap.get("ClientId"))); + } + + @Test + public void testConvertRemoveInfo() throws Exception { + String connectionId = "myConnectionId"; + + RemoveInfo dataStructure = new RemoveInfo(new ConnectionId(connectionId)); + + ActiveMQMessage outbound = createMessage(); + Map properties = new HashMap(); + properties.put("originUrl", "localhost"); + outbound.setProperties(properties); + outbound.setDataStructure(dataStructure); + outbound.onSend(); + outbound.storeContent(); + + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); + + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getApplicationProperties()); + + Map apMap = amqp.getApplicationProperties().getValue(); + assertEquals(RemoveInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType")); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map amqpMap = (Map) ((AmqpValue) amqp.getBody()).getValue(); + + assertTrue(connectionId.equals(amqpMap.get("ConnectionId"))); + } //----- Test JMSDestination Handling -------------------------------------//