ARTEMIS-5182 STOMP sub w/noLocal + selector missing messages

This commit is contained in:
Justin Bertram 2024-12-17 16:19:43 -06:00 committed by Bruscino Domenico Francesco
parent 505180dc7b
commit 2b7737301f
3 changed files with 105 additions and 15 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.protocol.stomp; package org.apache.activemq.artemis.core.protocol.stomp;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -57,7 +60,6 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING; import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
@ -558,6 +560,7 @@ public final class StompConnection extends AbstractRemotingConnection {
boolean noLocal, boolean noLocal,
RoutingType subscriptionType, RoutingType subscriptionType,
Integer consumerWindowSize) throws ActiveMQStompException { Integer consumerWindowSize) throws ActiveMQStompException {
validateSelector(selector);
autoCreateDestinationIfPossible(destination, subscriptionType); autoCreateDestinationIfPossible(destination, subscriptionType);
checkDestination(destination); checkDestination(destination);
checkRoutingSemantics(destination, subscriptionType); checkRoutingSemantics(destination, subscriptionType);
@ -566,7 +569,7 @@ public final class StompConnection extends AbstractRemotingConnection {
if (selector == null) { if (selector == null) {
selector = noLocalFilter; selector = noLocalFilter;
} else { } else {
selector += " AND " + noLocalFilter; selector = "(" + selector + ") AND " + noLocalFilter;
} }
} }
@ -593,6 +596,21 @@ public final class StompConnection extends AbstractRemotingConnection {
} }
} }
private void validateSelector(String selector) throws ActiveMQStompException {
// user may not specify a selector; that's ok
if (selector == null) {
return;
}
if (selector.isEmpty()) {
throw new ActiveMQStompException("Selector cannot be empty").setHandler(frameHandler);
}
try {
SelectorParser.parse(selector);
} catch (FilterException e) {
throw new ActiveMQStompException("Invalid selector \"" + selector + "\"", e).setHandler(frameHandler);
}
}
public void unsubscribe(String subscriptionID, String durableSubscriptionName) throws ActiveMQStompException { public void unsubscribe(String subscriptionID, String durableSubscriptionName) throws ActiveMQStompException {
try { try {
manager.unsubscribe(this, subscriptionID, durableSubscriptionName); manager.unsubscribe(this, subscriptionID, durableSubscriptionName);

View File

@ -1501,6 +1501,28 @@ public class StompTest extends StompTestBase {
conn.disconnect(); conn.disconnect();
} }
@Test
public void testSubscribeToTopicWithNoLocalAndSelector() throws Exception {
conn.connect(defUser, defPass);
subscribeTopic(conn, null, null, null, true, true, "a=foo OR b=bar");
// send a message on the same connection => it should not be received as noLocal = true on subscribe
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getTopicPrefix() + getTopicName()).addHeader("b", "bar").setBody("Hello World");
conn.sendFrame(frame);
frame = conn.receiveFrame(100);
assertNull(frame, "No message should have been received since noLocal=true");
// send message on another JMS connection => it should be received
sendJmsMessage(getName().getBytes(StandardCharsets.UTF_8), "b", "bar", topic);
frame = conn.receiveFrame(10000);
assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
assertEquals(getName(), frame.getBody());
conn.disconnect();
}
@Test @Test
public void testTopicExistsAfterNoUnsubscribeDisconnect() throws Exception { public void testTopicExistsAfterNoUnsubscribeDisconnect() throws Exception {
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
@ -2170,4 +2192,40 @@ public class StompTest extends StompTestBase {
conn_r2.disconnect(); conn_r2.disconnect();
} }
} }
@Test
public void testSubscribeToTopicWithEmptySelector() throws Exception {
conn.connect(defUser, defPass);
ClientStompFrame frame = subscribeTopic(conn, null, null, null, true, true, " ");
assertNotNull(frame);
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
assertEquals("Selector cannot be empty", frame.getHeader(Stomp.Headers.Error.MESSAGE));
}
@Test
public void testSubscribeToQueueWithEmptySelector() throws Exception {
conn.connect(defUser, defPass);
ClientStompFrame frame = subscribe(conn, null, null, null, " ");
assertNotNull(frame);
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
assertEquals("Selector cannot be empty", frame.getHeader(Stomp.Headers.Error.MESSAGE));
}
@Test
public void testSubscribeToTopicWithInvalidSelector() throws Exception {
conn.connect(defUser, defPass);
ClientStompFrame frame = subscribeTopic(conn, null, null, null, true, true, ") foo = 'bar' (");
assertNotNull(frame);
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
assertTrue(frame.getHeader(Stomp.Headers.Error.MESSAGE).contains("Invalid selector"));
}
@Test
public void testSubscribeToQueueWithInvalidSelector() throws Exception {
conn.connect(defUser, defPass);
ClientStompFrame frame = subscribe(conn, null, null, null, ") foo = 'bar' (");
assertNotNull(frame);
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
assertTrue(frame.getHeader(Stomp.Headers.Error.MESSAGE).contains("Invalid selector"));
}
} }

View File

@ -478,7 +478,17 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String durableId, String durableId,
boolean receipt, boolean receipt,
boolean noLocal) throws IOException, InterruptedException { boolean noLocal) throws IOException, InterruptedException {
return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal); return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, null);
}
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
String subscriptionId,
String ack,
String durableId,
boolean receipt,
boolean noLocal,
String selector) throws IOException, InterruptedException {
return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, selector);
} }
public static ClientStompFrame subscribeTopicLegacyActiveMQ(StompClientConnection conn, public static ClientStompFrame subscribeTopicLegacyActiveMQ(StompClientConnection conn,
@ -487,7 +497,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String durableId, String durableId,
boolean receipt, boolean receipt,
boolean noLocal) throws IOException, InterruptedException { boolean noLocal) throws IOException, InterruptedException {
return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal); return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, null);
} }
public static ClientStompFrame subscribeTopic(StompClientConnection conn, public static ClientStompFrame subscribeTopic(StompClientConnection conn,
@ -496,7 +506,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String durableId, String durableId,
String durableIdHeader, String durableIdHeader,
boolean receipt, boolean receipt,
boolean noLocal) throws IOException, InterruptedException { boolean noLocal,
String selector) throws IOException, InterruptedException {
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, RoutingType.MULTICAST.toString()) .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, RoutingType.MULTICAST.toString())
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getTopicPrefix() + getTopicName()); .addHeader(Stomp.Headers.Subscribe.DESTINATION, getTopicPrefix() + getTopicName());
@ -516,6 +527,9 @@ public abstract class StompTestBase extends ActiveMQTestBase {
if (noLocal) { if (noLocal) {
frame.addHeader(Stomp.Headers.Subscribe.NO_LOCAL, "true"); frame.addHeader(Stomp.Headers.Subscribe.NO_LOCAL, "true");
} }
if (selector != null) {
frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
}
frame = conn.sendFrame(frame); frame = conn.sendFrame(frame);