diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 85fd39836c..6f462b4b86 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -146,10 +146,6 @@ org.jboss.logmanager jboss-logmanager - - - org.apache.qpid - proton-jms io.airlift @@ -190,6 +186,10 @@ io.netty netty-codec-mqtt + + org.apache.activemq + activemq-amqp + diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 4adfaf85be..d723bae567 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -80,7 +80,7 @@ org.jboss.logging:jboss-logging io.netty:netty-all org.apache.qpid:proton-j - org.apache.qpid:proton-jms + org.apache.activemq:activemq-amqp org.apache.activemq:activemq-client org.slf4j:slf4j-api io.airlift:airline diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml index 98149c1f93..78e9c3bfe0 100644 --- a/artemis-protocols/artemis-amqp-protocol/pom.xml +++ b/artemis-protocols/artemis-amqp-protocol/pom.xml @@ -41,6 +41,10 @@ artemis-core-client ${project.version} + + org.apache.activemq + activemq-amqp + org.jboss.logging jboss-logging-processor @@ -83,10 +87,6 @@ org.apache.qpid proton-j - - org.apache.qpid - proton-jms - org.apache.geronimo.specs geronimo-jms_2.0_spec diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java new file mode 100644 index 0000000000..c187ad088f --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter; + +import org.apache.activemq.transport.amqp.message.OutboundTransformer; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.message.ProtonJMessage; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; + +public class AMQPNativeOutboundTransformer { + static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { + byte[] data = new byte[(int) msg.getBodyLength()]; + msg.readBytes(data); + msg.reset(); + int count = msg.getIntProperty("JMSXDeliveryCount"); + + // decode... + ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); + int offset = 0; + int len = data.length; + while (len > 0) { + final int decoded = amqp.decode(data, offset, len); + assert decoded > 0 : "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + // Update the DeliveryCount header... + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. + if (amqp.getHeader() == null) { + amqp.setHeader(new Header()); + } + + amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1)); + + return amqp; + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java index 639b390fb4..ba6b9bea4b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java @@ -26,7 +26,6 @@ import javax.jms.TextMessage; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.qpid.proton.jms.JMSVendor; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMapMessage; @@ -36,8 +35,9 @@ import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMST import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.activemq.transport.amqp.message.JMSVendor; -public class ActiveMQJMSVendor extends JMSVendor { +public class ActiveMQJMSVendor implements JMSVendor { private final IDGenerator serverGenerator; @@ -85,11 +85,6 @@ public class ActiveMQJMSVendor extends JMSVendor { return new ServerDestination(name); } - @Override - public T createDestination(String name, Class kind) { - return super.createDestination(name, kind); - } - @Override public void setJMSXGroupID(Message message, String s) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java index 4de2357b58..da99e68669 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java @@ -16,23 +16,30 @@ */ package org.apache.activemq.artemis.core.protocol.proton.converter; -import org.apache.qpid.proton.jms.EncodedMessage; -import org.apache.qpid.proton.jms.InboundTransformer; -import org.apache.qpid.proton.jms.JMSMappingInboundTransformer; -import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.transport.amqp.message.EncodedMessage; +import org.apache.activemq.transport.amqp.message.InboundTransformer; +import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; +import org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.utils.IDGenerator; +import javax.jms.BytesMessage; +import java.io.IOException; + public class ProtonMessageConverter implements MessageConverter { ActiveMQJMSVendor activeMQJMSVendor; + private final String prefixVendor; + public ProtonMessageConverter(IDGenerator idGenerator) { activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator); inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor); outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor); + prefixVendor = outboundTransformer.getPrefixVendor(); } private final InboundTransformer inboundTransformer; @@ -50,11 +57,30 @@ public class ProtonMessageConverter implements MessageConverter { * * @param messageSource * @return - * @throws Exception + * @throws Exception https://issues.jboss.org/browse/ENTMQ-1560 */ public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception { EncodedMessage encodedMessageSource = messageSource; - ServerJMSMessage transformedMessage = (ServerJMSMessage) inboundTransformer.transform(encodedMessageSource); + ServerJMSMessage transformedMessage = null; + + InboundTransformer transformer = inboundTransformer; + + while (transformer != null) { + try { + transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource); + break; + } + catch (Exception e) { + ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName()); + ActiveMQClientLogger.LOGGER.trace("Transformation error:", e); + + transformer = transformer.getFallbackTransformer(); + } + } + + if (transformedMessage == null) { + throw new IOException("Failed to transform incoming delivery, skipping."); + } transformedMessage.encode(); @@ -64,8 +90,19 @@ public class ProtonMessageConverter implements MessageConverter { @Override public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception { ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); + jmsMessage.decode(); - return outboundTransformer.convert(jmsMessage); + if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) { + if (jmsMessage instanceof BytesMessage) { + return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage); + } + else { + return null; + } + } + else { + return outboundTransformer.convert(jmsMessage); + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 2dccc30962..ccd2b7ede4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -24,13 +24,13 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java index de514bbf44..fc9fe2c819 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java @@ -26,12 +26,12 @@ import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.impl.MessageImpl; diff --git a/pom.xml b/pom.xml index 292aa14e4a..47b58e4758 100644 --- a/pom.xml +++ b/pom.xml @@ -408,15 +408,21 @@ - org.apache.qpid - proton-jms - ${proton.version} + org.apache.activemq + activemq-client + ${activemq5-version} org.apache.activemq - activemq-client + activemq-amqp ${activemq5-version} + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 6c38b7c044..f0e1d14643 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -194,10 +194,6 @@ org.apache.qpid proton-j - - org.apache.qpid - proton-jms - org.apache.qpid qpid-client