From 60df3dc05ff634cf536a1252b826936b0a66bcba Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Tue, 26 Apr 2016 13:56:53 +0100 Subject: [PATCH] ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ some extra fixes needed https://issues.apache.org/jira/browse/ARTEMIS-503 --- .../proton/converter/ActiveMQJMSVendor.java | 5 +- .../JMSMappingInboundTransformer.java | 49 +++++++++++++ .../JMSMappingOutboundTransformer.java | 53 +++++++++++++++ .../converter/ProtonMessageConverter.java | 2 - .../converter/jms/ServerDestination.java | 10 ++- .../converter/jms/ServerJMSObjectMessage.java | 68 +++++++++++++++++++ 6 files changed, 183 insertions(+), 4 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java index ba6b9bea4b..3af26dc87c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java @@ -25,6 +25,7 @@ import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination; +import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage; @@ -67,7 +68,7 @@ public class ActiveMQJMSVendor implements JMSVendor { @Override public ObjectMessage createObjectMessage() { - return null; + return new ServerJMSObjectMessage(newMessage(org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE), 0); } @Override @@ -110,6 +111,8 @@ public class ActiveMQJMSVendor implements JMSVendor { return new ServerJMSMapMessage(wrapped, deliveryCount); case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE: return new ServerJMSTextMessage(wrapped, deliveryCount); + case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: + return new ServerJMSObjectMessage(wrapped, deliveryCount); default: return new ServerJMSMessage(wrapped, deliveryCount); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java new file mode 100644 index 0000000000..03f91046e9 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter; + +import org.apache.activemq.transport.amqp.message.JMSVendor; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.messaging.Properties; + +import javax.jms.Message; + +class JMSMappingInboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer { + + JMSMappingInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + super.populateMessage(jms, amqp); + final Properties properties = amqp.getProperties(); + if (properties != null) { + if (properties.getMessageId() != null) { + if (properties.getMessageId() instanceof Long) { + jms.setLongProperty(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID", (Long) properties.getMessageId()); + } + else if (properties.getMessageId() instanceof UnsignedLong) { + jms.setLongProperty(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID", ((UnsignedLong) properties.getMessageId()).longValue()); + } + else { + jms.setStringProperty(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID", properties.getMessageId().toString()); + } + } + } + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java new file mode 100644 index 0000000000..b643162d45 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter; + +import org.apache.activemq.transport.amqp.message.JMSVendor; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.message.ProtonJMessage; + +import javax.jms.JMSException; +import javax.jms.Message; +import java.io.UnsupportedEncodingException; +import java.util.Map; + +class JMSMappingOutboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer { + JMSMappingOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException { + ProtonJMessage protonJMessage = super.convert(msg); + + Map properties = protonJMessage.getApplicationProperties().getValue(); + + if (properties.containsKey(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID")) { + Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID"); + protonJMessage.setMessageId(id); + } + else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID")) { + Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID"); + protonJMessage.setMessageId(new UnsignedLong(id)); + } + else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID")) { + String id = (String) properties.remove(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID"); + protonJMessage.setMessageId(id); + } + return protonJMessage; + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java index da99e68669..47011c17c1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java @@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.protocol.proton.converter; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.activemq.transport.amqp.message.InboundTransformer; -import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; -import org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java index 09a0ae5bf3..ab26264223 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java @@ -18,12 +18,20 @@ package org.apache.activemq.artemis.core.protocol.proton.converter.jms; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import javax.jms.JMSException; +import javax.jms.Queue; + /** * This is just here to avoid all the client checks we ned with valid JMS destinations, protocol convertors don't need to * adhere to the jms. semantics. */ -public class ServerDestination extends ActiveMQDestination { +public class ServerDestination extends ActiveMQDestination implements Queue { public ServerDestination(String name) { super(name, name, false, false, null); } + + @Override + public String getQueueName() throws JMSException { + return getName(); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java new file mode 100644 index 0000000000..938f459175 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.jms; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.util.ByteArrayOutputStream; + +import javax.jms.JMSException; +import javax.jms.ObjectMessage; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + + +public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage { + public static final byte TYPE = Message.STREAM_TYPE; + + private Serializable object; + + public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) { + super(message, deliveryCount); + } + + @Override + public void setObject(Serializable object) throws JMSException { + this.object = object; + } + + @Override + public Serializable getObject() throws JMSException { + return object; + } + + @Override + public void encode() throws Exception { + super.encode(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream ous = new ObjectOutputStream(out); + ous.writeObject(object); + getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray()); + } + + @Override + public void decode() throws Exception { + super.decode(); + int size = getInnerMessage().getBodyBuffer().readableBytes(); + byte[] bytes = new byte[size]; + getInnerMessage().getBodyBuffer().readBytes(bytes); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + object = (Serializable) ois.readObject(); + } +}