Allow for an anonymous relay using a configurable node name when
creating the new link, default is $relay.  Message's that arrive without
a to field set are rejected as this is required for a relay.
This commit is contained in:
Timothy Bish 2014-10-14 17:32:23 -04:00
parent 27833d025e
commit 78cb1120b7
2 changed files with 62 additions and 4 deletions

View File

@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
@ -111,10 +112,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES; private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class); private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private static final int CHANNEL_MAX = 32767;
private final AmqpTransport amqpTransport; private final AmqpTransport amqpTransport;
private static final Symbol COPY = Symbol.getSymbol("copy"); private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
protected int prefetch; protected int prefetch;
@ -132,10 +135,33 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
this.protonTransport.bind(this.protonConnection); this.protonTransport.bind(this.protonConnection);
// NOTE: QPid JMS client has a bug where the channel max is stored as a
// short value in the Connection class which means that if we allow
// the default channel max of 65535 to be sent then no new sessions
// can be created because the value would be -1 when checked.
this.protonTransport.setChannelMax(CHANNEL_MAX);
this.protonConnection.collect(eventCollector); this.protonConnection.collect(eventCollector);
this.protonConnection.setProperties(getConnectionProperties());
updateTracer(); updateTracer();
} }
/**
* Load and return a <code>Map<Symbol, Object></code> that contains the connection
* properties which will allow the client to better communicate with this broker.
*
* @return the properties that are sent to new clients on connect.
*/
protected Map<Symbol, Object> getConnectionProperties() {
Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
properties.put(ANONYMOUS_RELAY, amqpTransport.getWireFormat().getAnonymousNodeName());
return properties;
}
@Override @Override
public void updateTracer() { public void updateTracer() {
if (amqpTransport.isTrace()) { if (amqpTransport.isTrace()) {
@ -559,10 +585,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination; private final ActiveMQDestination destination;
private boolean closed; private boolean closed;
private final boolean anonymous;
public ProducerContext(ProducerId producerId, ActiveMQDestination destination) { public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) {
this.producerId = producerId; this.producerId = producerId;
this.destination = destination; this.destination = destination;
this.anonymous = anonymous;
} }
@Override @Override
@ -581,6 +609,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (destination != null) { if (destination != null) {
message.setJMSDestination(destination); message.setJMSDestination(destination);
} else if (isAnonymous()) {
Destination toDestination = message.getJMSDestination();
if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription("Missing to field for message sent to an anonymous producer");
rejected.setError(condition);
delivery.disposition(rejected);
return;
}
} }
message.setProducerId(producerId); message.setProducerId(producerId);
@ -673,6 +712,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
sendToActiveMQ(new RemoveInfo(producerId), null); sendToActiveMQ(new RemoveInfo(producerId), null);
} }
} }
public boolean isAnonymous() {
return anonymous;
}
} }
long nextTransactionId = 1; long nextTransactionId = 1;
@ -795,8 +838,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else { } else {
Target target = (Target) remoteTarget; Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++); ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest; ActiveMQDestination dest = null;
if (target.getDynamic()) { boolean anonymous = false;
if (target.getAddress().equals(amqpTransport.getWireFormat().getAnonymousNodeName())) {
anonymous = true;
} else if (target.getDynamic()) {
dest = createTempQueue(); dest = createTempQueue();
Target actualTarget = new Target(); Target actualTarget = new Target();
actualTarget.setAddress(dest.getQualifiedName()); actualTarget.setAddress(dest.getQualifiedName());
@ -806,10 +853,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
dest = createDestination(remoteTarget); dest = createDestination(remoteTarget);
} }
ProducerContext producerContext = new ProducerContext(producerId, dest); ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
receiver.setContext(producerContext); receiver.setContext(producerContext);
receiver.flow(flow); receiver.flow(flow);
ProducerInfo producerInfo = new ProducerInfo(producerId); ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest); producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() { sendToActiveMQ(producerInfo, new ResponseHandler() {
@ -1383,6 +1431,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
return condition; return condition;
} }
@Override
public void setPrefetch(int prefetch) { public void setPrefetch(int prefetch) {
this.prefetch = prefetch; this.prefetch = prefetch;
} }

View File

@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat {
private int version = 1; private int version = 1;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
private String anonymousNodeName = "$relay";
@Override @Override
public ByteSequence marshal(Object command) throws IOException { public ByteSequence marshal(Object command) throws IOException {
@ -126,4 +127,12 @@ public class AmqpWireFormat implements WireFormat {
public void setMaxAmqpFrameSize(int maxAmqpFrameSize) { public void setMaxAmqpFrameSize(int maxAmqpFrameSize) {
this.maxAmqpFrameSize = maxAmqpFrameSize; this.maxAmqpFrameSize = maxAmqpFrameSize;
} }
public String getAnonymousNodeName() {
return anonymousNodeName;
}
public void setAnonymousNodeName(String anonymousNodeName) {
this.anonymousNodeName = anonymousNodeName;
}
} }