[AMQ-7068] Advisory messages are empty when received with a AMQP subscription (#312)

This commit is contained in:
Johannes Bäurle 2019-11-17 17:40:15 +01:00 committed by Jean-Baptiste Onofré
parent 057b950485
commit 6f338aa281
2 changed files with 127 additions and 1 deletions

View File

@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.JMSException;
@ -67,7 +68,11 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.TypeConversionSupport;
@ -333,6 +338,15 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
apMap = new HashMap<>();
}
apMap.put(key, value);
int messageType = message.getDataStructureType();
if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
// Type of command to recognize advisory message
Object data = message.getDataStructure();
if(data != null) {
apMap.put("ActiveMqDataStructureType", data.getClass().getSimpleName());
}
}
}
final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
@ -376,7 +390,39 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
int messageType = message.getDataStructureType();
if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
Object data = message.getDataStructure();
if (data instanceof ConnectionInfo) {
ConnectionInfo connectionInfo = (ConnectionInfo)data;
final HashMap<String, Object> connectionMap = new LinkedHashMap<String, Object>();
connectionMap.put("ConnectionId", connectionInfo.getConnectionId().getValue());
connectionMap.put("ClientId", connectionInfo.getClientId());
connectionMap.put("ClientIp", connectionInfo.getClientIp());
connectionMap.put("UserName", connectionInfo.getUserName());
connectionMap.put("BrokerMasterConnector", connectionInfo.isBrokerMasterConnector());
connectionMap.put("Manageable", connectionInfo.isManageable());
connectionMap.put("ClientMaster", connectionInfo.isClientMaster());
connectionMap.put("FaultTolerant", connectionInfo.isFaultTolerant());
connectionMap.put("FailoverReconnect", connectionInfo.isFailoverReconnect());
body = new AmqpValue(connectionMap);
} else if (data instanceof RemoveInfo) {
RemoveInfo removeInfo = (RemoveInfo)message.getDataStructure();
final HashMap<String, Object> removeMap = new LinkedHashMap<String, Object>();
if (removeInfo.isConnectionRemove()) {
removeMap.put(ConnectionId.class.getSimpleName(), ((ConnectionId)removeInfo.getObjectId()).getValue());
} else if (removeInfo.isConsumerRemove()) {
removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId());
}
body = new AmqpValue(removeMap);
}
} else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
if (payload == null) {

View File

@ -39,6 +39,7 @@ import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -57,6 +58,9 @@ import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@ -771,6 +775,82 @@ public class JMSMappingOutboundTransformerTest {
String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
assertEquals(contentString, contents);
}
@Test
public void testConvertConnectionInfo() throws Exception {
String connectionId = "myConnectionId";
String clientId = "myClientId";
ConnectionInfo dataStructure = new ConnectionInfo();
dataStructure.setConnectionId(new ConnectionId(connectionId));
dataStructure.setClientId(clientId);
ActiveMQMessage outbound = createMessage();
Map<String, String> properties = new HashMap<String, String>();
properties.put("originUrl", "localhost");
outbound.setProperties(properties);
outbound.setDataStructure(dataStructure);
outbound.onSend();
outbound.storeContent();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
EncodedMessage encoded = transformer.transform(outbound);
assertNotNull(encoded);
Message amqp = encoded.decode();
assertNotNull(amqp.getApplicationProperties());
Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
assertEquals(ConnectionInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
assertTrue(clientId.equals(amqpMap.get("ClientId")));
}
@Test
public void testConvertRemoveInfo() throws Exception {
String connectionId = "myConnectionId";
RemoveInfo dataStructure = new RemoveInfo(new ConnectionId(connectionId));
ActiveMQMessage outbound = createMessage();
Map<String, String> properties = new HashMap<String, String>();
properties.put("originUrl", "localhost");
outbound.setProperties(properties);
outbound.setDataStructure(dataStructure);
outbound.onSend();
outbound.storeContent();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
EncodedMessage encoded = transformer.transform(outbound);
assertNotNull(encoded);
Message amqp = encoded.decode();
assertNotNull(amqp.getApplicationProperties());
Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
assertEquals(RemoveInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
}
//----- Test JMSDestination Handling -------------------------------------//