Moved the JMS mapping logic into a proton module.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1414990 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-11-28 23:16:42 +00:00
parent 2a8427d7d5
commit 3016b39249
19 changed files with 122 additions and 1059 deletions

View File

@ -44,7 +44,7 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton</artifactId>
<artifactId>proton-jms</artifactId>
<version>${qpid-proton-version}</version>
</dependency>

View File

@ -1,7 +1,7 @@
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.*;
import org.apache.activemq.transport.amqp.transform.JMSVendor;
import org.apache.qpid.proton.jms.JMSVendor;
import javax.jms.*;
import javax.jms.Message;

View File

@ -18,16 +18,18 @@ package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
import org.apache.activemq.transport.amqp.transform.*;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.*;
import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.jms.*;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.DescribedType;
import org.apache.qpid.proton.type.Symbol;
import org.apache.qpid.proton.type.UnsignedInteger;
import org.apache.qpid.proton.type.messaging.*;
import org.apache.qpid.proton.type.messaging.Modified;
import org.apache.qpid.proton.type.messaging.Rejected;
@ -60,6 +62,8 @@ class AmqpProtocolConverter {
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
private static final UnsignedInteger DURABLE = new UnsignedInteger(2);
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
this.amqpTransport = amqpTransport;
@ -86,7 +90,7 @@ class AmqpProtocolConverter {
// private String clientId;
// private final String QOS_PROPERTY_NAME = "QoSPropertyName";
int prefetch = 100;
boolean trace = false;
boolean trace = true;
TransportImpl protonTransport = new TransportImpl();
ConnectionImpl protonConnection = new ConnectionImpl();
@ -345,8 +349,6 @@ class AmqpProtocolConverter {
String clientId = protonConnection.getRemoteContainer();
if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId);
} else {
connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
}
@ -679,6 +681,7 @@ class AmqpProtocolConverter {
private final ConsumerId consumerId;
private final Sender sender;
private boolean presettle;
private boolean closed;
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
@ -714,23 +717,28 @@ class AmqpProtocolConverter {
@Override
public void onClose() throws Exception {
sendToActiveMQ(new RemoveInfo(consumerId), null);
if( !closed ) {
closed = true;
sendToActiveMQ(new RemoveInfo(consumerId), null);
}
}
LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
// called when the connection receives a JMS message from ActiveMQ
public void onMessageDispatch(MessageDispatch md) throws Exception {
outbound.addLast(md);
pumpOutbound();
pumpProtonToSocket();
if( !closed ) {
outbound.addLast(md);
pumpOutbound();
pumpProtonToSocket();
}
}
Buffer currentBuffer;
Delivery currentDelivery;
public void pumpOutbound() throws Exception {
while(true) {
while(!closed) {
while( currentBuffer !=null ) {
int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
@ -876,41 +884,94 @@ class AmqpProtocolConverter {
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
// sender.get
ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
subscriptionsByConsumerId.put(id, consumerContext);
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
sender.setContext(consumerContext);
String selector = null;
if( source!=null ) {
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
if( value!=null ) {
selector = value.getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
} catch (InvalidSelectorException e) {
sender.setSource(null);
((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
sender.close();
consumerContext.closed = true;
return;
}
}
}
}
ActiveMQDestination dest;
org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
if( source != null && !source.getDynamic() ) {
dest = createDestination(source);
} else {
if( source == null ) {
source = new org.apache.qpid.proton.type.messaging.Source();
source.setAddress("");
source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
sender.setSource(source);
// Looks like durable sub removal.
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
consumerContext.closed=true;
sendToActiveMQ(rsi, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
}
sender.open();
pumpProtonToSocket();
}
});
return;
} else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
consumerContext.closed=true;
sender.close();
pumpProtonToSocket();
return;
} else if( source.getDynamic() ) {
// lets create a temp dest.
dest = createTempQueue();
source = new org.apache.qpid.proton.type.messaging.Source();
source.setAddress(dest.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
} else {
dest = createDestination(source);
}
sender.setContext(consumerContext);
subscriptionsByConsumerId.put(id, consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
if( source.getDistributionMode() == COPY && dest.isQueue() ) {
consumerInfo.setBrowser(true);
}
if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
consumerInfo.setSubscriptionName(sender.getName());
}
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
if( value!=null ) {
consumerInfo.setSelector(value.getDescribed().toString());
}
value = (DescribedType)filter.get(NO_LOCAL);
DescribedType value = (DescribedType)filter.get(NO_LOCAL);
if( value!=null ) {
consumerInfo.setNoLocal(true);
}
@ -926,6 +987,7 @@ class AmqpProtocolConverter {
name = "amqp:invalid-field";
}
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
subscriptionsByConsumerId.remove(id);
sender.close();
} else {
sender.open();
@ -936,6 +998,17 @@ class AmqpProtocolConverter {
}
static private boolean contains(Symbol[] haystack, Symbol needle) {
if( haystack!=null ) {
for (Symbol capability : haystack) {
if( capability == needle) {
return true;
}
}
}
return false;
}
private ActiveMQDestination createTempQueue() {
ActiveMQDestination rc;
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);

View File

@ -21,7 +21,7 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.amqp.transform.InboundTransformer;
import org.apache.qpid.proton.jms.InboundTransformer;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;

View File

@ -1,41 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import javax.jms.BytesMessage;
import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
public AMQPNativeInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
Message rc = super.transform(amqpMessage);
populateMessage(rc, amqp);
return rc;
}
}

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.type.UnsignedInteger;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;
import java.nio.ByteBuffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQPNativeOutboundTransformer extends OutboundTransformer {
public AMQPNativeOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public EncodedMessage transform(Message msg) throws Exception {
if( msg == null )
return null;
if( !(msg instanceof BytesMessage) )
return null;
try {
if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
return null;
}
} catch (MessageFormatException e) {
return null;
}
return transform(this, (BytesMessage) msg);
}
static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
long messageFormat;
try {
messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
} catch (MessageFormatException e) {
return null;
}
byte data[] = new byte[(int) msg.getBodyLength()];
int dataSize = data.length;
msg.readBytes(data);
msg.reset();
try {
int count = msg.getIntProperty("JMSXDeliveryCount");
if( count > 1 ) {
// decode...
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
int offset = 0;
int len = data.length;
while( len > 0 ) {
final int decoded = amqp.decode(data, offset, len);
assert decoded > 0: "Make progress decoding the message";
offset += decoded;
len -= decoded;
}
// Update the DeliveryCount header...
amqp.getHeader().setDeliveryCount(new UnsignedInteger(count));
// Re-encode...
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
if( overflow.position() > 0 ) {
buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
}
data = buffer.array();
dataSize = c;
}
} catch (JMSException e) {
}
return new EncodedMessage(messageFormat, data, 0, dataSize);
}
}

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import javax.jms.BytesMessage;
import javax.jms.Message;
public class AMQPRawInboundTransformer extends InboundTransformer {
public AMQPRawInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
BytesMessage rc = vendor.createBytesMessage();
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
rc.setJMSDeliveryMode(defaultDeliveryMode);
rc.setJMSPriority(defaultPriority);
final long now = System.currentTimeMillis();
rc.setJMSTimestamp(now);
if( defaultTtl > 0 ) {
rc.setJMSExpiration(now + defaultTtl);
}
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
return rc;
}
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
public AutoOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public EncodedMessage transform(Message msg) throws Exception {
if( msg == null )
return null;
if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
if( msg instanceof BytesMessage ) {
return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
} else {
return null;
}
} else {
return JMSMappingOutboundTransformer.transform(this, msg);
}
}
}

View File

@ -1,96 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.codec.WritableBuffer;
import java.nio.ByteBuffer;
public class DroppingWritableBuffer implements WritableBuffer
{
int pos = 0;
@Override
public boolean hasRemaining() {
return true;
}
@Override
public void put(byte b) {
pos += 1;
}
@Override
public void putFloat(float f) {
pos += 4;
}
@Override
public void putDouble(double d) {
pos += 8;
}
@Override
public void put(byte[] src, int offset, int length) {
pos += length;
}
@Override
public void putShort(short s) {
pos += 2;
}
@Override
public void putInt(int i) {
pos += 4;
}
@Override
public void putLong(long l) {
pos += 8;
}
@Override
public int remaining() {
return Integer.MAX_VALUE - pos;
}
@Override
public int position() {
return pos;
}
@Override
public void position(int position) {
pos = position;
}
@Override
public void put(ByteBuffer payload) {
pos += payload.remaining();
payload.position(payload.limit());
}
@Override
public int limit() {
return Integer.MAX_VALUE;
}
}

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.type.Binary;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class EncodedMessage extends Binary {
final long messageFormat;
public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
super(data, offset, length);
this.messageFormat = messageFormat;
}
public long getMessageFormat() {
return messageFormat;
}
public Message decode() throws Exception {
Message amqp = new Message();
int offset = getArrayOffset();
int len = getLength();
while( len > 0 ) {
final int decoded = amqp.decode(getArray(), offset, len);
assert decoded > 0: "Make progress decoding the message";
offset += decoded;
len -= decoded;
}
return amqp;
}
}

View File

@ -1,222 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.messaging.ApplicationProperties;
import org.apache.qpid.proton.type.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.type.messaging.Footer;
import org.apache.qpid.proton.type.messaging.Header;
import org.apache.qpid.proton.type.messaging.MessageAnnotations;
import org.apache.qpid.proton.type.messaging.Properties;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public abstract class InboundTransformer {
JMSVendor vendor;
public static final String TRANSFORMER_NATIVE = "native";
public static final String TRANSFORMER_RAW = "raw";
public static final String TRANSFORMER_JMS = "jms";
String prefixVendor = "JMS_AMQP_";
String prefixDeliveryAnnotations = "DA_";
String prefixMessageAnnotations= "MA_";
String prefixFooter = "FT_";
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
public InboundTransformer(JMSVendor vendor) {
this.vendor = vendor;
}
abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
public int getDefaultDeliveryMode() {
return defaultDeliveryMode;
}
public void setDefaultDeliveryMode(int defaultDeliveryMode) {
this.defaultDeliveryMode = defaultDeliveryMode;
}
public int getDefaultPriority() {
return defaultPriority;
}
public void setDefaultPriority(int defaultPriority) {
this.defaultPriority = defaultPriority;
}
public long getDefaultTtl() {
return defaultTtl;
}
public void setDefaultTtl(long defaultTtl) {
this.defaultTtl = defaultTtl;
}
public String getPrefixVendor() {
return prefixVendor;
}
public void setPrefixVendor(String prefixVendor) {
this.prefixVendor = prefixVendor;
}
public JMSVendor getVendor() {
return vendor;
}
public void setVendor(JMSVendor vendor) {
this.vendor = vendor;
}
protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
Header header = amqp.getHeader();
if( header==null ) {
header = new Header();
}
if( header.getDurable()!=null ) {
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
} else {
jms.setJMSDeliveryMode(defaultDeliveryMode);
}
if( header.getPriority()!=null ) {
jms.setJMSPriority(header.getPriority().intValue());
} else {
jms.setJMSPriority(defaultPriority);
}
if( header.getTtl()!=null ) {
jms.setJMSExpiration(header.getTtl().longValue());
} else {
jms.setJMSExpiration(defaultTtl);
}
if( header.getFirstAcquirer() !=null ) {
jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
}
if( header.getDeliveryCount()!=null ) {
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
}
final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
if( da!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
}
}
final MessageAnnotations ma = amqp.getMessageAnnotations();
if( ma!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
String key = entry.getKey().toString();
if( "x-opt-jms-type".equals(key) ) {
jms.setJMSType(entry.getValue().toString());
} else {
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
}
}
}
final Properties properties = amqp.getProperties();
if( properties!=null ) {
if( properties.getMessageId()!=null ) {
jms.setJMSMessageID(properties.getMessageId().toString());
}
Binary userId = properties.getUserId();
if( userId!=null ) {
vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
}
if( properties.getTo()!=null ) {
jms.setJMSDestination(vendor.createDestination(properties.getTo()));
}
if( properties.getSubject()!=null ) {
jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
}
if( properties.getReplyTo() !=null ) {
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
}
if( properties.getCorrelationId() !=null ) {
jms.setJMSCorrelationID(properties.getCorrelationId().toString());
}
if( properties.getContentType() !=null ) {
jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
}
if( properties.getContentEncoding() !=null ) {
jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
}
if( properties.getCreationTime()!=null ) {
jms.setJMSTimestamp(properties.getCreationTime().getTime());
}
if( properties.getGroupId()!=null ) {
vendor.setJMSXGroupID(jms, properties.getGroupId());
}
if( properties.getGroupSequence()!=null ) {
vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
}
if( properties.getReplyToGroupId()!=null ) {
jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
}
}
final ApplicationProperties ap = amqp.getApplicationProperties();
if( ap !=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, key, entry.getValue());
}
}
final Footer fp = amqp.getFooter();
if( fp !=null ) {
for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
}
}
}
private void setProperty(Message msg, String key, Object value) throws JMSException {
//TODO support all types
msg.setObjectProperty(key, value);
// if( value instanceof String ) {
// msg.setStringProperty(key, (String) value);
// } else if( value instanceof Double ) {
// msg.setDoubleProperty(key, ((Double) value).doubleValue());
// } else if( value instanceof Integer ) {
// msg.setIntProperty(key, ((Integer) value).intValue());
// } else if( value instanceof Long ) {
// msg.setLongProperty(key, ((Long) value).longValue());
// } else {
// throw new RuntimeException("Unexpected value type: "+value.getClass());
// }
}
}

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.messaging.*;
import javax.jms.*;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JMSMappingInboundTransformer extends InboundTransformer {
public JMSMappingInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
Message rc;
final Section body = amqp.getBody();
if( body == null ) {
rc = vendor.createMessage();
} else if( body instanceof Data ) {
Binary d = ((Data) body).getValue();
BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
rc = m;
} else if (body instanceof AmqpSequence ) {
AmqpSequence sequence = (AmqpSequence) body;
StreamMessage m = vendor.createStreamMessage();
for( Object item : sequence.getValue()) {
m.writeObject(item);
}
rc = m;
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if( value == null ) {
rc = vendor.createObjectMessage();
} if( value instanceof String ) {
TextMessage m = vendor.createTextMessage();
m.setText((String) value);
rc = m;
} else if( value instanceof Binary ) {
Binary d = (Binary) value;
BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
rc = m;
} else if( value instanceof List) {
StreamMessage m = vendor.createStreamMessage();
for( Object item : (List) value) {
m.writeObject(item);
}
rc = m;
} else if( value instanceof Map) {
MapMessage m = vendor.createMapMessage();
final Set<Map.Entry<String, Object>> set = ((Map) value).entrySet();
for (Map.Entry<String, Object> entry : set) {
m.setObject(entry.getKey(), entry.getValue());
}
rc = m;
} else {
ObjectMessage m = vendor.createObjectMessage();
m.setObject((Serializable) value);
rc = m;
}
} else {
throw new RuntimeException("Unexpected body type: "+body.getClass());
}
rc.setJMSDeliveryMode(defaultDeliveryMode);
rc.setJMSPriority(defaultPriority);
rc.setJMSExpiration(defaultTtl);
populateMessage(rc, amqp);
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
return rc;
}
}

View File

@ -1,212 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.Symbol;
import org.apache.qpid.proton.type.UnsignedByte;
import org.apache.qpid.proton.type.UnsignedInteger;
import org.apache.qpid.proton.type.messaging.*;
import javax.jms.*;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JMSMappingOutboundTransformer extends OutboundTransformer {
String prefixDeliveryAnnotations = "DA_";
String prefixMessageAnnotations= "MA_";
String prefixFooter = "FT_";
public JMSMappingOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public EncodedMessage transform(Message msg) throws Exception {
if( msg == null )
return null;
try {
if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
return null;
}
} catch (MessageFormatException e) {
return null;
}
return transform(this, msg);
}
static EncodedMessage transform(JMSMappingOutboundTransformer options, Message msg) throws JMSException, UnsupportedEncodingException {
final JMSVendor vendor = options.vendor;
final String messageFormatKey = options.prefixVendor + "MESSAGE_FORMAT";
final String nativeKey = options.prefixVendor + "NATIVE";
final String firstAcquirerKey = options.prefixVendor + "FirstAcquirer";
final String prefixDeliveryAnnotationsKey = options.prefixVendor + options.prefixDeliveryAnnotations;
final String prefixMessageAnnotationsKey = options.prefixVendor + options.prefixMessageAnnotations;
final String subjectKey = options.prefixVendor +"Subject";
final String contentTypeKey = options.prefixVendor +"ContentType";
final String contentEncodingKey = options.prefixVendor +"ContentEncoding";
final String replyToGroupIDKey = options.prefixVendor +"ReplyToGroupID";
final String prefixFooterKey = options.prefixVendor + options.prefixFooter;
long messageFormat;
try {
messageFormat = msg.getLongProperty(messageFormatKey);
} catch (MessageFormatException e) {
return null;
}
Header header = new Header();
Properties props=new Properties();
HashMap daMap = null;
HashMap maMap = null;
HashMap apMap = null;
Section body=null;
HashMap footerMap = null;
if( msg instanceof BytesMessage ) {
BytesMessage m = (BytesMessage)msg;
byte data[] = new byte[(int) m.getBodyLength()];
m.readBytes(data);
body = new Data(new Binary(data));
} if( msg instanceof TextMessage ) {
body = new AmqpValue(((TextMessage) msg).getText());
} if( msg instanceof MapMessage ) {
final HashMap map = new HashMap();
final MapMessage m = (MapMessage) msg;
final Enumeration names = m.getMapNames();
while (names.hasMoreElements()) {
String key = (String) names.nextElement();
map.put(key, m.getObject(key));
}
body = new AmqpValue(map);
} if( msg instanceof StreamMessage ) {
ArrayList list = new ArrayList();
final StreamMessage m = (StreamMessage) msg;
try {
while(true) {
list.add(m.readObject());
}
} catch(MessageEOFException e){}
body = new AmqpSequence(list);
} if( msg instanceof ObjectMessage ) {
body = new AmqpValue(((ObjectMessage) msg).getObject());
}
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
if( msg.getJMSExpiration() != 0 ) {
header.setTtl(new UnsignedInteger((int) msg.getJMSExpiration()));
}
if( msg.getJMSType()!=null ) {
if( maMap==null ) maMap = new HashMap();
maMap.put("x-opt-jms-type", msg.getJMSType());
}
if( msg.getJMSMessageID()!=null ) {
props.setMessageId(msg.getJMSMessageID());
}
if( msg.getJMSDestination()!=null ) {
props.setTo(vendor.toAddress(msg.getJMSDestination()));
}
if( msg.getJMSReplyTo()!=null ) {
props.setReplyTo(vendor.toAddress(msg.getJMSDestination()));
}
if( msg.getJMSCorrelationID()!=null ) {
props.setCorrelationId(msg.getJMSCorrelationID());
}
if( msg.getJMSExpiration() != 0 ) {
props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
}
if( msg.getJMSTimestamp()!= 0 ) {
props.setCreationTime(new Date(msg.getJMSTimestamp()));
}
final Enumeration keys = msg.getPropertyNames();
while (keys.hasMoreElements()) {
String key = (String) keys.nextElement();
if( key.equals(messageFormatKey) || key.equals(nativeKey)) {
// skip..
} else if( key.equals(firstAcquirerKey) ) {
header.setFirstAcquirer(msg.getBooleanProperty(key));
} else if( key.startsWith("JMSXDeliveryCount") ) {
header.setDeliveryCount(new UnsignedInteger(msg.getIntProperty(key)));
} else if( key.startsWith("JMSXUserID") ) {
props.setUserId(new Binary(msg.getStringProperty(key).getBytes("UTF-8")));
} else if( key.startsWith("JMSXGroupID") ) {
props.setGroupId(msg.getStringProperty(key));
} else if( key.startsWith("JMSXGroupSeq") ) {
props.setGroupSequence(new UnsignedInteger(msg.getIntProperty(key)));
} else if( key.startsWith(prefixDeliveryAnnotationsKey) ) {
if( daMap == null ) daMap = new HashMap();
String name = key.substring(prefixDeliveryAnnotationsKey.length());
daMap.put(name, msg.getObjectProperty(key));
} else if( key.startsWith(prefixMessageAnnotationsKey) ) {
if( maMap==null ) maMap = new HashMap();
String name = key.substring(prefixMessageAnnotationsKey.length());
maMap.put(name, msg.getObjectProperty(key));
} else if( key.equals(subjectKey) ) {
props.setSubject(msg.getStringProperty(key));
} else if( key.equals(contentTypeKey) ) {
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
} else if( key.equals(contentEncodingKey) ) {
props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
} else if( key.equals(replyToGroupIDKey) ) {
props.setReplyToGroupId(msg.getStringProperty(key));
} else if( key.startsWith(prefixFooterKey) ) {
if( footerMap==null ) footerMap = new HashMap();
String name = key.substring(prefixFooterKey.length());
footerMap.put(name, msg.getObjectProperty(key));
} else {
if( apMap==null ) apMap = new HashMap();
apMap.put(key, msg.getObjectProperty(key));
}
}
MessageAnnotations ma=null;
if( maMap!=null ) ma = new MessageAnnotations(maMap);
DeliveryAnnotations da=null;
if( daMap!=null ) da = new DeliveryAnnotations(daMap);
ApplicationProperties ap=null;
if( apMap!=null ) ap = new ApplicationProperties(apMap);
Footer footer=null;
if( footerMap!=null ) footer = new Footer(footerMap);
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message(header, da, ma, props, ap, body, footer);
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
if( overflow.position() > 0 ) {
buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
}
return new EncodedMessage(messageFormat, buffer.array(), 0, c);
}
}

View File

@ -1,34 +0,0 @@
package org.apache.activemq.transport.amqp.transform;
import javax.jms.*;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
abstract public class JMSVendor {
public abstract BytesMessage createBytesMessage();
public abstract StreamMessage createStreamMessage();
public abstract Message createMessage();
public abstract TextMessage createTextMessage();
public abstract ObjectMessage createObjectMessage();
public abstract MapMessage createMapMessage();
public abstract void setJMSXUserID(Message msg, String value);
public abstract Destination createDestination(String name);
public abstract void setJMSXGroupID(Message msg, String groupId);
public abstract void setJMSXGroupSequence(Message msg, int i);
public abstract void setJMSXDeliveryCount(Message rc, long l);
public abstract String toAddress(Destination msgDestination);
}

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.engine.Delivery;
import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public abstract class OutboundTransformer {
JMSVendor vendor;
String prefixVendor = "JMS_AMQP_";
public OutboundTransformer(JMSVendor vendor) {
this.vendor = vendor;
}
public abstract EncodedMessage transform(Message jms) throws Exception;
public String getPrefixVendor() {
return prefixVendor;
}
public void setPrefixVendor(String prefixVendor) {
this.prefixVendor = prefixVendor;
}
public JMSVendor getVendor() {
return vendor;
}
public void setVendor(JMSVendor vendor) {
this.vendor = vendor;
}
}

View File

@ -20,6 +20,7 @@ import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import javax.jms.*;
@ -27,6 +28,7 @@ import java.util.Enumeration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -42,30 +44,21 @@ public class JMSClientTest extends AmqpTestSupport {
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(queue);
Message msg = session.createTextMessage("Hello World");
msg.setObjectProperty("x", 1);
p.send(msg);
// session.commit();
/* MessageConsumer c = session.createConsumer(queue, "x = 1");
Message received = c.receive(2000);
assertNotNull(received);
System.out.println("first: " + ((TextMessage)received).getText());
System.out.println(received.getJMSRedelivered());*/
TextMessage message = session.createTextMessage();
message.setText("hello");
p.send(message);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
System.out.println("BROWSE " + enumeration.nextElement());
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
}
// session.rollback();
//
// msg = c.receive();
// System.out.println("second:"+msg);
// System.out.println(msg.getJMSRedelivered());
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(TestConfig.TIMEOUT);
assertTrue(message instanceof TextMessage);
}
connection.close();

View File

@ -52,19 +52,21 @@ public class JoramJmsTest extends TestCase {
TestSuite suite = new TestSuite();
// TODO: Fix these tests..
if (false) {
// Fails due to durable subs not being implemented.
suite.addTestSuite(TopicSessionTest.class);
// Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl vs QueueImpl mapping issues
if (true) {
// Fails due to https://issues.apache.org/jira/browse/QPID-4454
suite.addTestSuite(MessageHeaderTest.class);
// Fails due to inconsistent Message mapping in the JMS client.
suite.addTestSuite(MessageTypeTest.class);
// Fails due to https://issues.apache.org/jira/browse/QPID-4455
suite.addTestSuite(QueueBrowserTest.class);
}
// TODO: enable once QPID 0.21 is released
if(true) {
suite.addTestSuite(MessageTypeTest.class);
}
suite.addTestSuite(TopicSessionTest.class);
// TODO: enable once QPID 0.19 is released
if(false) {
if(true) {
suite.addTestSuite(UnifiedSessionTest.class);
suite.addTestSuite(TemporaryTopicTest.class);
suite.addTestSuite(TopicConnectionTest.class);

View File

@ -183,7 +183,8 @@
<include>org.jasypt:jasypt</include>
<include>org.jasypt:jasypt-spring3</include>
<include>javax.jmdns:jmdns</include>
<include>org.apache.qpid:qpid-proton</include>
<include>org.apache.qpid:proton</include>
<include>org.apache.qpid:proton-jms</include>
</includes>
</dependencySet>
<dependencySet>

View File

@ -92,7 +92,7 @@
<org.osgi.core-version>4.2.0</org.osgi.core-version>
<p2psockets-version>1.1.2</p2psockets-version>
<qpid-proton-version>1.0-SNAPSHOT</qpid-proton-version>
<qpid-jms-version>0.18</qpid-jms-version>
<qpid-jms-version>0.21-SNAPSHOT</qpid-jms-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.4</saxon-version>