From 7fe30bc0cca18c579d04ace7ab64fe3e6ce92664 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Wed, 3 Oct 2012 21:46:46 +0000 Subject: [PATCH] Making more progress on the AMQP implementation. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1393782 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/amqp/ActiveMQJMSVendor.java | 67 +++++ .../transport/amqp/AmqpProtocolConverter.java | 239 +++++++----------- .../transport/amqp/AmqpSubscription.java | 68 ----- .../amqp/JMSMappingInboundTransformer.java | 24 -- .../AMQPNativeInboundTransformer.java | 29 ++- .../AMQPNativeOutboundTransformer.java | 59 +++++ .../amqp/transform/InboundTransformer.java | 80 ++++++ .../JMSMappingInboundTransformer.java | 206 +++++++++++++++ .../JMSMappingOutboundTransformer.java | 58 +++++ .../transport/amqp/transform/JMSVendor.java | 29 +++ .../OutboundTransformer.java} | 33 ++- .../activemq/transport/amqp/AmqpNioTest.java | 2 +- .../activemq/transport/amqp/AmqpSslTest.java | 2 +- .../{AmqpTest.java => AmqpTestSupport.java} | 17 +- .../transport/amqp/SwiftMQClientTest.java | 3 +- pom.xml | 6 + 16 files changed, 667 insertions(+), 255 deletions(-) create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java delete mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java delete mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java rename activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/{ => transform}/AMQPNativeInboundTransformer.java (52%) create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java rename activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/{InboundTransformer.java => transform/OutboundTransformer.java} (57%) rename activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/{AmqpTest.java => AmqpTestSupport.java} (95%) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java new file mode 100644 index 0000000000..37b23e1b8b --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java @@ -0,0 +1,67 @@ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.command.*; +import org.apache.activemq.transport.amqp.transform.JMSVendor; + +import javax.jms.*; +import javax.jms.Message; + +/** + * @author Hiram Chirino + */ +public class ActiveMQJMSVendor extends JMSVendor { + + final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor(); + + private ActiveMQJMSVendor() {} + + @Override + public BytesMessage createBytesMessage() { + return new ActiveMQBytesMessage(); + } + + @Override + public StreamMessage createStreamMessage() { + return new ActiveMQStreamMessage(); + } + + @Override + public Message createMessage() { + return new ActiveMQMessage(); + } + + @Override + public TextMessage createTextMessage() { + return new ActiveMQTextMessage(); + } + + @Override + public ObjectMessage createObjectMessage() { + return new ActiveMQObjectMessage(); + } + + @Override + public MapMessage createMapMessage() { + return new ActiveMQMapMessage(); + } + + @Override + public Destination createDestination(String name) { + return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE); + } + + @Override + public void setJMSXUserID(Message msg, String value) { + ((ActiveMQMessage)msg).setUserID(value); + } + + @Override + public void setJMSXGroupID(Message msg, String value) { + ((ActiveMQMessage)msg).setGroupID(value); + } + + @Override + public void setJMSXGroupSequence(Message msg, int value) { + ((ActiveMQMessage)msg).setGroupSequence(value); + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 82a3b96d13..ddf22225d1 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -18,26 +18,23 @@ package org.apache.activemq.transport.amqp; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.command.*; -import org.apache.activemq.command.Message; +import org.apache.activemq.transport.amqp.transform.*; import org.apache.activemq.util.*; import org.apache.qpid.proton.engine.*; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.impl.ConnectionImpl; -import org.apache.qpid.proton.engine.impl.DeliveryImpl; import org.apache.qpid.proton.engine.impl.TransportImpl; -import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.zip.Inflater; + +import org.fusesource.hawtbuf.ByteArrayOutputStream; class AmqpProtocolConverter { @@ -80,7 +77,7 @@ class AmqpProtocolConverter { this.protonTransport.bind(this.protonConnection); } - void pumpOut() { + void pumpProtonToSocket() { try { int size = 1024 * 64; byte data[] = new byte[size]; @@ -158,11 +155,7 @@ class AmqpProtocolConverter { link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE); while (link != null) { - if (link instanceof Receiver) { -// listener.onReceiverClose((Receiver) link); - } else { -// listener.onSenderClose((Sender) link); - } + ((AmqpDeliveryListener)link.getContext()).onClose(); link.close(); link = link.next(ACTIVE_STATE, CLOSED_STATE); } @@ -170,8 +163,7 @@ class AmqpProtocolConverter { session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE); while (session != null) { //TODO - close links? -// listener.onSessionClose(session); - session.close(); + onSessionClose(session); session = session.next(ACTIVE_STATE, CLOSED_STATE); } if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) { @@ -183,7 +175,7 @@ class AmqpProtocolConverter { handleException(new AmqpProtocolException("Could not process AMQP commands", true, e)); } - pumpOut(); + pumpProtonToSocket(); } public void onActiveMQCommand(Command command) throws Exception { @@ -223,6 +215,7 @@ class AmqpProtocolConverter { static abstract class AmqpDeliveryListener { abstract public void onDelivery(Delivery delivery) throws Exception; + public void onClose() throws Exception {} } private void onConnectionOpen() throws AmqpProtocolException { @@ -255,14 +248,14 @@ class AmqpProtocolConverter { public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { protonConnection.open(); - pumpOut(); + pumpProtonToSocket(); if (response.isException()) { Throwable exception = ((ExceptionResponse) response).getException(); // TODO: figure out how to close /w an error. // protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage())); protonConnection.close(); - pumpOut(); + pumpProtonToSocket(); amqpTransport.onException(IOExceptionSupport.create(exception)); return; } @@ -278,6 +271,12 @@ class AmqpProtocolConverter { session.open(); } + private void onSessionClose(Session session) { + AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext(); + sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null); + session.close(); + } + private void onLinkOpen(Link link) { link.setLocalSourceAddress(link.getRemoteSourceAddress()); link.setLocalTargetAddress(link.getRemoteTargetAddress()); @@ -290,54 +289,54 @@ class AmqpProtocolConverter { } } + InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + class ProducerContext extends AmqpDeliveryListener { private final ProducerId producerId; private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final ActiveMQDestination destination; + ByteArrayOutputStream current = new ByteArrayOutputStream(); public ProducerContext(ProducerId producerId, ActiveMQDestination destination) { this.producerId = producerId; this.destination = destination; } + @Override - public void onDelivery(Delivery delivery) throws JMSException { -// delivery. - ActiveMQMessage message = convertMessage((DeliveryImpl) delivery); + public void onDelivery(Delivery delivery) throws Exception { + if( current ==null ) { + current = new ByteArrayOutputStream(); + } + + Receiver receiver = ((Receiver)delivery.getLink()); + int count; + byte data[] = new byte[1024*4]; + while( (count = receiver.recv(data, 0, data.length)) > 0 ) { + current.write(data, 0, count); + } + + // Expecting more deliveries.. + if( count == 0 ) { + return; + } + + final Buffer buffer = current.toBuffer(); + final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length); + current = null; + + if( message.getDestination()==null ) { + message.setJMSDestination(destination); + } message.setProducerId(producerId); + if( message.getMessageId()==null ) { + message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId())); + } message.onSend(); // sendToActiveMQ(message, createResponseHandler(command)); sendToActiveMQ(message, null); } - ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException { - ActiveMQBytesMessage msg = nextMessage(delivery); - final Receiver receiver = (Receiver) delivery.getLink(); - byte buff[] = new byte[1024 * 4]; - int count = 0; - while ((count = receiver.recv(buff, 0, buff.length)) >= 0) { - msg.writeBytes(buff, 0, count); - } - return msg; - } - - ActiveMQBytesMessage current; - - private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException { - if (current == null) { - current = new ActiveMQBytesMessage(); - current.setJMSDestination(destination); - current.setProducerId(producerId); - current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId())); - current.setTimestamp(System.currentTimeMillis()); - current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY); -// msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE); -// msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); - System.out.println(delivery.getLocalState() + "/" + delivery.getRemoteState()); - } - return current; - } - } @@ -345,7 +344,7 @@ class AmqpProtocolConverter { // Client is producing to this receiver object ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++); - ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE); + ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE); ProducerContext producerContext = new ProducerContext(producerId, destination); receiver.setContext(producerContext); @@ -360,12 +359,13 @@ class AmqpProtocolConverter { Throwable exception = ((ExceptionResponse) response).getException(); receiver.close(); } - pumpOut(); + pumpProtonToSocket(); } }); } + OutboundTransformer outboundTransformer = new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE); class ConsumerContext extends AmqpDeliveryListener { private final ConsumerId consumerId; @@ -395,83 +395,66 @@ class AmqpProtocolConverter { this.sender = sender; } + @Override + public void onClose() throws Exception { + sendToActiveMQ(new RemoveInfo(consumerId), null); + } + + LinkedList outbound = new LinkedList(); + // called when the connection receives a JMS message from ActiveMQ public void onMessageDispatch(MessageDispatch md) throws Exception { - final byte[] tag = nextTag(); - final Delivery delivery = sender.delivery(tag, 0, tag.length); - delivery.setContext(md); - - // Covert to an AMQP messages. - org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage()); - byte buffer[] = new byte[1024*4]; - int c=0; - - // And send the AMQP message over the link. - while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) { - sender.send(buffer, 0, c); - } - sender.advance(); - + outbound.addLast(md); + pumpOutbound(); + pumpProtonToSocket(); } - public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception { -// result.setContentEncoding(); -// QoS qoS; -// if (message.propertyExists(QOS_PROPERTY_NAME)) { -// int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); -// qoS = QoS.values()[ordinal]; -// -// } else { -// qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; -// } -// result.qos(qoS); + Buffer current; - Buffer content = null; - if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { - ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); - msg.setReadOnlyBody(true); - String messageText = msg.getText(); - content = new Buffer(messageText.getBytes("UTF-8")); - } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { - ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); - msg.setReadOnlyBody(true); - byte[] data = new byte[(int) msg.getBodyLength()]; - msg.readBytes(data); - content = new Buffer(data); - } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { - ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); - msg.setReadOnlyBody(true); - Map map = msg.getContentMap(); - content = new Buffer(map.toString().getBytes("UTF-8")); - } else { - ByteSequence byteSequence = message.getContent(); - if (byteSequence != null && byteSequence.getLength() > 0) { - if (message.isCompressed()) { - Inflater inflater = new Inflater(); - inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); - byte[] data = new byte[4096]; - int read; - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - while ((read = inflater.inflate(data)) != 0) { - bytesOut.write(data, 0, read); + public void pumpOutbound() { + while(true) { + + while( current!=null ) { + int sent = sender.send(current.data, current.offset, current.length); + if( sent > 0 ) { + current.moveHead(sent); + if( current.length == 0 ) { + sender.advance(); + current = null; } - byteSequence = bytesOut.toByteSequence(); + } else { + return; } - content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length); - } else { - content = new Buffer(0); + } + + if( outbound.isEmpty() ) { + return; + } + + final MessageDispatch md = outbound.removeFirst(); + final byte[] tag = nextTag(); + final Delivery delivery = sender.delivery(tag, 0, tag.length); + delivery.setContext(md); + + try { + final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage(); + final byte[] amqpMessage = outboundTransformer.transform(jms); + if( amqpMessage!=null && amqpMessage.length > 0 ) { + current = new Buffer(amqpMessage); + } else { + // TODO: message could not be generated what now? + } + } catch (Exception e) { + e.printStackTrace(); } } - - org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message(); - return result; } - @Override public void onDelivery(Delivery delivery) throws JMSException { if( delivery.remotelySettled() ) { MessageDispatch md = (MessageDispatch) delivery.getContext(); + pumpOutbound(); } } @@ -501,38 +484,12 @@ class AmqpProtocolConverter { Throwable exception = ((ExceptionResponse) response).getException(); sender.close(); } - pumpOut(); + pumpProtonToSocket(); } }); } -// -// QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException { -// ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString())); -// if (destination == null) { -// throw new AmqpProtocolException("Invalid Destination."); -// } -// -// ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); -// ConsumerInfo consumerInfo = new ConsumerInfo(id); -// consumerInfo.setDestination(destination); -// consumerInfo.setPrefetchSize(1000); -// consumerInfo.setDispatchAsync(true); -// if (!connect.cleanSession() && (connect.clientId() != null)) { -// //by default subscribers are persistent -// consumerInfo.setSubscriptionName(connect.clientId().toString()); -// } -// -// AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo); -// -// -// amqpSubscriptionByTopic.put(topic.name(), amqpSubscription); -// -// sendToActiveMQ(consumerInfo, null); -// return topic.qos(); -// } -// // void onUnSubscribe(UNSUBSCRIBE command) { // UTF8Buffer[] topics = command.topics(); // if (topics != null) { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java deleted file mode 100644 index a68ee00e70..0000000000 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp; - -import org.apache.activemq.command.*; - -import javax.jms.JMSException; -import java.io.IOException; -import java.util.zip.DataFormatException; - -/** - * Keeps track of the AMQP client subscription so that acking is correctly done. - */ -class AmqpSubscription { -// private final AmqpProtocolConverter protocolConverter; -// -// private final ConsumerInfo consumerInfo; -// private ActiveMQDestination destination; -// private final QoS qos; -// -// public AmqpSubscription(AmqpProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) { -// this.protocolConverter = protocolConverter; -// this.consumerInfo = consumerInfo; -// this.qos = qos; -// } -// -// MessageAck createMessageAck(MessageDispatch md) { -// return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); -// } -// -// PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException { -// PUBLISH publish = protocolConverter.convertMessage(message); -// if (publish.qos().ordinal() > this.qos.ordinal()) { -// publish.qos(this.qos); -// } -// return publish; -// } -// -// public boolean expectAck() { -// return qos != QoS.AT_MOST_ONCE; -// } -// -// public void setDestination(ActiveMQDestination destination) { -// this.destination = destination; -// } -// -// public ActiveMQDestination getDestination() { -// return destination; -// } -// -// public ConsumerInfo getConsumerInfo() { -// return consumerInfo; -// } -} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java deleted file mode 100644 index 8626c17ae6..0000000000 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp; - -/** -* @author Hiram Chirino -*/ -public class JMSMappingInboundTransformer extends InboundTransformer { - -} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java similarity index 52% rename from activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java rename to activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java index 994319b030..57f5d2d6e2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java @@ -14,11 +14,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp; +package org.apache.activemq.transport.amqp.transform; + +import javax.jms.BytesMessage; +import javax.jms.Message; /** * @author Hiram Chirino */ public class AMQPNativeInboundTransformer extends InboundTransformer { + + public AMQPNativeInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception { + + BytesMessage rc = vendor.createBytesMessage(); + rc.writeBytes(amqpMessage, offset, len); + + rc.setJMSDeliveryMode(defaultDeliveryMode); + rc.setJMSPriority(defaultPriority); + + final long now = System.currentTimeMillis(); + rc.setJMSTimestamp(now); + if( defaultTtl > 0 ) { + rc.setJMSExpiration(now + defaultTtl); + } + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat); + rc.setBooleanProperty(prefixVendor + "NATIVE", false); + return rc; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java new file mode 100644 index 0000000000..8e2f1a9262 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.transform; + +import javax.jms.BytesMessage; +import javax.jms.Message; +import javax.jms.MessageFormatException; + +/** +* @author Hiram Chirino +*/ +public class AMQPNativeOutboundTransformer extends OutboundTransformer { + + public AMQPNativeOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public byte[] transform(Message jms) throws Exception { + if( jms == null ) + return null; + if( !(jms instanceof BytesMessage) ) + return null; + + long messageFormat; + try { + if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) { + return null; + } + messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT"); + } catch (MessageFormatException e) { + return null; + } + + // TODO: Proton should probably expose a way to set the msg format + // delivery.settMessageFormat(messageFormat); + + BytesMessage bytesMessage = (BytesMessage) jms; + byte data[] = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(data); + return data; + } + + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java new file mode 100644 index 0000000000..a1a86bfffa --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.transform; + +import org.apache.qpid.proton.engine.Delivery; + +import javax.jms.Message; +import java.io.IOException; + +/** +* @author Hiram Chirino +*/ +public abstract class InboundTransformer { + + JMSVendor vendor; + String prefixVendor = "JMS_AMQP_"; + int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; + int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; + long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; + + public InboundTransformer(JMSVendor vendor) { + this.vendor = vendor; + } + + abstract public Message transform(long messageFormat, byte [] data, int offset, int len) throws Exception; + + public int getDefaultDeliveryMode() { + return defaultDeliveryMode; + } + + public void setDefaultDeliveryMode(int defaultDeliveryMode) { + this.defaultDeliveryMode = defaultDeliveryMode; + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public long getDefaultTtl() { + return defaultTtl; + } + + public void setDefaultTtl(long defaultTtl) { + this.defaultTtl = defaultTtl; + } + + public String getPrefixVendor() { + return prefixVendor; + } + + public void setPrefixVendor(String prefixVendor) { + this.prefixVendor = prefixVendor; + } + + public JMSVendor getVendor() { + return vendor; + } + + public void setVendor(JMSVendor vendor) { + this.vendor = vendor; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java new file mode 100644 index 0000000000..78991d47ce --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.transform; + +import org.apache.qpid.proton.type.Binary; +import org.apache.qpid.proton.type.messaging.*; + +import javax.jms.*; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** +* @author Hiram Chirino +*/ +public class JMSMappingInboundTransformer extends InboundTransformer { + + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations= "MA_"; + String prefixFooter = "FT_"; + + public JMSMappingInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception { + org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message(); + + while( len > 0 ) { + final int decoded = amqp.decode(amqpMessage, offset, len); + assert decoded > 0: "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + Message rc; + final Section body = amqp.getBody(); + if( body instanceof Data ) { + Binary d = ((Data) body).getValue(); + BytesMessage m = vendor.createBytesMessage(); + m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); + rc = m; + } else if (body instanceof AmqpSequence ) { + AmqpSequence sequence = (AmqpSequence) body; + StreamMessage m = vendor.createStreamMessage(); + throw new RuntimeException("not implemented"); +// jms = m; + } else if (body instanceof AmqpValue) { + Object value = ((AmqpValue) body).getValue(); + if( value == null ) { + rc = vendor.createMessage(); + } if( value instanceof String ) { + TextMessage m = vendor.createTextMessage(); + m.setText((String) value); + rc = m; + } else if( value instanceof Binary ) { + Binary d = (Binary) value; + BytesMessage m = vendor.createBytesMessage(); + m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); + rc = m; + } else if( value instanceof List) { + List d = (List) value; + StreamMessage m = vendor.createStreamMessage(); + throw new RuntimeException("not implemented"); +// jms = m; + } else if( value instanceof Map) { + Map d = (Map) value; + MapMessage m = vendor.createMapMessage(); + throw new RuntimeException("not implemented"); +// jms = m; + } else { + ObjectMessage m = vendor.createObjectMessage(); + throw new RuntimeException("not implemented"); +// jms = m; + } + } else { + throw new RuntimeException("Unexpected body type."); + } + rc.setJMSDeliveryMode(defaultDeliveryMode); + rc.setJMSPriority(defaultPriority); + rc.setJMSExpiration(defaultTtl); + + final Header header = amqp.getHeader(); + if( header!=null ) { + if( header.getDurable()!=null ) { + rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } + if( header.getPriority()!=null ) { + rc.setJMSPriority(header.getPriority().intValue()); + } + if( header.getTtl()!=null ) { + rc.setJMSExpiration(header.getTtl().longValue()); + } + if( header.getFirstAcquirer() !=null ) { + rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); + } + if( header.getDeliveryCount()!=null ) { + rc.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue()); + } + } + + final DeliveryAnnotations da = amqp.getDeliveryAnnotations(); + if( da!=null ) { + for (Map.Entry entry : (Set)da.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue()); + } + } + + final MessageAnnotations ma = amqp.getMessageAnnotations(); + if( ma!=null ) { + for (Map.Entry entry : (Set)ma.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); + } + } + + final Properties properties = amqp.getProperties(); + if( properties!=null ) { + if( properties.getMessageId()!=null ) { + rc.setJMSMessageID(properties.getMessageId().toString()); + } + Binary userId = properties.getUserId(); + if( userId!=null ) { + vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); + } + if( properties.getTo()!=null ) { + rc.setJMSDestination(vendor.createDestination(properties.getTo())); + } + if( properties.getSubject()!=null ) { + rc.setStringProperty(prefixVendor + "Subject", properties.getSubject()); + } + if( properties.getReplyTo() !=null ) { + rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); + } + if( properties.getCorrelationId() !=null ) { + rc.setJMSCorrelationID(properties.getCorrelationId().toString()); + } + if( properties.getContentType() !=null ) { + rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); + } + if( properties.getContentEncoding() !=null ) { + rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); + } + if( properties.getCreationTime()!=null ) { + rc.setJMSTimestamp(properties.getCreationTime().getTime()); + } + if( properties.getGroupId()!=null ) { + vendor.setJMSXGroupID(rc, properties.getGroupId()); + } + if( properties.getGroupSequence()!=null ) { + vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue()); + } + if( properties.getReplyToGroupId()!=null ) { + rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); + } + } + + final ApplicationProperties ap = amqp.getApplicationProperties(); + if( da!=null ) { + for (Map.Entry entry : (Set)ap.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(rc, key, entry.getValue()); + } + } + + final Footer fp = amqp.getFooter(); + if( da!=null ) { + for (Map.Entry entry : (Set)fp.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue()); + } + } + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat); + rc.setBooleanProperty(prefixVendor + "NATIVE", false); + return rc; + } + + private void setProperty(Message msg, String key, Object value) throws JMSException { + if( value instanceof String ) { + msg.setStringProperty(key, (String) value); +// } else if( value instanceof Integer ) { +// msg.setIntProperty(key, ((Integer) value).intValue()); +// } else if( value instanceof Long ) { +// msg.setLongProperty(key, ((Long) value).longValue()); + } else { + throw new RuntimeException("Unexpected value type: "+value.getClass()); + } + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java new file mode 100644 index 0000000000..5f4822deb3 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.transform; + +import javax.jms.BytesMessage; +import javax.jms.Message; +import javax.jms.MessageFormatException; + +/** +* @author Hiram Chirino +*/ +public class JMSMappingOutboundTransformer extends OutboundTransformer { + + + public JMSMappingOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public byte[] transform(Message jms) throws Exception { + if( jms == null ) + return null; + if( !(jms instanceof BytesMessage) ) + return null; + + long messageFormat; + try { + if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) { + return null; + } + messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT"); + } catch (MessageFormatException e) { + return null; + } + + // TODO: Proton should probably expose a way to set the msg format + // delivery.settMessageFormat(messageFormat); + + BytesMessage bytesMessage = (BytesMessage) jms; + byte data[] = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(data); + return data; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java new file mode 100644 index 0000000000..e7571edd42 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java @@ -0,0 +1,29 @@ +package org.apache.activemq.transport.amqp.transform; + +import javax.jms.*; + +/** + * @author Hiram Chirino + */ +abstract public class JMSVendor { + + public abstract BytesMessage createBytesMessage(); + + public abstract StreamMessage createStreamMessage(); + + public abstract Message createMessage(); + + public abstract TextMessage createTextMessage(); + + public abstract ObjectMessage createObjectMessage(); + + public abstract MapMessage createMapMessage(); + + public abstract void setJMSXUserID(Message jms, String value); + + public abstract Destination createDestination(String name); + + public abstract void setJMSXGroupID(Message jms, String groupId); + + public abstract void setJMSXGroupSequence(Message jms, int i); +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java similarity index 57% rename from activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java rename to activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java index 10344894b8..74a44c51be 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java @@ -14,16 +14,39 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp; +package org.apache.activemq.transport.amqp.transform; + +import org.apache.qpid.proton.engine.Delivery; + +import javax.jms.Message; /** * @author Hiram Chirino */ -public class InboundTransformer { +public abstract class OutboundTransformer { + JMSVendor vendor; String prefixVendor = "JMS_AMQP_"; - int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; - int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; - long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; + public OutboundTransformer(JMSVendor vendor) { + this.vendor = vendor; + } + + public abstract byte[] transform(Message jms) throws Exception; + + public String getPrefixVendor() { + return prefixVendor; + } + + public void setPrefixVendor(String prefixVendor) { + this.prefixVendor = prefixVendor; + } + + public JMSVendor getVendor() { + return vendor; + } + + public void setVendor(JMSVendor vendor) { + this.vendor = vendor; + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java index 859062fdc6..ca186749dc 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java @@ -18,7 +18,7 @@ package org.apache.activemq.transport.amqp; import org.apache.activemq.broker.BrokerService; -public class AmqpNioTest extends AmqpTest { +public class AmqpNioTest extends AmqpTestSupport { protected void addAMQPConnector(BrokerService brokerService) throws Exception { brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1"); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java index 443479e33a..2d496b24ec 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java @@ -28,7 +28,7 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; @Ignore("hangs atm, needs investigation") -public class AmqpSslTest extends AmqpTest { +public class AmqpSslTest extends AmqpTestSupport { public void startBroker() throws Exception { System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); System.setProperty("javax.net.ssl.trustStorePassword", "password"); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java similarity index 95% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 17afc37e88..f9ad5af1b0 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -16,34 +16,25 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; +import junit.framework.TestCase; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.util.ByteSequence; import org.junit.After; import org.junit.Before; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static org.fusesource.hawtbuf.UTF8Buffer.utf8; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -public class AmqpTest { - protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class); +public class AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class); protected BrokerService brokerService; protected Vector exceptions = new Vector(); protected int numberOfMessages; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java index ce089d9294..2db1b5b385 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java @@ -27,7 +27,7 @@ import org.junit.Test; /** * @author Hiram Chirino */ -public class SwiftMQClientTest extends AmqpTest { +public class SwiftMQClientTest extends AmqpTestSupport { @Test public void testSendReceive() throws Exception { @@ -64,6 +64,7 @@ public class SwiftMQClientTest extends AmqpTest { p.close(); session.close(); } + // { // Session session = connection.createSession(10, 10); // Consumer c = session.createConsumer(queue, 100, qos, true, null); diff --git a/pom.xml b/pom.xml index 7412aa3d61..d0cd21d8dc 100755 --- a/pom.xml +++ b/pom.xml @@ -1372,6 +1372,12 @@ + + unstable + + activemq-amqp + + apache-release