From 258a66652ac16f31bd957575b1078fb344af44aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20T=C3=A9treault?= Date: Wed, 12 Oct 2022 19:01:06 -0700 Subject: [PATCH] Fix serialization of RemoveInfo advisory message for AMQP consumers (cherry picked from commit e0a37a5c304367e693d180b014f679ad68ca0b80) --- .../JMSMappingOutboundTransformer.java | 2 +- .../transport/amqp/AmqpAdvisoryTest.java | 78 +++++++++++++++++++ .../transport/amqp/AmqpTestSupport.java | 3 +- 3 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 67d034447f..dbae00f992 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -417,7 +417,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue()); removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId()); removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId()); - removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId()); + removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId().getValue()); } body = new AmqpValue(removeMap); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java new file mode 100644 index 0000000000..5b32c54d99 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.amqp; + +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AmqpAdvisoryTest extends AmqpTestSupport { + protected Connection connection1; + protected Connection connection2; + + @Override + public void setUp() throws Exception { + advisorySupport = true; + super.setUp(); + } + + @Test() + public void testConnectionAdvisory() throws Exception { + connection1 = createAmqpConnection(); + connection1.start(); + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination advisoryTopic = session1.createTopic("ActiveMQ.Advisory.Consumer.Queue.workshop.queueA"); + MessageConsumer advisoryTopicConsumer = session1.createConsumer(advisoryTopic); + + + connection2 = createAmqpConnection(); + connection2.start(); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session2.createQueue("workshop.queueA"); + session2.createConsumer(queue); + + Message connectMessage = advisoryTopicConsumer.receive(100); + assertNotNull(connectMessage); + assertEquals("ConsumerInfo", connectMessage.getStringProperty("ActiveMqDataStructureType")); + + connection2.close(); + + Message removeMessage = advisoryTopicConsumer.receive(100); + assertNotNull(removeMessage); + assertEquals("RemoveInfo", removeMessage.getStringProperty("ActiveMqDataStructureType")); + connection1.close(); + } + + public Connection createAmqpConnection() throws JMSException { + final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI); + final Connection connection = factory.createConnection(); + connection.setExceptionListener(Throwable::printStackTrace); + connection.start(); + return connection; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 35f607fa72..b25e802fce 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -71,6 +71,7 @@ public class AmqpTestSupport { protected Vector exceptions = new Vector<>(); protected int numberOfMessages; + protected boolean advisorySupport = false; protected URI amqpURI; protected int amqpPort; protected URI amqpSslURI; @@ -118,7 +119,7 @@ public class AmqpTestSupport { brokerService.setPersistenceAdapter(kaha); } brokerService.setSchedulerSupport(isSchedulerEnabled()); - brokerService.setAdvisorySupport(false); + brokerService.setAdvisorySupport(advisorySupport); brokerService.setUseJmx(isUseJmx()); brokerService.getManagementContext().setCreateConnector(false);