This closes #481
This commit is contained in:
commit
97200074cb
|
@ -146,10 +146,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.jboss.logmanager</groupId>
|
<groupId>org.jboss.logmanager</groupId>
|
||||||
<artifactId>jboss-logmanager</artifactId>
|
<artifactId>jboss-logmanager</artifactId>
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.qpid</groupId>
|
|
||||||
<artifactId>proton-jms</artifactId>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.airlift</groupId>
|
<groupId>io.airlift</groupId>
|
||||||
|
@ -190,6 +186,10 @@
|
||||||
<groupId>io.netty</groupId>
|
<groupId>io.netty</groupId>
|
||||||
<artifactId>netty-codec-mqtt</artifactId>
|
<artifactId>netty-codec-mqtt</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.activemq</groupId>
|
||||||
|
<artifactId>activemq-amqp</artifactId>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -80,7 +80,7 @@
|
||||||
<include>org.jboss.logging:jboss-logging</include>
|
<include>org.jboss.logging:jboss-logging</include>
|
||||||
<include>io.netty:netty-all</include>
|
<include>io.netty:netty-all</include>
|
||||||
<include>org.apache.qpid:proton-j</include>
|
<include>org.apache.qpid:proton-j</include>
|
||||||
<include>org.apache.qpid:proton-jms</include>
|
<include>org.apache.activemq:activemq-amqp</include>
|
||||||
<include>org.apache.activemq:activemq-client</include>
|
<include>org.apache.activemq:activemq-client</include>
|
||||||
<include>org.slf4j:slf4j-api</include>
|
<include>org.slf4j:slf4j-api</include>
|
||||||
<include>io.airlift:airline</include>
|
<include>io.airlift:airline</include>
|
||||||
|
|
|
@ -41,6 +41,10 @@
|
||||||
<artifactId>artemis-core-client</artifactId>
|
<artifactId>artemis-core-client</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.activemq</groupId>
|
||||||
|
<artifactId>activemq-amqp</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.jboss.logging</groupId>
|
<groupId>org.jboss.logging</groupId>
|
||||||
<artifactId>jboss-logging-processor</artifactId>
|
<artifactId>jboss-logging-processor</artifactId>
|
||||||
|
@ -83,10 +87,6 @@
|
||||||
<groupId>org.apache.qpid</groupId>
|
<groupId>org.apache.qpid</groupId>
|
||||||
<artifactId>proton-j</artifactId>
|
<artifactId>proton-j</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.qpid</groupId>
|
|
||||||
<artifactId>proton-jms</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.geronimo.specs</groupId>
|
<groupId>org.apache.geronimo.specs</groupId>
|
||||||
<artifactId>geronimo-jms_2.0_spec</artifactId>
|
<artifactId>geronimo-jms_2.0_spec</artifactId>
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.protocol.proton.converter;
|
||||||
|
|
||||||
|
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
|
||||||
|
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||||
|
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
public class AMQPNativeOutboundTransformer {
|
||||||
|
static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
|
||||||
|
byte[] data = new byte[(int) msg.getBodyLength()];
|
||||||
|
msg.readBytes(data);
|
||||||
|
msg.reset();
|
||||||
|
int count = msg.getIntProperty("JMSXDeliveryCount");
|
||||||
|
|
||||||
|
// decode...
|
||||||
|
ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
|
||||||
|
int offset = 0;
|
||||||
|
int len = data.length;
|
||||||
|
while (len > 0) {
|
||||||
|
final int decoded = amqp.decode(data, offset, len);
|
||||||
|
assert decoded > 0 : "Make progress decoding the message";
|
||||||
|
offset += decoded;
|
||||||
|
len -= decoded;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the DeliveryCount header...
|
||||||
|
// The AMQP delivery-count field only includes prior failed delivery attempts,
|
||||||
|
// whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
|
||||||
|
if (amqp.getHeader() == null) {
|
||||||
|
amqp.setHeader(new Header());
|
||||||
|
}
|
||||||
|
|
||||||
|
amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
|
||||||
|
|
||||||
|
return amqp;
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,7 +26,6 @@ import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
|
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.qpid.proton.jms.JMSVendor;
|
|
||||||
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
|
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
|
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMapMessage;
|
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMapMessage;
|
||||||
|
@ -36,8 +35,9 @@ import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMST
|
||||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||||
|
import org.apache.activemq.transport.amqp.message.JMSVendor;
|
||||||
|
|
||||||
public class ActiveMQJMSVendor extends JMSVendor {
|
public class ActiveMQJMSVendor implements JMSVendor {
|
||||||
|
|
||||||
private final IDGenerator serverGenerator;
|
private final IDGenerator serverGenerator;
|
||||||
|
|
||||||
|
@ -85,11 +85,6 @@ public class ActiveMQJMSVendor extends JMSVendor {
|
||||||
return new ServerDestination(name);
|
return new ServerDestination(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T extends Destination> T createDestination(String name, Class<T> kind) {
|
|
||||||
return super.createDestination(name, kind);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setJMSXGroupID(Message message, String s) {
|
public void setJMSXGroupID(Message message, String s) {
|
||||||
|
|
||||||
|
|
|
@ -16,23 +16,30 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.protocol.proton.converter;
|
package org.apache.activemq.artemis.core.protocol.proton.converter;
|
||||||
|
|
||||||
import org.apache.qpid.proton.jms.EncodedMessage;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.qpid.proton.jms.InboundTransformer;
|
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||||
import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
|
import org.apache.activemq.transport.amqp.message.InboundTransformer;
|
||||||
import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer;
|
import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
|
||||||
|
import org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer;
|
||||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
|
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
|
||||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
public class ProtonMessageConverter implements MessageConverter {
|
public class ProtonMessageConverter implements MessageConverter {
|
||||||
|
|
||||||
ActiveMQJMSVendor activeMQJMSVendor;
|
ActiveMQJMSVendor activeMQJMSVendor;
|
||||||
|
|
||||||
|
private final String prefixVendor;
|
||||||
|
|
||||||
public ProtonMessageConverter(IDGenerator idGenerator) {
|
public ProtonMessageConverter(IDGenerator idGenerator) {
|
||||||
activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator);
|
activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator);
|
||||||
inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor);
|
inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor);
|
||||||
outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor);
|
outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor);
|
||||||
|
prefixVendor = outboundTransformer.getPrefixVendor();
|
||||||
}
|
}
|
||||||
|
|
||||||
private final InboundTransformer inboundTransformer;
|
private final InboundTransformer inboundTransformer;
|
||||||
|
@ -50,11 +57,30 @@ public class ProtonMessageConverter implements MessageConverter {
|
||||||
*
|
*
|
||||||
* @param messageSource
|
* @param messageSource
|
||||||
* @return
|
* @return
|
||||||
* @throws Exception
|
* @throws Exception https://issues.jboss.org/browse/ENTMQ-1560
|
||||||
*/
|
*/
|
||||||
public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception {
|
public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception {
|
||||||
EncodedMessage encodedMessageSource = messageSource;
|
EncodedMessage encodedMessageSource = messageSource;
|
||||||
ServerJMSMessage transformedMessage = (ServerJMSMessage) inboundTransformer.transform(encodedMessageSource);
|
ServerJMSMessage transformedMessage = null;
|
||||||
|
|
||||||
|
InboundTransformer transformer = inboundTransformer;
|
||||||
|
|
||||||
|
while (transformer != null) {
|
||||||
|
try {
|
||||||
|
transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
|
||||||
|
ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
|
||||||
|
|
||||||
|
transformer = transformer.getFallbackTransformer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (transformedMessage == null) {
|
||||||
|
throw new IOException("Failed to transform incoming delivery, skipping.");
|
||||||
|
}
|
||||||
|
|
||||||
transformedMessage.encode();
|
transformedMessage.encode();
|
||||||
|
|
||||||
|
@ -64,8 +90,19 @@ public class ProtonMessageConverter implements MessageConverter {
|
||||||
@Override
|
@Override
|
||||||
public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
|
public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
|
||||||
ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
|
ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
|
||||||
|
|
||||||
jmsMessage.decode();
|
jmsMessage.decode();
|
||||||
|
|
||||||
return outboundTransformer.convert(jmsMessage);
|
if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) {
|
||||||
|
if (jmsMessage instanceof BytesMessage) {
|
||||||
|
return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return outboundTransformer.convert(jmsMessage);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,13 +24,13 @@ import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||||
|
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||||
import org.apache.qpid.proton.amqp.Binary;
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
import org.apache.qpid.proton.engine.Link;
|
import org.apache.qpid.proton.engine.Link;
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
import org.apache.qpid.proton.jms.EncodedMessage;
|
|
||||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
|
|
@ -26,12 +26,12 @@ import java.util.Map;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.PooledByteBufAllocator;
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||||
import org.apache.qpid.proton.amqp.Binary;
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
||||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||||
import org.apache.qpid.proton.jms.EncodedMessage;
|
|
||||||
import org.apache.qpid.proton.message.Message;
|
import org.apache.qpid.proton.message.Message;
|
||||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||||
|
|
|
@ -171,6 +171,10 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
|
||||||
try {
|
try {
|
||||||
if (buffer.getByte(4) == 0x03) {
|
if (buffer.getByte(4) == 0x03) {
|
||||||
dispatchSASL();
|
dispatchSASL();
|
||||||
|
/*
|
||||||
|
* there is a chance that if SASL Handshake has been carried out that the capacity may change.
|
||||||
|
* */
|
||||||
|
capacity = transport.capacity();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Throwable ignored) {
|
catch (Throwable ignored) {
|
||||||
|
|
14
pom.xml
14
pom.xml
|
@ -408,15 +408,21 @@
|
||||||
<!-- License: Apache 2.0 -->
|
<!-- License: Apache 2.0 -->
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.qpid</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>proton-jms</artifactId>
|
<artifactId>activemq-client</artifactId>
|
||||||
<version>${proton.version}</version>
|
<version>${activemq5-version}</version>
|
||||||
<!-- License: Apache 2.0 -->
|
<!-- License: Apache 2.0 -->
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>activemq-client</artifactId>
|
<artifactId>activemq-amqp</artifactId>
|
||||||
<version>${activemq5-version}</version>
|
<version>${activemq5-version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.apache.geronimo.specs</groupId>
|
||||||
|
<artifactId>geronimo-jms_1.1_spec</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
<!-- License: Apache 2.0 -->
|
<!-- License: Apache 2.0 -->
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -194,10 +194,6 @@
|
||||||
<groupId>org.apache.qpid</groupId>
|
<groupId>org.apache.qpid</groupId>
|
||||||
<artifactId>proton-j</artifactId>
|
<artifactId>proton-j</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.qpid</groupId>
|
|
||||||
<artifactId>proton-jms</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.qpid</groupId>
|
<groupId>org.apache.qpid</groupId>
|
||||||
<artifactId>qpid-client</artifactId>
|
<artifactId>qpid-client</artifactId>
|
||||||
|
|
Loading…
Reference in New Issue