ARTEMIS-825 - Invalid selector not handled correctly in AMQP
this changes propogates the error to the client and closes the sender correctly https://issues.apache.org/jira/browse/ARTEMIS-825
This commit is contained in:
parent
329c533d21
commit
6628db4892
|
@ -20,6 +20,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
@ -140,8 +141,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
try {
|
try {
|
||||||
SelectorParser.parse(selector);
|
SelectorParser.parse(selector);
|
||||||
} catch (FilterException e) {
|
} catch (FilterException e) {
|
||||||
close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
|
throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
supportedFilters.put(filter.getKey(), filter.getValue());
|
supportedFilters.put(filter.getKey(), filter.getValue());
|
||||||
|
@ -313,6 +313,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
@Override
|
@Override
|
||||||
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
|
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
|
||||||
closed = true;
|
closed = true;
|
||||||
|
if (condition != null) {
|
||||||
|
sender.setCondition(condition);
|
||||||
|
}
|
||||||
protonSession.removeSender(sender);
|
protonSession.removeSender(sender);
|
||||||
synchronized (connection.getLock()) {
|
synchronized (connection.getLock()) {
|
||||||
sender.close();
|
sender.close();
|
||||||
|
|
|
@ -45,6 +45,8 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test basic send and receive scenarios using only AMQP sender and receiver links.
|
* Test basic send and receive scenarios using only AMQP sender and receiver links.
|
||||||
*/
|
*/
|
||||||
|
@ -132,6 +134,24 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testInvalidFilter() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
try {
|
||||||
|
session.createReceiver(getTestName(), "null = 'f''", true);
|
||||||
|
fail("should throw exception");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertTrue(e.getCause() instanceof JMSException);
|
||||||
|
//passed
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testQueueReceiverReadMessage() throws Exception {
|
public void testQueueReceiverReadMessage() throws Exception {
|
||||||
sendMessages(getTestName(), 1);
|
sendMessages(getTestName(), 1);
|
||||||
|
|
Loading…
Reference in New Issue