diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java index 39e0df5bf3..c8fb00306b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java @@ -62,13 +62,15 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectM ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectOutputStream ous = new ObjectOutputStream(out); ous.writeObject(object); - getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray()); + byte[] src = out.toByteArray(); + getInnerMessage().getBodyBuffer().writeInt(src.length); + getInnerMessage().getBodyBuffer().writeBytes(src); } @Override public void decode() throws Exception { super.decode(); - int size = getInnerMessage().getBodyBuffer().readableBytes(); + int size = getInnerMessage().getBodyBuffer().readInt(); byte[] bytes = new byte[size]; getInnerMessage().getBodyBuffer().readBytes(bytes); ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes)); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 42ee9f2811..40cbf79218 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -279,7 +279,14 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { if (apMap == null) { apMap = new HashMap(); } - apMap.put(key, msg.getObjectProperty(key)); + Object objectProperty = msg.getObjectProperty(key); + if (objectProperty instanceof byte[]) { + Binary binary = new Binary((byte[]) objectProperty); + apMap.put(key, binary); + } + else { + apMap.put(key, objectProperty); + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 19524b0429..27a533ad40 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -97,7 +97,9 @@ public class TestConversions extends Assert { ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectOutputStream ois = new ObjectOutputStream(out); ois.writeObject(new ABadClass()); - serverMessage.getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray()); + byte[] src = out.toByteArray(); + serverMessage.getInnerMessage().getBodyBuffer().writeInt(src.length); + serverMessage.getInnerMessage().getBodyBuffer().writeBytes(src); try { converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 53e676f034..9ed8aaae39 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -495,6 +495,29 @@ public class ProtonTest extends ProtonTestBase { Assert.assertEquals(q.getMessageCount(), 10); } + @Test + public void testObjectMessage() throws Throwable { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = createQueue(address); + MessageProducer p = session.createProducer(queue); + ArrayList list = new ArrayList(); + list.add("aString"); + ObjectMessage objectMessage = session.createObjectMessage(list); + p.send(objectMessage); + session.close(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + objectMessage = (ObjectMessage) cons.receive(5000); + assertNotNull(objectMessage); + list = (ArrayList) objectMessage.getObject(); + assertEquals(list.get(0), "aString"); + connection.close(); + } + @Test public void testResourceLimitExceptionOnAddressFull() throws Exception { setAddressFullBlockPolicy(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java new file mode 100644 index 0000000000..624c89ca7e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.crossprotocol; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.util.ArrayList; + + +public class AMQPToOpenwireTest extends ActiveMQTestBase { + + public static final String OWHOST = "localhost"; + public static final int OWPORT = 61616; + protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; + + JMSServerManager serverManager; + private ActiveMQServer server; + protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString); + protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString); + private JmsConnectionFactory qpidfactory; + protected String queueName = "amqTestQueue1"; + private SimpleString coreQueue; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + serverManager = new JMSServerManagerImpl(server); + Configuration serverConfig = server.getConfiguration(); + serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(false).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ"))); + serverConfig.setSecurityEnabled(false); + serverManager.start(); + coreQueue = new SimpleString("jms.queue." + queueName); + this.server.createQueue(coreQueue, coreQueue, null, false, false); + qpidfactory = new JmsConnectionFactory("amqp://localhost:61616"); + } + + @Override + @After + public void tearDown() throws Exception { + if (serverManager != null) { + serverManager.stop(); + serverManager = null; + } + } + + @Test + public void testObjectMessage() throws Exception { + Connection connection = null; + try { + connection = qpidfactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(coreQueue.toString()); + MessageProducer producer = session.createProducer(queue); + ArrayList list = new ArrayList(); + list.add("aString"); + ObjectMessage objectMessage = session.createObjectMessage(list); + producer.send(objectMessage); + connection.close(); + + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + BytesMessage receive = (BytesMessage) consumer.receive(5000); + assertNotNull(receive); + byte[] bytes = new byte[(int) receive.getBodyLength()]; + receive.readBytes(bytes); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + list = (ArrayList) ois.readObject(); + assertEquals(list.get(0), "aString"); + connection.close(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java new file mode 100644 index 0000000000..627620304d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.crossprotocol; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; + + +public class OpenWireToAMQPTest extends ActiveMQTestBase { + + public static final String OWHOST = "localhost"; + public static final int OWPORT = 61616; + protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; + + private ActiveMQServer server; + protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString); + protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString); + private JmsConnectionFactory qpidfactory; + protected String queueName = "amqTestQueue1"; + private SimpleString coreQueue; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + Configuration serverConfig = server.getConfiguration(); + serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(false).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ"))); + serverConfig.setSecurityEnabled(false); + server.start(); + coreQueue = new SimpleString("jms.queue." + queueName); + this.server.createQueue(coreQueue, coreQueue, null, false, false); + qpidfactory = new JmsConnectionFactory("amqp://localhost:61616"); + } + + @Override + @After + public void tearDown() throws Exception { + if (server != null) { + server.stop(); + server = null; + } + } + + @Test + public void testObjectMessage() throws Exception { + Connection connection = null; + try { + connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + ArrayList list = new ArrayList(); + list.add("aString"); + ObjectMessage objectMessage = session.createObjectMessage(list); + producer.send(objectMessage); + connection.close(); + + connection = qpidfactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(coreQueue.toString()); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + ObjectMessage receive = (ObjectMessage) consumer.receive(5000); + list = (ArrayList) receive.getObject(); + assertEquals(list.get(0), "aString"); + connection.close(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + if (connection != null) { + connection.close(); + } + } + } +}