initial amqp selector support

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401674 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-10-24 13:32:32 +00:00
parent d3571a6154
commit 2a0a0b66f6
3 changed files with 27 additions and 14 deletions

View File

@ -22,12 +22,16 @@ import org.apache.activemq.transport.amqp.transform.*;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.codec.Decoder;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.DescribedType;
import org.apache.qpid.proton.type.Symbol;
import org.apache.qpid.proton.type.messaging.*;
import org.apache.qpid.proton.type.messaging.Modified;
import org.apache.qpid.proton.type.messaging.Rejected;
@ -359,7 +363,8 @@ class AmqpProtocolConverter {
}
}
InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
//InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
abstract class BaseProducerContext extends AmqpDeliveryListener {
@ -776,7 +781,6 @@ class AmqpProtocolConverter {
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
// sender.get
ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
@ -810,6 +814,11 @@ class AmqpProtocolConverter {
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
Map filter = ((org.apache.qpid.proton.type.messaging.Source)remoteSource).getFilter();
if (filter != null) {
DescribedType type = (DescribedType)filter.get(Symbol.valueOf("jms-selector"));
consumerInfo.setSelector(type.getDescribed().toString());
}
sendToActiveMQ(consumerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {

View File

@ -182,7 +182,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
}
final ApplicationProperties ap = amqp.getApplicationProperties();
if( da!=null ) {
if( ap !=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, key, entry.getValue());
@ -190,7 +190,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
}
final Footer fp = amqp.getFooter();
if( da!=null ) {
if( fp !=null ) {
for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
@ -203,12 +203,13 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
}
private void setProperty(Message msg, String key, Object value) throws JMSException {
//TODO support all types
if( value instanceof String ) {
msg.setStringProperty(key, (String) value);
// } else if( value instanceof Integer ) {
// msg.setIntProperty(key, ((Integer) value).intValue());
// } else if( value instanceof Long ) {
// msg.setLongProperty(key, ((Long) value).longValue());
} else if( value instanceof Integer ) {
msg.setIntProperty(key, ((Integer) value).intValue());
} else if( value instanceof Long ) {
msg.setLongProperty(key, ((Long) value).longValue());
} else {
throw new RuntimeException("Unexpected value type: "+value.getClass());
}

View File

@ -24,6 +24,7 @@ import org.junit.Test;
import javax.jms.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -39,13 +40,15 @@ public class JMSClientTest extends AmqpTestSupport {
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(queue);
p.send(session.createTextMessage("Hello World"));
Message msg = session.createTextMessage("Hello World");
msg.setObjectProperty("x", 1);
p.send(msg);
// session.commit();
MessageConsumer c = session.createConsumer(queue);
Message msg = c.receive();
System.out.println("first:"+msg);
System.out.println(msg.getJMSRedelivered());
MessageConsumer c = session.createConsumer(queue, "x = 1");
Message received = c.receive(2000);
assertNotNull(received);
System.out.println("first: " + ((TextMessage)received).getText());
System.out.println(received.getJMSRedelivered());
// session.rollback();
//