When the AMQP source or target address is not set, close the sender/receiver and report the error.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1438540 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-01-25 14:35:18 +00:00
parent 6473236e34
commit 37423d661c
3 changed files with 158 additions and 131 deletions

View File

@ -581,58 +581,68 @@ class AmqpProtocolConverter {
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
// Client is producing to this receiver object
org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
if( remoteTarget instanceof Coordinator ) {
pumpProtonToSocket();
receiver.setContext(coordinatorContext);
receiver.flow(prefetch);
receiver.open();
pumpProtonToSocket();
} else {
org.apache.qpid.proton.amqp.messaging.Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest;
if( target.getDynamic() ) {
dest = createTempQueue();
org.apache.qpid.proton.amqp.messaging.Target actualTarget = new org.apache.qpid.proton.amqp.messaging.Target();
actualTarget.setAddress(dest.getQualifiedName());
actualTarget.setDynamic(true);
receiver.setTarget(actualTarget);
try {
if( remoteTarget instanceof Coordinator ) {
pumpProtonToSocket();
receiver.setContext(coordinatorContext);
receiver.flow(prefetch);
receiver.open();
pumpProtonToSocket();
} else {
dest = createDestination(remoteTarget);
}
ProducerContext producerContext = new ProducerContext(producerId, dest);
receiver.setContext(producerContext);
receiver.flow(prefetch);
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
receiver.setTarget(null);
Throwable exception = ((ExceptionResponse) response).getException();
((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
receiver.close();
} else {
receiver.open();
}
pumpProtonToSocket();
Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest;
if( target.getDynamic() ) {
dest = createTempQueue();
Target actualTarget = new Target();
actualTarget.setAddress(dest.getQualifiedName());
actualTarget.setDynamic(true);
receiver.setTarget(actualTarget);
} else {
dest = createDestination(remoteTarget);
}
});
ProducerContext producerContext = new ProducerContext(producerId, dest);
receiver.setContext(producerContext);
receiver.flow(prefetch);
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
receiver.setTarget(null);
Throwable exception = ((ExceptionResponse) response).getException();
((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
receiver.close();
} else {
receiver.open();
}
pumpProtonToSocket();
}
});
}
} catch (AmqpProtocolException exception) {
receiver.setTarget(null);
((LinkImpl)receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
receiver.close();
}
}
private ActiveMQDestination createDestination(Object terminus) {
private ActiveMQDestination createDestination(Object terminus) throws AmqpProtocolException {
if( terminus == null ) {
return null;
} else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)terminus;
if( source.getAddress() == null ) {
throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
}
return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target)terminus;
if( target.getAddress() == null ) {
throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
}
return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof Coordinator ) {
Coordinator target = (Coordinator)terminus;
@ -854,116 +864,121 @@ class AmqpProtocolConverter {
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)sender.getRemoteSource();
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
sender.setContext(consumerContext);
try {
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
sender.setContext(consumerContext);
String selector = null;
if( source!=null ) {
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
if( value!=null ) {
selector = value.getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
} catch (InvalidSelectorException e) {
sender.setSource(null);
((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
sender.close();
consumerContext.closed = true;
return;
String selector = null;
if( source!=null ) {
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
if( value!=null ) {
selector = value.getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
} catch (InvalidSelectorException e) {
sender.setSource(null);
((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
sender.close();
consumerContext.closed = true;
return;
}
}
}
}
}
ActiveMQDestination dest;
if( source == null ) {
ActiveMQDestination dest;
if( source == null ) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress("");
source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
sender.setSource(source);
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress("");
source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
sender.setSource(source);
// Looks like durable sub removal.
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
// Looks like durable sub removal.
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
consumerContext.closed=true;
sendToActiveMQ(rsi, new ResponseHandler() {
consumerContext.closed=true;
sendToActiveMQ(rsi, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
}
sender.open();
pumpProtonToSocket();
}
});
return;
} else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
consumerContext.closed=true;
sender.close();
pumpProtonToSocket();
return;
} else if( source.getDynamic() ) {
// lets create a temp dest.
dest = createTempQueue();
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(dest.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
} else {
dest = createDestination(source);
}
subscriptionsByConsumerId.put(id, consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
if( source.getDistributionMode() == COPY && dest.isQueue() ) {
consumerInfo.setBrowser(true);
}
if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
consumerInfo.setSubscriptionName(sender.getName());
}
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(NO_LOCAL);
if( value!=null ) {
consumerInfo.setNoLocal(true);
}
}
sendToActiveMQ(consumerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
if( exception instanceof InvalidSelectorException ) {
name = "amqp:invalid-field";
}
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
subscriptionsByConsumerId.remove(id);
sender.close();
} else {
sender.open();
}
sender.open();
pumpProtonToSocket();
}
});
return;
} else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
consumerContext.closed=true;
} catch (AmqpProtocolException e) {
sender.setSource(null);
((LinkImpl)sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
sender.close();
pumpProtonToSocket();
return;
} else if( source.getDynamic() ) {
// lets create a temp dest.
dest = createTempQueue();
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(dest.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
} else {
dest = createDestination(source);
}
subscriptionsByConsumerId.put(id, consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
if( source.getDistributionMode() == COPY && dest.isQueue() ) {
consumerInfo.setBrowser(true);
}
if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
consumerInfo.setSubscriptionName(sender.getName());
}
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(NO_LOCAL);
if( value!=null ) {
consumerInfo.setNoLocal(true);
}
}
sendToActiveMQ(consumerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
if( exception instanceof InvalidSelectorException ) {
name = "amqp:invalid-field";
}
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
subscriptionsByConsumerId.remove(id);
sender.close();
} else {
sender.open();
}
pumpProtonToSocket();
}
});
}
static private boolean contains(Symbol[] haystack, Symbol needle) {

View File

@ -23,6 +23,7 @@ public class AmqpProtocolException extends IOException {
private static final long serialVersionUID = -2869735532997332242L;
private final String symbolicName;
private final boolean fatal;
public AmqpProtocolException() {
@ -37,8 +38,17 @@ public class AmqpProtocolException extends IOException {
this(s, fatal, null);
}
public AmqpProtocolException(String s, String msg) {
this(s, msg, false, null);
}
public AmqpProtocolException(String s, boolean fatal, Throwable cause) {
this("error", s, fatal, cause);
}
public AmqpProtocolException(String symbolicName, String s, boolean fatal, Throwable cause) {
super(s);
this.symbolicName = symbolicName;
this.fatal = fatal;
initCause(cause);
}
@ -47,4 +57,7 @@ public class AmqpProtocolException extends IOException {
return fatal;
}
public String getSymbolicName() {
return symbolicName;
}
}

View File

@ -89,7 +89,6 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da
// static helper methods for working with destinations
// -------------------------------------------------------------------------
public static ActiveMQDestination createDestination(String name, byte defaultType) {
if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
} else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {