Add support for encoding the destination type in transformed messages as
a byte value to supoort future JMS->AMQP spec mappings.
This commit is contained in:
Timothy Bish 2014-10-20 09:49:36 -04:00
parent 5a6129b512
commit 4881a848dc
2 changed files with 28 additions and 0 deletions

View File

@ -118,12 +118,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay"); private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay");
private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
protected int prefetch; protected int prefetch;
protected Transport protonTransport = Proton.transport(); protected Transport protonTransport = Proton.transport();
protected Connection protonConnection = Proton.connection(); protected Connection protonConnection = Proton.connection();
protected Collector eventCollector = new CollectorImpl(); protected Collector eventCollector = new CollectorImpl();
protected boolean useByteDestinationTypeAnnotation;
public AmqpProtocolConverter(AmqpTransport transport) { public AmqpProtocolConverter(AmqpTransport transport) {
this.amqpTransport = transport; this.amqpTransport = transport;
@ -134,6 +136,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
this.protonTransport.setMaxFrameSize(maxFrameSize); this.protonTransport.setMaxFrameSize(maxFrameSize);
} }
useByteDestinationTypeAnnotation = transport.getWireFormat().isUseByteDestinationTypeAnnotation();
this.protonTransport.bind(this.protonConnection); this.protonTransport.bind(this.protonConnection);
// NOTE: QPid JMS client has a bug where the channel max is stored as a // NOTE: QPid JMS client has a bug where the channel max is stored as a
@ -456,6 +460,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
connectionInfo.setClientId(clientId); connectionInfo.setClientId(clientId);
} }
Map<Symbol, Object> props = protonConnection.getRemoteProperties();
if (props != null) {
if (props.containsKey(JMS_MAPPING_VERSION)) {
useByteDestinationTypeAnnotation = true;
}
}
if (useByteDestinationTypeAnnotation) {
outboundTransformer.setUseByteDestinationTypeAnnotations(true);
}
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() { sendToActiveMQ(connectionInfo, new ResponseHandler() {
@ -529,6 +544,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
LOG.warn("Unknown transformer type {} using native one instead", transformer); LOG.warn("Unknown transformer type {} using native one instead", transformer);
inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
} }
if (useByteDestinationTypeAnnotation) {
inboundTransformer.setUseByteDestinationTypeAnnotations(true);
}
} }
return inboundTransformer; return inboundTransformer;
} }

View File

@ -41,6 +41,7 @@ public class AmqpWireFormat implements WireFormat {
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
private String anonymousNodeName = "$relay"; private String anonymousNodeName = "$relay";
private boolean useByteDestinationTypeAnnotation = false;
@Override @Override
public ByteSequence marshal(Object command) throws IOException { public ByteSequence marshal(Object command) throws IOException {
@ -135,4 +136,12 @@ public class AmqpWireFormat implements WireFormat {
public void setAnonymousNodeName(String anonymousNodeName) { public void setAnonymousNodeName(String anonymousNodeName) {
this.anonymousNodeName = anonymousNodeName; this.anonymousNodeName = anonymousNodeName;
} }
public boolean isUseByteDestinationTypeAnnotation() {
return useByteDestinationTypeAnnotation;
}
public void setUseByteDestinationTypeAnnotation(boolean useByteDestinationTypeAnnotation) {
this.useByteDestinationTypeAnnotation = useByteDestinationTypeAnnotation;
}
} }