ARTEMIS-2797 Fixing redeploy mechanism

Queue settings are reset to their default values upon broker.xml reload ONLY.
Regular calls to PostOfficeImpl#updateQueue are no longer affected.
This commit is contained in:
Jan Šmucr 2020-07-02 15:34:58 +02:00 committed by Clebert Suconic
parent 3b8ab97629
commit 6cc370e169
8 changed files with 233 additions and 74 deletions

View File

@ -538,7 +538,9 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
private SimpleString createMetadata() {
StringBuilder metadata = new StringBuilder();
metadata.append("user=").append(user).append(";");
if (user != null) {
metadata.append("user=").append(user).append(";");
}
return SimpleString.toSimpleString(metadata.toString());
}
}

View File

@ -101,6 +101,14 @@ public interface PostOffice extends ActiveMQComponent {
QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception;
/**
* @param queueConfiguration
* @param forceUpdate Setting to <code>true</code> will make <code>null</code> values override current values too
* @return
* @throws Exception
*/
QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception;
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;

View File

@ -63,7 +63,6 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueConfigurationUtils;
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@ -619,6 +618,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception {
return updateQueue(queueConfiguration, false);
}
@Override
public QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception {
synchronized (this) {
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(queueConfiguration.getName());
if (queueBinding == null) {
@ -649,91 +653,71 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(queueConfiguration.getAddress().toString()));
// atomic update, reset to defaults if value == null
// maxConsumers
if (queue.getMaxConsumers() != queueConfiguration.getMaxConsumers()) {
//atomic update
if ((forceUpdate || queueConfiguration.getMaxConsumers() != null) && !Objects.equals(queue.getMaxConsumers(), queueConfiguration.getMaxConsumers())) {
changed = true;
queue.setMaxConsumer(queueConfiguration.getMaxConsumers());
}
// routingType
if (queue.getRoutingType() != queueConfiguration.getRoutingType()) {
if ((forceUpdate || queueConfiguration.getRoutingType() != null) && !Objects.equals(queue.getRoutingType(), queueConfiguration.getRoutingType())) {
changed = true;
queue.setRoutingType(queueConfiguration.getRoutingType());
}
// purgeOnNoConsumers
if (queue.isPurgeOnNoConsumers() != queueConfiguration.isPurgeOnNoConsumers()) {
if ((forceUpdate || queueConfiguration.isPurgeOnNoConsumers() != null) && !Objects.equals(queue.isPurgeOnNoConsumers(), queueConfiguration.isPurgeOnNoConsumers())) {
changed = true;
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
}
// enabled
if (queue.isEnabled() != queueConfiguration.isEnabled()) {
if ((forceUpdate || queueConfiguration.isEnabled() != null) && !Objects.equals(queue.isEnabled(), queueConfiguration.isEnabled())) {
changed = true;
queue.setEnabled(queueConfiguration.isEnabled());
}
// exclusive
if (queue.isExclusive() != queueConfiguration.isExclusive()) {
if ((forceUpdate || queueConfiguration.isExclusive() != null) && !Objects.equals(queue.isExclusive(), queueConfiguration.isExclusive())) {
changed = true;
queue.setExclusive(queueConfiguration.isExclusive());
}
// groupRebalance
if (queue.isGroupRebalance() != queueConfiguration.isGroupRebalance()) {
if ((forceUpdate || queueConfiguration.isGroupRebalance() != null) && !Objects.equals(queue.isGroupRebalance(), queueConfiguration.isGroupRebalance())) {
changed = true;
queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
}
// groupBuckets
if (queue.getGroupBuckets() != queueConfiguration.getGroupBuckets()) {
if ((forceUpdate || queueConfiguration.getGroupBuckets() != null) && !Objects.equals(queue.getGroupBuckets(), queueConfiguration.getGroupBuckets())) {
changed = true;
queue.setGroupBuckets(queueConfiguration.getGroupBuckets());
}
// groupFirstKey
// Objects.equals() performs the null check for us
if (!Objects.equals(queue.getGroupFirstKey(), queueConfiguration.getGroupFirstKey())) {
if ((forceUpdate || queueConfiguration.getGroupFirstKey() != null) && !Objects.equals(queueConfiguration.getGroupFirstKey(), queue.getGroupFirstKey())) {
changed = true;
queue.setGroupFirstKey(queueConfiguration.getGroupFirstKey());
}
// nonDestructive
if (queue.isNonDestructive() != queueConfiguration.isNonDestructive()) {
if ((forceUpdate || queueConfiguration.isNonDestructive() != null) && !Objects.equals(queue.isNonDestructive(), queueConfiguration.isNonDestructive())) {
changed = true;
queue.setNonDestructive(queueConfiguration.isNonDestructive());
}
// consumersBeforeDispatch
if (queue.getConsumersBeforeDispatch() != queueConfiguration.getConsumersBeforeDispatch()) {
if ((forceUpdate || queueConfiguration.getConsumersBeforeDispatch() != null) && !Objects.equals(queueConfiguration.getConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch())) {
changed = true;
queue.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch());
}
// delayBeforeDispatch
if (queue.getDelayBeforeDispatch() != queueConfiguration.getDelayBeforeDispatch()) {
if ((forceUpdate || queueConfiguration.getDelayBeforeDispatch() != null) && !Objects.equals(queueConfiguration.getDelayBeforeDispatch(), queue.getDelayBeforeDispatch())) {
changed = true;
queue.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch());
}
// filter
// There's no default ActiveMQDefaultConfiguration setting for a filter
final Filter newFilter = FilterImpl.createFilter(queueConfiguration.getFilterString());
if (!Objects.equals(queue.getFilter(), newFilter)) {
final SimpleString empty = new SimpleString("");
Filter oldFilter = FilterImpl.createFilter(queue.getFilter() == null ? empty : queue.getFilter().getFilterString());
Filter newFilter = FilterImpl.createFilter(queueConfiguration.getFilterString() == null ? empty : queueConfiguration.getFilterString());
if ((forceUpdate || newFilter != null) && !Objects.equals(oldFilter, newFilter)) {
changed = true;
queue.setFilter(newFilter);
}
// configurationManaged
if (queueConfiguration.isConfigurationManaged() != queue.isConfigurationManaged()) {
queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged());
if ((forceUpdate || queueConfiguration.isConfigurationManaged() != null) && !Objects.equals(queueConfiguration.isConfigurationManaged(), queue.isConfigurationManaged())) {
changed = true;
queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged());
}
if (logger.isDebugEnabled()) {
if (queueConfiguration.getUser() == null && queue.getUser() != null) {
logger.debug("Ignoring updating Queue to a NULL user");
}
}
if (queueConfiguration.getUser() != null && !queueConfiguration.getUser().equals(queue.getUser())) {
if ((forceUpdate || queueConfiguration.getUser() != null) && !Objects.equals(queueConfiguration.getUser(), queue.getUser())) {
changed = true;
queue.setUser(queueConfiguration.getUser());
}
// ringSize
if (queue.getRingSize() != queueConfiguration.getRingSize()) {
if ((forceUpdate || queueConfiguration.getRingSize() != null) && !Objects.equals(queueConfiguration.getRingSize(), queue.getRingSize())) {
changed = true;
queue.setRingSize(queueConfiguration.getRingSize());
}
if (changed) {
final long txID = storageManager.generateID();
try {

View File

@ -790,6 +790,15 @@ public interface ActiveMQServer extends ServiceComponent {
*/
Queue updateQueue(QueueConfiguration queueConfiguration) throws Exception;
/**
* @param queueConfiguration the {@code QueueConfiguration} to use
* @param forceUpdate If <code>true</code>, no <code>null</code> check is performed and unset queueConfiguration values are reset to <code>null</code>
* @return the updated {@code Queue} instance
* @throws Exception
* @see #updateQueue(QueueConfiguration)
*/
Queue updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception;
/*
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
* replace any factories with the same protocol

View File

@ -48,6 +48,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -3224,7 +3226,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// determine if there is an address::queue match; update it if so
if (locateQueue(config.getName()) != null && locateQueue(config.getName()).getAddress().equals(config.getAddress())) {
updateQueue(config.setConfigurationManaged(true));
config.setConfigurationManaged(true);
setUnsetQueueParamsToDefaults(config);
updateQueue(config, true);
} else {
// if the address::queue doesn't exist then create it
try {
@ -3770,7 +3774,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public Queue updateQueue(QueueConfiguration queueConfiguration) throws Exception {
final QueueBinding queueBinding = this.postOffice.updateQueue(queueConfiguration);
return updateQueue(queueConfiguration, false);
}
@Override
public Queue updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception {
final QueueBinding queueBinding = this.postOffice.updateQueue(queueConfiguration, forceUpdate);
if (queueBinding != null) {
return queueBinding.getQueue();
} else {
@ -3983,6 +3992,33 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
private static <T> void setDefaultIfUnset(Supplier<T> getter, Consumer<T> setter, T defaultValue) {
if (getter.get() == null) {
setter.accept(defaultValue);
}
}
private static void setUnsetQueueParamsToDefaults(QueueConfiguration c) {
// Param list taken from PostOfficeImpl::updateQueue
setDefaultIfUnset(c::getMaxConsumers, c::setMaxConsumers, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
setDefaultIfUnset(c::getRoutingType, c::setRoutingType, ActiveMQDefaultConfiguration.getDefaultRoutingType());
setDefaultIfUnset(c::isPurgeOnNoConsumers, c::setPurgeOnNoConsumers, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
setDefaultIfUnset(c::isEnabled, c::setEnabled, ActiveMQDefaultConfiguration.getDefaultEnabled());
setDefaultIfUnset(c::isExclusive, c::setExclusive, ActiveMQDefaultConfiguration.getDefaultExclusive());
setDefaultIfUnset(c::isGroupRebalance, c::setGroupRebalance, ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
setDefaultIfUnset(c::getGroupBuckets, c::setGroupBuckets, ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
setDefaultIfUnset(c::getGroupFirstKey, c::setGroupFirstKey, ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
setDefaultIfUnset(c::isNonDestructive, c::setNonDestructive, ActiveMQDefaultConfiguration.getDefaultNonDestructive());
setDefaultIfUnset(c::getConsumersBeforeDispatch, c::setConsumersBeforeDispatch, ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
setDefaultIfUnset(c::getDelayBeforeDispatch, c::setDelayBeforeDispatch, ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
setDefaultIfUnset(c::getFilterString, c::setFilterString, new SimpleString(""));
// Defaults to false automatically as per isConfigurationManaged() JavaDoc
setDefaultIfUnset(c::isConfigurationManaged, c::setConfigurationManaged, false);
// Setting to null might have side effects
setDefaultIfUnset(c::getUser, c::setUser, null);
setDefaultIfUnset(c::getRingSize, c::setRingSize, ActiveMQDefaultConfiguration.getDefaultRingSize());
}
private void deployReloadableConfigFromConfiguration() throws Exception {
if (configurationReloadDeployed.compareAndSet(false, true)) {
ActiveMQServerLogger.LOGGER.reloadingConfiguration("security");

View File

@ -39,6 +39,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -296,7 +297,7 @@ public class RedeployTest extends ActiveMQTestBase {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
Files.copy(configFile.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = latch::countDown;
server.getActiveMQServer().getReloadManager().setTick(tick);
@ -397,6 +398,40 @@ public class RedeployTest extends ActiveMQTestBase {
doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-removed.xml"));
}
/**
* This one is here just to make sure it's possible to change queue parameters one by one without setting the others
* to <code>null</code>.
* @throws Exception
*/
@Test
public void testQueuePartialReconfiguration() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url = RedeployTest.class.getClassLoader().getResource("reload-empty.xml");
Files.copy(url.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
try {
embeddedActiveMQ.getActiveMQServer().createQueue(new QueueConfiguration("virtualQueue").setUser("bob"));
embeddedActiveMQ.getActiveMQServer().updateQueue(new QueueConfiguration("virtualQueue").setFilterString("foo"));
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
.getBinding(new SimpleString("virtualQueue"));
org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
assertEquals(new SimpleString("bob"), queue.getUser());
assertEquals(new SimpleString("foo"), queue.getFilter().getFilterString());
} finally {
embeddedActiveMQ.stop();
}
}
@Test
public void testRedeployQueueDefaults() throws Exception {
@ -413,37 +448,37 @@ public class RedeployTest extends ActiveMQTestBase {
.getBinding(new SimpleString("myQueue"));
org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
assertNotEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
assertNotEquals(queue.getRoutingType(), RoutingType.MULTICAST);
assertNotEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
assertNotEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
assertNotEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
assertNotEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
assertNotEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
assertNotEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
assertNotEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
assertNotEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
assertNotEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
assertNotEquals(queue.getFilter(), null);
assertNotEquals(queue.getUser(), "jdoe");
assertNotEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers());
assertNotEquals(RoutingType.MULTICAST, queue.getRoutingType());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queue.isPurgeOnNoConsumers());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultEnabled(), queue.isEnabled());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultExclusive(), queue.isExclusive());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), queue.isGroupRebalance());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), queue.getGroupBuckets());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupFirstKey(), queue.getGroupFirstKey());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultNonDestructive(), queue.isNonDestructive());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), queue.getDelayBeforeDispatch());
assertNotNull(queue.getFilter());
assertEquals(new SimpleString("jdoe"), queue.getUser());
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize());
deployBrokerConfig(embeddedActiveMQ, newConfig);
assertEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
assertEquals(queue.getRoutingType(), RoutingType.MULTICAST);
assertEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
assertEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
assertEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
assertEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
assertEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
assertEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
assertEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
assertEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
assertEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
assertEquals(queue.getFilter(), null);
assertEquals(queue.getUser(), null);
assertEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers());
assertEquals(RoutingType.MULTICAST, queue.getRoutingType());
assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queue.isPurgeOnNoConsumers());
assertEquals(ActiveMQDefaultConfiguration.getDefaultEnabled(), queue.isEnabled());
assertEquals(ActiveMQDefaultConfiguration.getDefaultExclusive(), queue.isExclusive());
assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), queue.isGroupRebalance());
assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), queue.getGroupBuckets());
assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupFirstKey(), queue.getGroupFirstKey());
assertEquals(ActiveMQDefaultConfiguration.getDefaultNonDestructive(), queue.isNonDestructive());
assertEquals(ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch());
assertEquals(ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), queue.getDelayBeforeDispatch());
assertNull(queue.getFilter());
assertNull(queue.getUser());
assertEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize());
} finally {
embeddedActiveMQ.stop();

View File

@ -0,0 +1,80 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
</core>
</configuration>

View File

@ -84,6 +84,11 @@ public class FakePostOffice implements PostOffice {
@Override
public QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception {
return updateQueue(queueConfiguration, false);
}
@Override
public QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception {
return null;
}