Fix serialization of RemoveInfo advisory message for AMQP consumers

This commit is contained in:
Lucas Tétreault 2022-10-12 19:01:06 -07:00
parent 84c193df81
commit e0a37a5c30
3 changed files with 81 additions and 2 deletions

View File

@ -417,7 +417,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue()); removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId()); removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId()); removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId()); removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId().getValue());
} }
body = new AmqpValue(removeMap); body = new AmqpValue(removeMap);

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class AmqpAdvisoryTest extends AmqpTestSupport {
protected Connection connection1;
protected Connection connection2;
@Override
public void setUp() throws Exception {
advisorySupport = true;
super.setUp();
}
@Test()
public void testConnectionAdvisory() throws Exception {
connection1 = createAmqpConnection();
connection1.start();
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination advisoryTopic = session1.createTopic("ActiveMQ.Advisory.Consumer.Queue.workshop.queueA");
MessageConsumer advisoryTopicConsumer = session1.createConsumer(advisoryTopic);
connection2 = createAmqpConnection();
connection2.start();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session2.createQueue("workshop.queueA");
session2.createConsumer(queue);
Message connectMessage = advisoryTopicConsumer.receive(100);
assertNotNull(connectMessage);
assertEquals("ConsumerInfo", connectMessage.getStringProperty("ActiveMqDataStructureType"));
connection2.close();
Message removeMessage = advisoryTopicConsumer.receive(100);
assertNotNull(removeMessage);
assertEquals("RemoveInfo", removeMessage.getStringProperty("ActiveMqDataStructureType"));
connection1.close();
}
public Connection createAmqpConnection() throws JMSException {
final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
final Connection connection = factory.createConnection();
connection.setExceptionListener(Throwable::printStackTrace);
connection.start();
return connection;
}
}

View File

@ -71,6 +71,7 @@ public class AmqpTestSupport {
protected Vector<Throwable> exceptions = new Vector<>(); protected Vector<Throwable> exceptions = new Vector<>();
protected int numberOfMessages; protected int numberOfMessages;
protected boolean advisorySupport = false;
protected URI amqpURI; protected URI amqpURI;
protected int amqpPort; protected int amqpPort;
protected URI amqpSslURI; protected URI amqpSslURI;
@ -118,7 +119,7 @@ public class AmqpTestSupport {
brokerService.setPersistenceAdapter(kaha); brokerService.setPersistenceAdapter(kaha);
} }
brokerService.setSchedulerSupport(isSchedulerEnabled()); brokerService.setSchedulerSupport(isSchedulerEnabled());
brokerService.setAdvisorySupport(false); brokerService.setAdvisorySupport(advisorySupport);
brokerService.setUseJmx(isUseJmx()); brokerService.setUseJmx(isUseJmx());
brokerService.getManagementContext().setCreateConnector(false); brokerService.getManagementContext().setCreateConnector(false);