ARTEMIS-2076 Make Filter update-able

Add Tests
Add implementation inline with other queue updatable settings.
Enhance tests to ensure queue is not destroyed during config change and messages in queue already are preserved
This commit is contained in:
Michael André Pearce 2018-09-07 06:05:33 +01:00 committed by Clebert Suconic
parent cbe46ce89e
commit 4b88f38b2d
17 changed files with 290 additions and 15 deletions

View File

@ -705,6 +705,7 @@ public interface ActiveMQServerControl {
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "filter", desc = "The filter to use on the queue") String filter,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,

View File

@ -868,12 +868,13 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
Boolean purgeOnNoConsumers,
Boolean exclusive,
String user) throws Exception {
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
}
@Override
public String updateQueue(String name,
String routingType,
String filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
@ -885,7 +886,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
if (queue == null) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
}

View File

@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -66,6 +67,7 @@ public interface PostOffice extends ActiveMQComponent {
QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
Filter filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,

View File

@ -32,8 +32,6 @@ public class LocalQueueBinding implements QueueBinding {
private final Queue queue;
private final Filter filter;
private final SimpleString clusterName;
private SimpleString name;
@ -45,8 +43,6 @@ public class LocalQueueBinding implements QueueBinding {
this.name = queue.getName();
filter = queue.getFilter();
clusterName = queue.getName().concat(nodeID);
}
@ -57,7 +53,7 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public Filter getFilter() {
return filter;
return queue.getFilter();
}
@Override
@ -158,7 +154,7 @@ public class LocalQueueBinding implements QueueBinding {
", queue=" +
queue +
", filter=" +
filter +
getFilter() +
", name=" +
name +
", clusterName=" +

View File

@ -465,6 +465,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
Filter filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
@ -522,6 +523,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
}
if (filter != null && !filter.equals(queue.getFilter())) {
changed = true;
queue.setFilter(filter);
}
if (logger.isDebugEnabled()) {
if (user == null && queue.getUser() != null) {
logger.debug("Ignoring updating Queue to a NULL user");

View File

@ -537,6 +537,7 @@ public interface ActiveMQServer extends ServiceComponent {
Queue updateQueue(String name,
RoutingType routingType,
String filterString,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,

View File

@ -45,6 +45,8 @@ public interface Queue extends Bindable,CriticalComponent {
Filter getFilter();
void setFilter(Filter filter);
PageSubscription getPageSubscription();
RoutingType getRoutingType();

View File

@ -2790,7 +2790,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
} else {
// if the address::queue doesn't exist then create it
try {
@ -3252,19 +3252,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Boolean purgeOnNoConsumers,
Boolean exclusive,
String user) throws Exception {
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
}
@Override
public Queue updateQueue(String name,
RoutingType routingType,
String filterString,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
String user) throws Exception {
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
final Filter filter = FilterImpl.createFilter(filterString);
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
if (queueBinding != null) {
final Queue queue = queueBinding.getQueue();
return queue;

View File

@ -40,8 +40,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
@ -119,6 +119,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private static final Logger logger = Logger.getLogger(QueueImpl.class);
private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching");
private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime");
private static final AtomicReferenceFieldUpdater<QueueImpl, Filter> filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter");
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@ -695,7 +696,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public Filter getFilter() {
return filter;
return filterUpdater.get(this);
}
@Override
public void setFilter(Filter filter) {
filterUpdater.set(this, filter);
}
@Override

View File

@ -882,6 +882,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null;
}
@Override
public void setFilter(Filter filter) {
}
@Override
public PageSubscription getPageSubscription() {
return null;

View File

@ -38,6 +38,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
@ -106,6 +107,102 @@ public class RedeployTest extends ActiveMQTestBase {
}
}
@Test
public void testRedeployFilter() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = new Runnable() {
@Override
public void run() {
latch.countDown();
}
};
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
try {
latch.await(10, TimeUnit.SECONDS);
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
connection.start();
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setStringProperty("x", "x");
producer.send(message);
MessageConsumer consumer = session.createConsumer(queue);
assertNotNull(consumer.receive(5000));
consumer.close();
}
//Send a message that should remain in the queue (this ensures config change is non-destructive)
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
connection.start();
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("hello");
message.setStringProperty("x", "x");
producer.send(message);
}
Binding binding = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue"));
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
Binding bindingAfterChange = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue"));
assertTrue("Instance should be the same (as should be non destructive)", binding == bindingAfterChange);
assertEquals(binding.getID(), bindingAfterChange.getID());
//Check that after the config change we can still consume a message that was sent before, ensuring config change was non-destructive of the queue.
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
connection.start();
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("hello", ((TextMessage)message).getText());
consumer.close();
}
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
connection.start();
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setStringProperty("x", "y");
producer.send(message);
MessageConsumer consumer = session.createConsumer(queue);
assertNotNull(consumer.receive(2000));
consumer.close();
}
} finally {
embeddedActiveMQ.stop();
}
}
@Test
public void testRedeployWithFailover() throws Exception {
EmbeddedActiveMQ live = new EmbeddedActiveMQ();

View File

@ -156,8 +156,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception {
return null;
public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "filter", desc = "The filter to use on the queue") String filter,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
@Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
@Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
@Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception {
return (String) proxy.invokeOperation("updateQueue", name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
}
@Override

View File

@ -88,4 +88,63 @@ public class ConfigChangeTest extends ActiveMQTestBase {
server.stop();
}
@Test
public void testChangeQueueFilterOnRestart() throws Exception {
final String filter1 = "x = 'x'";
final String filter2 = "x = 'y'";
Configuration configuration = createDefaultInVMConfig( );
configuration.addAddressesSetting("#", new AddressSettings());
List addressConfigurations = new ArrayList();
CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
.setName("myAddress")
.addRoutingType(RoutingType.ANYCAST)
.addQueueConfiguration(new CoreQueueConfiguration()
.setName("myQueue")
.setAddress("myAddress")
.setFilterString(filter1)
.setRoutingType(RoutingType.ANYCAST));
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);
server = createServer(true, configuration);
server.start();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
try (JMSContext context = connectionFactory.createContext()) {
context.createProducer().setProperty("x", "x").send(context.createQueue("myAddress"), "hello");
}
long originalBindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();
server.stop();
addressConfiguration = new CoreAddressConfiguration()
.setName("myAddress")
.addRoutingType(RoutingType.ANYCAST)
.addQueueConfiguration(new CoreQueueConfiguration()
.setName("myQueue")
.setAddress("myAddress")
.setFilterString(filter2)
.setRoutingType(RoutingType.ANYCAST));
addressConfigurations.clear();
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);
server.start();
assertEquals(filter2, server.locateQueue(SimpleString.toSimpleString("myQueue")).getFilter().getFilterString().toString());
//Ensures the queue is not destroyed by checking message sent before change is consumable after (e.g. no message loss)
try (JMSContext context = connectionFactory.createContext()) {
Message message = context.createConsumer(context.createQueue("myAddress::myQueue")).receive();
assertEquals("hello", ((TextMessage) message).getText());
}
long bindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();
assertEquals("Ensure the original queue is not destroyed by checking the binding id is the same", originalBindingId, bindingId);
server.stop();
}
}

View File

@ -0,0 +1,42 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<addresses>
<address name="myQueue">
<anycast>
<queue name="myQueue">
<filter string="x = 'y'"/>
</queue>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,42 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<addresses>
<address name="myQueue">
<anycast>
<queue name="myQueue">
<filter string="x = 'x'"/>
</queue>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -426,6 +426,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return null;
}
@Override
public void setFilter(Filter filter) {
}
@Override
public long getMessageCount() {
return messageCount;

View File

@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -46,6 +47,7 @@ public class FakePostOffice implements PostOffice {
@Override
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
Filter filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,