Merge pull request #1200 from cshannon/AMQ-9475

AMQ-9475 - ConsumerControl commands should not auto create wildcard dests
This commit is contained in:
Christopher L. Shannon 2024-04-11 06:55:34 -04:00 committed by GitHub
commit 78d9555233
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 153 additions and 5 deletions

View File

@ -21,6 +21,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -543,7 +545,11 @@ public abstract class AbstractRegion implements Region {
return sub.pullMessage(context, pull);
}
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary) throws Exception {
return lookup(context, destination, createTemporary, true);
}
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary, boolean autoCreate) throws Exception {
Destination dest = null;
destinationsLock.readLock().lock();
@ -553,7 +559,7 @@ public abstract class AbstractRegion implements Region {
destinationsLock.readLock().unlock();
}
if (dest == null) {
if (autoCreate && dest == null) {
if (isAutoCreateDestinations()) {
// Try to auto create the destination... re-invoke broker
// from the
@ -679,8 +685,8 @@ public abstract class AbstractRegion implements Region {
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
Subscription sub = subscriptions.get(control.getConsumerId());
if (sub != null && sub instanceof AbstractSubscription) {
final Subscription sub = subscriptions.get(control.getConsumerId());
if (sub instanceof AbstractSubscription) {
((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination());
@ -691,7 +697,17 @@ public abstract class AbstractRegion implements Region {
LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}",
control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize());
try {
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
final ActiveMQDestination controlDest = Objects.requireNonNull(control.getDestination(),
"Destination must not be null in ConsumerControl");
// Don't auto create patterns (wildcard topics) or composite, this matches addConsumer()
final boolean autoCreate = !controlDest.isPattern() && !controlDest.isComposite();
// If autoCreate is false then lookup() will just return null if the destination
// does not exist and we can skip the call to wakeup. This will prevent creating
// wildcard destinations for wildcard consumers but will use them if they exist
Optional.ofNullable(lookup(consumerExchange.getConnectionContext(),
control.getDestination(),false, autoCreate))
.ifPresent(Destination::wakeup);
} catch (Exception e) {
LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", control.getDestination(), e);
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerControl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import jakarta.jms.*;
import java.net.URI;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* This tests that subscribing to a wildcard and sending a ConsumerControl
* command for that wildcard sub will not auto create the destination
* by mistake.
*/
@RunWith(Parameterized.class)
public class AMQ9475Test {
@Parameters(name = "queue={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { true }, { false } });
}
public AMQ9475Test(boolean queue) {
this.destination1 = queue ? new ActiveMQQueue("a.>") : new ActiveMQTopic("a.>");
this.destination2 = queue ? new ActiveMQQueue("a") : new ActiveMQTopic("a");
}
private BrokerService brokerService;
private String connectionUri;
private final ActiveMQDestination destination1;
private final ActiveMQDestination destination2;
protected ConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri);
conFactory.setWatchTopicAdvisories(false);
return conFactory;
}
@Before
public void setUp() throws Exception {
brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();
connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
}
// Normal use case to verify wildcard sub is not created
@Test
public void testNormalWildcardSub() throws Exception {
Session session;
try (Connection connection = createConnectionFactory().createConnection()) {
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination1);
sendMessage(session, destination2, "test");
assertNotNull(consumer.receive(1000));
assertNull(brokerService.getBroker().getDestinationMap().get(destination1));
assertNotNull(brokerService.getBroker().getDestinationMap().get(destination2));
}
}
// Test that the wildcard dest is still not auto-created even after sending the
// ConsumerControl object for it
@Test
public void testWildcardConsumerControl() throws Exception {
Session session;
try (ActiveMQConnection connection = (ActiveMQConnection) createConnectionFactory().createConnection()) {
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination1);
ConsumerControl control = new ConsumerControl();
control.setDestination(destination1);
control.setConsumerId(consumer.getConsumerId());
control.setPrefetch(10);
connection.syncSendPacket(control);
sendMessage(session, destination2, "test");
assertNotNull(consumer.receive(1000));
assertNull(brokerService.getBroker().getDestinationMap().get(destination1));
assertNotNull(brokerService.getBroker().getDestinationMap().get(destination2));
}
}
@After
public void tearDown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
private void sendMessage(Session session, Destination destination, String text) throws JMSException {
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(text));
producer.close();
}
}