Making more progress on the AMQP implementation.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1393782 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-03 21:46:46 +00:00
parent cdd5150340
commit 7fe30bc0cc
16 changed files with 667 additions and 255 deletions

View File

@ -0,0 +1,67 @@
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.*;
import org.apache.activemq.transport.amqp.transform.JMSVendor;
import javax.jms.*;
import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class ActiveMQJMSVendor extends JMSVendor {
final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
private ActiveMQJMSVendor() {}
@Override
public BytesMessage createBytesMessage() {
return new ActiveMQBytesMessage();
}
@Override
public StreamMessage createStreamMessage() {
return new ActiveMQStreamMessage();
}
@Override
public Message createMessage() {
return new ActiveMQMessage();
}
@Override
public TextMessage createTextMessage() {
return new ActiveMQTextMessage();
}
@Override
public ObjectMessage createObjectMessage() {
return new ActiveMQObjectMessage();
}
@Override
public MapMessage createMapMessage() {
return new ActiveMQMapMessage();
}
@Override
public Destination createDestination(String name) {
return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
}
@Override
public void setJMSXUserID(Message msg, String value) {
((ActiveMQMessage)msg).setUserID(value);
}
@Override
public void setJMSXGroupID(Message msg, String value) {
((ActiveMQMessage)msg).setGroupID(value);
}
@Override
public void setJMSXGroupSequence(Message msg, int value) {
((ActiveMQMessage)msg).setGroupSequence(value);
}
}

View File

@ -18,26 +18,23 @@ package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
import org.apache.activemq.command.Message;
import org.apache.activemq.transport.amqp.transform.*;
import org.apache.activemq.util.*;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.Inflater;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
class AmqpProtocolConverter {
@ -80,7 +77,7 @@ class AmqpProtocolConverter {
this.protonTransport.bind(this.protonConnection);
}
void pumpOut() {
void pumpProtonToSocket() {
try {
int size = 1024 * 64;
byte data[] = new byte[size];
@ -158,11 +155,7 @@ class AmqpProtocolConverter {
link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
while (link != null) {
if (link instanceof Receiver) {
// listener.onReceiverClose((Receiver) link);
} else {
// listener.onSenderClose((Sender) link);
}
((AmqpDeliveryListener)link.getContext()).onClose();
link.close();
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
@ -170,8 +163,7 @@ class AmqpProtocolConverter {
session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
while (session != null) {
//TODO - close links?
// listener.onSessionClose(session);
session.close();
onSessionClose(session);
session = session.next(ACTIVE_STATE, CLOSED_STATE);
}
if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
@ -183,7 +175,7 @@ class AmqpProtocolConverter {
handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
}
pumpOut();
pumpProtonToSocket();
}
public void onActiveMQCommand(Command command) throws Exception {
@ -223,6 +215,7 @@ class AmqpProtocolConverter {
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onClose() throws Exception {}
}
private void onConnectionOpen() throws AmqpProtocolException {
@ -255,14 +248,14 @@ class AmqpProtocolConverter {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.open();
pumpOut();
pumpProtonToSocket();
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
// TODO: figure out how to close /w an error.
// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
protonConnection.close();
pumpOut();
pumpProtonToSocket();
amqpTransport.onException(IOExceptionSupport.create(exception));
return;
}
@ -278,6 +271,12 @@ class AmqpProtocolConverter {
session.open();
}
private void onSessionClose(Session session) {
AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
session.close();
}
private void onLinkOpen(Link link) {
link.setLocalSourceAddress(link.getRemoteSourceAddress());
link.setLocalTargetAddress(link.getRemoteTargetAddress());
@ -290,54 +289,54 @@ class AmqpProtocolConverter {
}
}
InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
class ProducerContext extends AmqpDeliveryListener {
private final ProducerId producerId;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
ByteArrayOutputStream current = new ByteArrayOutputStream();
public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
this.producerId = producerId;
this.destination = destination;
}
@Override
public void onDelivery(Delivery delivery) throws JMSException {
// delivery.
ActiveMQMessage message = convertMessage((DeliveryImpl) delivery);
public void onDelivery(Delivery delivery) throws Exception {
if( current ==null ) {
current = new ByteArrayOutputStream();
}
Receiver receiver = ((Receiver)delivery.getLink());
int count;
byte data[] = new byte[1024*4];
while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
current.write(data, 0, count);
}
// Expecting more deliveries..
if( count == 0 ) {
return;
}
final Buffer buffer = current.toBuffer();
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
current = null;
if( message.getDestination()==null ) {
message.setJMSDestination(destination);
}
message.setProducerId(producerId);
if( message.getMessageId()==null ) {
message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
}
message.onSend();
// sendToActiveMQ(message, createResponseHandler(command));
sendToActiveMQ(message, null);
}
ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException {
ActiveMQBytesMessage msg = nextMessage(delivery);
final Receiver receiver = (Receiver) delivery.getLink();
byte buff[] = new byte[1024 * 4];
int count = 0;
while ((count = receiver.recv(buff, 0, buff.length)) >= 0) {
msg.writeBytes(buff, 0, count);
}
return msg;
}
ActiveMQBytesMessage current;
private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException {
if (current == null) {
current = new ActiveMQBytesMessage();
current.setJMSDestination(destination);
current.setProducerId(producerId);
current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
current.setTimestamp(System.currentTimeMillis());
current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
// msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
// msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
System.out.println(delivery.getLocalState() + "/" + delivery.getRemoteState());
}
return current;
}
}
@ -345,7 +344,7 @@ class AmqpProtocolConverter {
// Client is producing to this receiver object
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
ProducerContext producerContext = new ProducerContext(producerId, destination);
receiver.setContext(producerContext);
@ -360,12 +359,13 @@ class AmqpProtocolConverter {
Throwable exception = ((ExceptionResponse) response).getException();
receiver.close();
}
pumpOut();
pumpProtonToSocket();
}
});
}
OutboundTransformer outboundTransformer = new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId;
@ -395,83 +395,66 @@ class AmqpProtocolConverter {
this.sender = sender;
}
@Override
public void onClose() throws Exception {
sendToActiveMQ(new RemoveInfo(consumerId), null);
}
LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
// called when the connection receives a JMS message from ActiveMQ
public void onMessageDispatch(MessageDispatch md) throws Exception {
outbound.addLast(md);
pumpOutbound();
pumpProtonToSocket();
}
Buffer current;
public void pumpOutbound() {
while(true) {
while( current!=null ) {
int sent = sender.send(current.data, current.offset, current.length);
if( sent > 0 ) {
current.moveHead(sent);
if( current.length == 0 ) {
sender.advance();
current = null;
}
} else {
return;
}
}
if( outbound.isEmpty() ) {
return;
}
final MessageDispatch md = outbound.removeFirst();
final byte[] tag = nextTag();
final Delivery delivery = sender.delivery(tag, 0, tag.length);
delivery.setContext(md);
// Covert to an AMQP messages.
org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage());
byte buffer[] = new byte[1024*4];
int c=0;
// And send the AMQP message over the link.
while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) {
sender.send(buffer, 0, c);
}
sender.advance();
}
public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception {
// result.setContentEncoding();
// QoS qoS;
// if (message.propertyExists(QOS_PROPERTY_NAME)) {
// int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
// qoS = QoS.values()[ordinal];
//
// } else {
// qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
// }
// result.qos(qoS);
Buffer content = null;
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
msg.setReadOnlyBody(true);
String messageText = msg.getText();
content = new Buffer(messageText.getBytes("UTF-8"));
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
msg.setReadOnlyBody(true);
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
content = new Buffer(data);
} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
msg.setReadOnlyBody(true);
Map map = msg.getContentMap();
content = new Buffer(map.toString().getBytes("UTF-8"));
try {
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
final byte[] amqpMessage = outboundTransformer.transform(jms);
if( amqpMessage!=null && amqpMessage.length > 0 ) {
current = new Buffer(amqpMessage);
} else {
ByteSequence byteSequence = message.getContent();
if (byteSequence != null && byteSequence.getLength() > 0) {
if (message.isCompressed()) {
Inflater inflater = new Inflater();
inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
byte[] data = new byte[4096];
int read;
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
while ((read = inflater.inflate(data)) != 0) {
bytesOut.write(data, 0, read);
// TODO: message could not be generated what now?
}
byteSequence = bytesOut.toByteSequence();
}
content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length);
} else {
content = new Buffer(0);
} catch (Exception e) {
e.printStackTrace();
}
}
org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message();
return result;
}
@Override
public void onDelivery(Delivery delivery) throws JMSException {
if( delivery.remotelySettled() ) {
MessageDispatch md = (MessageDispatch) delivery.getContext();
pumpOutbound();
}
}
@ -501,38 +484,12 @@ class AmqpProtocolConverter {
Throwable exception = ((ExceptionResponse) response).getException();
sender.close();
}
pumpOut();
pumpProtonToSocket();
}
});
}
//
// QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException {
// ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString()));
// if (destination == null) {
// throw new AmqpProtocolException("Invalid Destination.");
// }
//
// ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
// ConsumerInfo consumerInfo = new ConsumerInfo(id);
// consumerInfo.setDestination(destination);
// consumerInfo.setPrefetchSize(1000);
// consumerInfo.setDispatchAsync(true);
// if (!connect.cleanSession() && (connect.clientId() != null)) {
// //by default subscribers are persistent
// consumerInfo.setSubscriptionName(connect.clientId().toString());
// }
//
// AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo);
//
//
// amqpSubscriptionByTopic.put(topic.name(), amqpSubscription);
//
// sendToActiveMQ(consumerInfo, null);
// return topic.qos();
// }
//
// void onUnSubscribe(UNSUBSCRIBE command) {
// UTF8Buffer[] topics = command.topics();
// if (topics != null) {

View File

@ -1,68 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.*;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.zip.DataFormatException;
/**
* Keeps track of the AMQP client subscription so that acking is correctly done.
*/
class AmqpSubscription {
// private final AmqpProtocolConverter protocolConverter;
//
// private final ConsumerInfo consumerInfo;
// private ActiveMQDestination destination;
// private final QoS qos;
//
// public AmqpSubscription(AmqpProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) {
// this.protocolConverter = protocolConverter;
// this.consumerInfo = consumerInfo;
// this.qos = qos;
// }
//
// MessageAck createMessageAck(MessageDispatch md) {
// return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
// }
//
// PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
// PUBLISH publish = protocolConverter.convertMessage(message);
// if (publish.qos().ordinal() > this.qos.ordinal()) {
// publish.qos(this.qos);
// }
// return publish;
// }
//
// public boolean expectAck() {
// return qos != QoS.AT_MOST_ONCE;
// }
//
// public void setDestination(ActiveMQDestination destination) {
// this.destination = destination;
// }
//
// public ActiveMQDestination getDestination() {
// return destination;
// }
//
// public ConsumerInfo getConsumerInfo() {
// return consumerInfo;
// }
}

View File

@ -1,24 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JMSMappingInboundTransformer extends InboundTransformer {
}

View File

@ -14,11 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
package org.apache.activemq.transport.amqp.transform;
import javax.jms.BytesMessage;
import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQPNativeInboundTransformer extends InboundTransformer {
public AMQPNativeInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
BytesMessage rc = vendor.createBytesMessage();
rc.writeBytes(amqpMessage, offset, len);
rc.setJMSDeliveryMode(defaultDeliveryMode);
rc.setJMSPriority(defaultPriority);
final long now = System.currentTimeMillis();
rc.setJMSTimestamp(now);
if( defaultTtl > 0 ) {
rc.setJMSExpiration(now + defaultTtl);
}
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
return rc;
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageFormatException;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQPNativeOutboundTransformer extends OutboundTransformer {
public AMQPNativeOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public byte[] transform(Message jms) throws Exception {
if( jms == null )
return null;
if( !(jms instanceof BytesMessage) )
return null;
long messageFormat;
try {
if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
return null;
}
messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
} catch (MessageFormatException e) {
return null;
}
// TODO: Proton should probably expose a way to set the msg format
// delivery.settMessageFormat(messageFormat);
BytesMessage bytesMessage = (BytesMessage) jms;
byte data[] = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(data);
return data;
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.engine.Delivery;
import javax.jms.Message;
import java.io.IOException;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public abstract class InboundTransformer {
JMSVendor vendor;
String prefixVendor = "JMS_AMQP_";
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
public InboundTransformer(JMSVendor vendor) {
this.vendor = vendor;
}
abstract public Message transform(long messageFormat, byte [] data, int offset, int len) throws Exception;
public int getDefaultDeliveryMode() {
return defaultDeliveryMode;
}
public void setDefaultDeliveryMode(int defaultDeliveryMode) {
this.defaultDeliveryMode = defaultDeliveryMode;
}
public int getDefaultPriority() {
return defaultPriority;
}
public void setDefaultPriority(int defaultPriority) {
this.defaultPriority = defaultPriority;
}
public long getDefaultTtl() {
return defaultTtl;
}
public void setDefaultTtl(long defaultTtl) {
this.defaultTtl = defaultTtl;
}
public String getPrefixVendor() {
return prefixVendor;
}
public void setPrefixVendor(String prefixVendor) {
this.prefixVendor = prefixVendor;
}
public JMSVendor getVendor() {
return vendor;
}
public void setVendor(JMSVendor vendor) {
this.vendor = vendor;
}
}

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.messaging.*;
import javax.jms.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JMSMappingInboundTransformer extends InboundTransformer {
String prefixDeliveryAnnotations = "DA_";
String prefixMessageAnnotations= "MA_";
String prefixFooter = "FT_";
public JMSMappingInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
while( len > 0 ) {
final int decoded = amqp.decode(amqpMessage, offset, len);
assert decoded > 0: "Make progress decoding the message";
offset += decoded;
len -= decoded;
}
Message rc;
final Section body = amqp.getBody();
if( body instanceof Data ) {
Binary d = ((Data) body).getValue();
BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
rc = m;
} else if (body instanceof AmqpSequence ) {
AmqpSequence sequence = (AmqpSequence) body;
StreamMessage m = vendor.createStreamMessage();
throw new RuntimeException("not implemented");
// jms = m;
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if( value == null ) {
rc = vendor.createMessage();
} if( value instanceof String ) {
TextMessage m = vendor.createTextMessage();
m.setText((String) value);
rc = m;
} else if( value instanceof Binary ) {
Binary d = (Binary) value;
BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
rc = m;
} else if( value instanceof List) {
List d = (List) value;
StreamMessage m = vendor.createStreamMessage();
throw new RuntimeException("not implemented");
// jms = m;
} else if( value instanceof Map) {
Map d = (Map) value;
MapMessage m = vendor.createMapMessage();
throw new RuntimeException("not implemented");
// jms = m;
} else {
ObjectMessage m = vendor.createObjectMessage();
throw new RuntimeException("not implemented");
// jms = m;
}
} else {
throw new RuntimeException("Unexpected body type.");
}
rc.setJMSDeliveryMode(defaultDeliveryMode);
rc.setJMSPriority(defaultPriority);
rc.setJMSExpiration(defaultTtl);
final Header header = amqp.getHeader();
if( header!=null ) {
if( header.getDurable()!=null ) {
rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
}
if( header.getPriority()!=null ) {
rc.setJMSPriority(header.getPriority().intValue());
}
if( header.getTtl()!=null ) {
rc.setJMSExpiration(header.getTtl().longValue());
}
if( header.getFirstAcquirer() !=null ) {
rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
}
if( header.getDeliveryCount()!=null ) {
rc.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue());
}
}
final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
if( da!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
}
}
final MessageAnnotations ma = amqp.getMessageAnnotations();
if( ma!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
}
}
final Properties properties = amqp.getProperties();
if( properties!=null ) {
if( properties.getMessageId()!=null ) {
rc.setJMSMessageID(properties.getMessageId().toString());
}
Binary userId = properties.getUserId();
if( userId!=null ) {
vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
}
if( properties.getTo()!=null ) {
rc.setJMSDestination(vendor.createDestination(properties.getTo()));
}
if( properties.getSubject()!=null ) {
rc.setStringProperty(prefixVendor + "Subject", properties.getSubject());
}
if( properties.getReplyTo() !=null ) {
rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
}
if( properties.getCorrelationId() !=null ) {
rc.setJMSCorrelationID(properties.getCorrelationId().toString());
}
if( properties.getContentType() !=null ) {
rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
}
if( properties.getContentEncoding() !=null ) {
rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
}
if( properties.getCreationTime()!=null ) {
rc.setJMSTimestamp(properties.getCreationTime().getTime());
}
if( properties.getGroupId()!=null ) {
vendor.setJMSXGroupID(rc, properties.getGroupId());
}
if( properties.getGroupSequence()!=null ) {
vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue());
}
if( properties.getReplyToGroupId()!=null ) {
rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
}
}
final ApplicationProperties ap = amqp.getApplicationProperties();
if( da!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, key, entry.getValue());
}
}
final Footer fp = amqp.getFooter();
if( da!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
}
}
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
return rc;
}
private void setProperty(Message msg, String key, Object value) throws JMSException {
if( value instanceof String ) {
msg.setStringProperty(key, (String) value);
// } else if( value instanceof Integer ) {
// msg.setIntProperty(key, ((Integer) value).intValue());
// } else if( value instanceof Long ) {
// msg.setLongProperty(key, ((Long) value).longValue());
} else {
throw new RuntimeException("Unexpected value type: "+value.getClass());
}
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageFormatException;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JMSMappingOutboundTransformer extends OutboundTransformer {
public JMSMappingOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public byte[] transform(Message jms) throws Exception {
if( jms == null )
return null;
if( !(jms instanceof BytesMessage) )
return null;
long messageFormat;
try {
if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
return null;
}
messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
} catch (MessageFormatException e) {
return null;
}
// TODO: Proton should probably expose a way to set the msg format
// delivery.settMessageFormat(messageFormat);
BytesMessage bytesMessage = (BytesMessage) jms;
byte data[] = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(data);
return data;
}
}

View File

@ -0,0 +1,29 @@
package org.apache.activemq.transport.amqp.transform;
import javax.jms.*;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
abstract public class JMSVendor {
public abstract BytesMessage createBytesMessage();
public abstract StreamMessage createStreamMessage();
public abstract Message createMessage();
public abstract TextMessage createTextMessage();
public abstract ObjectMessage createObjectMessage();
public abstract MapMessage createMapMessage();
public abstract void setJMSXUserID(Message jms, String value);
public abstract Destination createDestination(String name);
public abstract void setJMSXGroupID(Message jms, String groupId);
public abstract void setJMSXGroupSequence(Message jms, int i);
}

View File

@ -14,16 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.engine.Delivery;
import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class InboundTransformer {
public abstract class OutboundTransformer {
JMSVendor vendor;
String prefixVendor = "JMS_AMQP_";
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
public OutboundTransformer(JMSVendor vendor) {
this.vendor = vendor;
}
public abstract byte[] transform(Message jms) throws Exception;
public String getPrefixVendor() {
return prefixVendor;
}
public void setPrefixVendor(String prefixVendor) {
this.prefixVendor = prefixVendor;
}
public JMSVendor getVendor() {
return vendor;
}
public void setVendor(JMSVendor vendor) {
this.vendor = vendor;
}
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerService;
public class AmqpNioTest extends AmqpTest {
public class AmqpNioTest extends AmqpTestSupport {
protected void addAMQPConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1");
}

View File

@ -28,7 +28,7 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
@Ignore("hangs atm, needs investigation")
public class AmqpSslTest extends AmqpTest {
public class AmqpSslTest extends AmqpTestSupport {
public void startBroker() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");

View File

@ -16,34 +16,25 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class AmqpTest {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class);
public class AmqpTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected int numberOfMessages;

View File

@ -27,7 +27,7 @@ import org.junit.Test;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class SwiftMQClientTest extends AmqpTest {
public class SwiftMQClientTest extends AmqpTestSupport {
@Test
public void testSendReceive() throws Exception {
@ -64,6 +64,7 @@ public class SwiftMQClientTest extends AmqpTest {
p.close();
session.close();
}
// {
// Session session = connection.createSession(10, 10);
// Consumer c = session.createConsumer(queue, 100, qos, true, null);

View File

@ -1372,6 +1372,12 @@
</reporting>
<profiles>
<profile>
<id>unstable</id>
<modules>
<module>activemq-amqp</module>
</modules>
</profile>
<profile>
<id>apache-release</id>
<activation>