ARTEMIS-817 and ARTEMIS-818 openwire fixes
https://issues.apache.org/jira/browse/ARTEMIS-817 https://issues.apache.org/jira/browse/ARTEMIS-818 issues around Openwire protocol, sending a null stream maessage via openwire causes a null pointer and if a topic is auto created with openwire then it cant be destroyed as it checks for the management queue.
This commit is contained in:
parent
ad8919dc42
commit
1a4a148ba9
|
@ -815,7 +815,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
public void removeDestination(ActiveMQDestination dest) throws Exception {
|
||||
if (dest.isQueue()) {
|
||||
server.destroyQueue(OpenWireUtil.toCoreAddress(dest));
|
||||
try {
|
||||
server.destroyQueue(OpenWireUtil.toCoreAddress(dest));
|
||||
} catch (ActiveMQNonExistentQueueException neq) {
|
||||
//this is ok, ActiveMQ 5 allows this and will actually do it quite often
|
||||
ActiveMQServerLogger.LOGGER.debug("queue never existed");
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest));
|
||||
|
||||
|
|
|
@ -499,18 +499,21 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
}
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
|
||||
TypedProperties mapData = new TypedProperties();
|
||||
mapData.decode(buffer);
|
||||
//it could be a null map
|
||||
if (buffer.readableBytes() > 0) {
|
||||
mapData.decode(buffer);
|
||||
Map<String, Object> map = mapData.getMap();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
|
||||
OutputStream os = out;
|
||||
if (isCompressed) {
|
||||
os = new DeflaterOutputStream(os);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(os)) {
|
||||
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
|
||||
}
|
||||
bytes = out.toByteArray();
|
||||
}
|
||||
|
||||
Map<String, Object> map = mapData.getMap();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
|
||||
OutputStream os = out;
|
||||
if (isCompressed) {
|
||||
os = new DeflaterOutputStream(os);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(os)) {
|
||||
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
|
||||
}
|
||||
bytes = out.toByteArray();
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
|
||||
int len = buffer.readInt();
|
||||
bytes = new byte[len];
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
|
|||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
|
@ -127,6 +128,28 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendNullMapMessage() throws Exception {
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
System.out.println("Queue:" + queue);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
producer.send(session.createMapMessage());
|
||||
|
||||
Assert.assertNull(consumer.receive(100));
|
||||
connection.start();
|
||||
|
||||
MapMessage message = (MapMessage) consumer.receive(5000);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXASimple() throws Exception {
|
||||
XAConnection connection = xaFactory.createXAConnection();
|
||||
|
|
Loading…
Reference in New Issue