Temp queues verified to work now.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1403689 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-30 13:31:04 +00:00
parent 4018202226
commit 3b0b1fa7da
2 changed files with 62 additions and 48 deletions

View File

@ -324,7 +324,6 @@ class AmqpProtocolConverter {
private ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0;
private long nextTempDestinationId = 0;
HashMap<Sender, ActiveMQDestination> tempDestinations = new HashMap<Sender, ActiveMQDestination>();
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
@ -604,8 +603,19 @@ class AmqpProtocolConverter {
receiver.open();
pumpProtonToSocket();
} else {
org.apache.qpid.proton.type.messaging.Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest = createDestination(remoteTarget);
ActiveMQDestination dest;
if( target.getDynamic() ) {
dest = createTempQueue();
org.apache.qpid.proton.type.messaging.Target actualTarget = new org.apache.qpid.proton.type.messaging.Target();
actualTarget.setAddress(dest.getQualifiedName());
actualTarget.setDynamic(true);
receiver.setTarget(actualTarget);
} else {
dest = createDestination(remoteTarget);
}
ProducerContext producerContext = new ProducerContext(producerId, dest);
receiver.setContext(producerContext);
@ -645,12 +655,6 @@ class AmqpProtocolConverter {
}
}
private Source createSource(ActiveMQDestination dest) {
org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
rc.setAddress(getInboundTransformer().getVendor().toAddress(dest));
return rc;
}
OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
@ -736,22 +740,27 @@ class AmqpProtocolConverter {
final MessageDispatch md = outbound.removeFirst();
try {
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
jms.setRedeliveryCounter(md.getRedeliveryCounter());
final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) {
currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
if( presettle ) {
currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
final byte[] tag = nextTag();
currentDelivery = sender.delivery(tag, 0, tag.length);
}
currentDelivery.setContext(md);
if( jms==null ) {
// It's the end of browse signal.
sender.drained();
} else {
// TODO: message could not be generated what now?
jms.setRedeliveryCounter(md.getRedeliveryCounter());
final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) {
currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
if( presettle ) {
currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
final byte[] tag = nextTag();
currentDelivery = sender.delivery(tag, 0, tag.length);
}
currentDelivery.setContext(md);
} else {
// TODO: message could not be generated what now?
}
}
} catch (Exception e) {
e.printStackTrace();
@ -857,27 +866,18 @@ class AmqpProtocolConverter {
subscriptionsByConsumerId.put(id, consumerContext);
ActiveMQDestination dest;
final Source remoteSource = sender.getRemoteSource();
if( remoteSource != null ) {
dest = createDestination(remoteSource);
org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
if( source != null && !source.getDynamic() ) {
dest = createDestination(source);
} else {
// lets create a temp dest.
// if (topic) {
// dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
// } else {
dest = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
// }
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionId);
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(dest);
sendToActiveMQ(info, null);
tempDestinations.put(sender, dest);
sender.setSource(createSource(dest));
dest = createTempQueue();
source = new org.apache.qpid.proton.type.messaging.Source();
source.setAddress(dest.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
}
sender.setContext(consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(dest);
@ -903,6 +903,17 @@ class AmqpProtocolConverter {
}
private ActiveMQDestination createTempQueue() {
ActiveMQDestination rc;
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionId);
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(rc);
sendToActiveMQ(info, null);
return rc;
}
// void onUnSubscribe(UNSUBSCRIBE command) {
// UTF8Buffer[] topics = command.topics();
// if (topics != null) {

View File

@ -194,14 +194,17 @@ public abstract class InboundTransformer {
private void setProperty(Message msg, String key, Object value) throws JMSException {
//TODO support all types
if( value instanceof String ) {
msg.setStringProperty(key, (String) value);
} else if( value instanceof Integer ) {
msg.setIntProperty(key, ((Integer) value).intValue());
} else if( value instanceof Long ) {
msg.setLongProperty(key, ((Long) value).longValue());
} else {
throw new RuntimeException("Unexpected value type: "+value.getClass());
}
msg.setObjectProperty(key, value);
// if( value instanceof String ) {
// msg.setStringProperty(key, (String) value);
// } else if( value instanceof Double ) {
// msg.setDoubleProperty(key, ((Double) value).doubleValue());
// } else if( value instanceof Integer ) {
// msg.setIntProperty(key, ((Integer) value).intValue());
// } else if( value instanceof Long ) {
// msg.setLongProperty(key, ((Long) value).longValue());
// } else {
// throw new RuntimeException("Unexpected value type: "+value.getClass());
// }
}
}