This closes #3213
This commit is contained in:
commit
74698f0bcf
|
@ -538,7 +538,9 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
||||||
|
|
||||||
private SimpleString createMetadata() {
|
private SimpleString createMetadata() {
|
||||||
StringBuilder metadata = new StringBuilder();
|
StringBuilder metadata = new StringBuilder();
|
||||||
metadata.append("user=").append(user).append(";");
|
if (user != null) {
|
||||||
|
metadata.append("user=").append(user).append(";");
|
||||||
|
}
|
||||||
return SimpleString.toSimpleString(metadata.toString());
|
return SimpleString.toSimpleString(metadata.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,6 +101,14 @@ public interface PostOffice extends ActiveMQComponent {
|
||||||
|
|
||||||
QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception;
|
QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param queueConfiguration
|
||||||
|
* @param forceUpdate Setting to <code>true</code> will make <code>null</code> values override current values too
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception;
|
||||||
|
|
||||||
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
|
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,6 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueConfigurationUtils;
|
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
||||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||||
|
@ -619,6 +618,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception {
|
public QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception {
|
||||||
|
return updateQueue(queueConfiguration, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(queueConfiguration.getName());
|
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(queueConfiguration.getName());
|
||||||
if (queueBinding == null) {
|
if (queueBinding == null) {
|
||||||
|
@ -649,91 +653,71 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(queueConfiguration.getAddress().toString()));
|
//atomic update
|
||||||
|
if ((forceUpdate || queueConfiguration.getMaxConsumers() != null) && !Objects.equals(queue.getMaxConsumers(), queueConfiguration.getMaxConsumers())) {
|
||||||
// atomic update, reset to defaults if value == null
|
|
||||||
// maxConsumers
|
|
||||||
if (queue.getMaxConsumers() != queueConfiguration.getMaxConsumers()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setMaxConsumer(queueConfiguration.getMaxConsumers());
|
queue.setMaxConsumer(queueConfiguration.getMaxConsumers());
|
||||||
}
|
}
|
||||||
// routingType
|
if ((forceUpdate || queueConfiguration.getRoutingType() != null) && !Objects.equals(queue.getRoutingType(), queueConfiguration.getRoutingType())) {
|
||||||
if (queue.getRoutingType() != queueConfiguration.getRoutingType()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setRoutingType(queueConfiguration.getRoutingType());
|
queue.setRoutingType(queueConfiguration.getRoutingType());
|
||||||
}
|
}
|
||||||
// purgeOnNoConsumers
|
if ((forceUpdate || queueConfiguration.isPurgeOnNoConsumers() != null) && !Objects.equals(queue.isPurgeOnNoConsumers(), queueConfiguration.isPurgeOnNoConsumers())) {
|
||||||
if (queue.isPurgeOnNoConsumers() != queueConfiguration.isPurgeOnNoConsumers()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
|
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
|
||||||
}
|
}
|
||||||
// enabled
|
if ((forceUpdate || queueConfiguration.isEnabled() != null) && !Objects.equals(queue.isEnabled(), queueConfiguration.isEnabled())) {
|
||||||
if (queue.isEnabled() != queueConfiguration.isEnabled()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setEnabled(queueConfiguration.isEnabled());
|
queue.setEnabled(queueConfiguration.isEnabled());
|
||||||
}
|
}
|
||||||
// exclusive
|
if ((forceUpdate || queueConfiguration.isExclusive() != null) && !Objects.equals(queue.isExclusive(), queueConfiguration.isExclusive())) {
|
||||||
if (queue.isExclusive() != queueConfiguration.isExclusive()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setExclusive(queueConfiguration.isExclusive());
|
queue.setExclusive(queueConfiguration.isExclusive());
|
||||||
}
|
}
|
||||||
// groupRebalance
|
if ((forceUpdate || queueConfiguration.isGroupRebalance() != null) && !Objects.equals(queue.isGroupRebalance(), queueConfiguration.isGroupRebalance())) {
|
||||||
if (queue.isGroupRebalance() != queueConfiguration.isGroupRebalance()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
|
queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
|
||||||
}
|
}
|
||||||
// groupBuckets
|
if ((forceUpdate || queueConfiguration.getGroupBuckets() != null) && !Objects.equals(queue.getGroupBuckets(), queueConfiguration.getGroupBuckets())) {
|
||||||
if (queue.getGroupBuckets() != queueConfiguration.getGroupBuckets()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setGroupBuckets(queueConfiguration.getGroupBuckets());
|
queue.setGroupBuckets(queueConfiguration.getGroupBuckets());
|
||||||
}
|
}
|
||||||
// groupFirstKey
|
if ((forceUpdate || queueConfiguration.getGroupFirstKey() != null) && !Objects.equals(queueConfiguration.getGroupFirstKey(), queue.getGroupFirstKey())) {
|
||||||
// Objects.equals() performs the null check for us
|
|
||||||
if (!Objects.equals(queue.getGroupFirstKey(), queueConfiguration.getGroupFirstKey())) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setGroupFirstKey(queueConfiguration.getGroupFirstKey());
|
queue.setGroupFirstKey(queueConfiguration.getGroupFirstKey());
|
||||||
}
|
}
|
||||||
// nonDestructive
|
if ((forceUpdate || queueConfiguration.isNonDestructive() != null) && !Objects.equals(queue.isNonDestructive(), queueConfiguration.isNonDestructive())) {
|
||||||
if (queue.isNonDestructive() != queueConfiguration.isNonDestructive()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setNonDestructive(queueConfiguration.isNonDestructive());
|
queue.setNonDestructive(queueConfiguration.isNonDestructive());
|
||||||
}
|
}
|
||||||
// consumersBeforeDispatch
|
if ((forceUpdate || queueConfiguration.getConsumersBeforeDispatch() != null) && !Objects.equals(queueConfiguration.getConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch())) {
|
||||||
if (queue.getConsumersBeforeDispatch() != queueConfiguration.getConsumersBeforeDispatch()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch());
|
queue.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch());
|
||||||
}
|
}
|
||||||
// delayBeforeDispatch
|
if ((forceUpdate || queueConfiguration.getDelayBeforeDispatch() != null) && !Objects.equals(queueConfiguration.getDelayBeforeDispatch(), queue.getDelayBeforeDispatch())) {
|
||||||
if (queue.getDelayBeforeDispatch() != queueConfiguration.getDelayBeforeDispatch()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch());
|
queue.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch());
|
||||||
}
|
}
|
||||||
// filter
|
final SimpleString empty = new SimpleString("");
|
||||||
// There's no default ActiveMQDefaultConfiguration setting for a filter
|
Filter oldFilter = FilterImpl.createFilter(queue.getFilter() == null ? empty : queue.getFilter().getFilterString());
|
||||||
final Filter newFilter = FilterImpl.createFilter(queueConfiguration.getFilterString());
|
Filter newFilter = FilterImpl.createFilter(queueConfiguration.getFilterString() == null ? empty : queueConfiguration.getFilterString());
|
||||||
if (!Objects.equals(queue.getFilter(), newFilter)) {
|
if ((forceUpdate || newFilter != oldFilter) && !Objects.equals(oldFilter, newFilter)) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setFilter(newFilter);
|
queue.setFilter(newFilter);
|
||||||
}
|
}
|
||||||
// configurationManaged
|
if ((forceUpdate || queueConfiguration.isConfigurationManaged() != null) && !Objects.equals(queueConfiguration.isConfigurationManaged(), queue.isConfigurationManaged())) {
|
||||||
if (queueConfiguration.isConfigurationManaged() != queue.isConfigurationManaged()) {
|
|
||||||
queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged());
|
|
||||||
changed = true;
|
changed = true;
|
||||||
|
queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged());
|
||||||
}
|
}
|
||||||
if (logger.isDebugEnabled()) {
|
if ((forceUpdate || queueConfiguration.getUser() != null) && !Objects.equals(queueConfiguration.getUser(), queue.getUser())) {
|
||||||
if (queueConfiguration.getUser() == null && queue.getUser() != null) {
|
|
||||||
logger.debug("Ignoring updating Queue to a NULL user");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (queueConfiguration.getUser() != null && !queueConfiguration.getUser().equals(queue.getUser())) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setUser(queueConfiguration.getUser());
|
queue.setUser(queueConfiguration.getUser());
|
||||||
}
|
}
|
||||||
// ringSize
|
if ((forceUpdate || queueConfiguration.getRingSize() != null) && !Objects.equals(queueConfiguration.getRingSize(), queue.getRingSize())) {
|
||||||
if (queue.getRingSize() != queueConfiguration.getRingSize()) {
|
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setRingSize(queueConfiguration.getRingSize());
|
queue.setRingSize(queueConfiguration.getRingSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (changed) {
|
if (changed) {
|
||||||
final long txID = storageManager.generateID();
|
final long txID = storageManager.generateID();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -790,6 +790,15 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||||
*/
|
*/
|
||||||
Queue updateQueue(QueueConfiguration queueConfiguration) throws Exception;
|
Queue updateQueue(QueueConfiguration queueConfiguration) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param queueConfiguration the {@code QueueConfiguration} to use
|
||||||
|
* @param forceUpdate If <code>true</code>, no <code>null</code> check is performed and unset queueConfiguration values are reset to <code>null</code>
|
||||||
|
* @return the updated {@code Queue} instance
|
||||||
|
* @throws Exception
|
||||||
|
* @see #updateQueue(QueueConfiguration)
|
||||||
|
*/
|
||||||
|
Queue updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
|
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
|
||||||
* replace any factories with the same protocol
|
* replace any factories with the same protocol
|
||||||
|
|
|
@ -48,6 +48,8 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
|
@ -3224,7 +3226,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
// determine if there is an address::queue match; update it if so
|
// determine if there is an address::queue match; update it if so
|
||||||
if (locateQueue(config.getName()) != null && locateQueue(config.getName()).getAddress().equals(config.getAddress())) {
|
if (locateQueue(config.getName()) != null && locateQueue(config.getName()).getAddress().equals(config.getAddress())) {
|
||||||
updateQueue(config.setConfigurationManaged(true));
|
config.setConfigurationManaged(true);
|
||||||
|
setUnsetQueueParamsToDefaults(config);
|
||||||
|
updateQueue(config, true);
|
||||||
} else {
|
} else {
|
||||||
// if the address::queue doesn't exist then create it
|
// if the address::queue doesn't exist then create it
|
||||||
try {
|
try {
|
||||||
|
@ -3770,7 +3774,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Queue updateQueue(QueueConfiguration queueConfiguration) throws Exception {
|
public Queue updateQueue(QueueConfiguration queueConfiguration) throws Exception {
|
||||||
final QueueBinding queueBinding = this.postOffice.updateQueue(queueConfiguration);
|
return updateQueue(queueConfiguration, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Queue updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception {
|
||||||
|
final QueueBinding queueBinding = this.postOffice.updateQueue(queueConfiguration, forceUpdate);
|
||||||
if (queueBinding != null) {
|
if (queueBinding != null) {
|
||||||
return queueBinding.getQueue();
|
return queueBinding.getQueue();
|
||||||
} else {
|
} else {
|
||||||
|
@ -3983,6 +3992,33 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <T> void setDefaultIfUnset(Supplier<T> getter, Consumer<T> setter, T defaultValue) {
|
||||||
|
if (getter.get() == null) {
|
||||||
|
setter.accept(defaultValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setUnsetQueueParamsToDefaults(QueueConfiguration c) {
|
||||||
|
// Param list taken from PostOfficeImpl::updateQueue
|
||||||
|
setDefaultIfUnset(c::getMaxConsumers, c::setMaxConsumers, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
||||||
|
setDefaultIfUnset(c::getRoutingType, c::setRoutingType, ActiveMQDefaultConfiguration.getDefaultRoutingType());
|
||||||
|
setDefaultIfUnset(c::isPurgeOnNoConsumers, c::setPurgeOnNoConsumers, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
|
||||||
|
setDefaultIfUnset(c::isEnabled, c::setEnabled, ActiveMQDefaultConfiguration.getDefaultEnabled());
|
||||||
|
setDefaultIfUnset(c::isExclusive, c::setExclusive, ActiveMQDefaultConfiguration.getDefaultExclusive());
|
||||||
|
setDefaultIfUnset(c::isGroupRebalance, c::setGroupRebalance, ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
|
||||||
|
setDefaultIfUnset(c::getGroupBuckets, c::setGroupBuckets, ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
|
||||||
|
setDefaultIfUnset(c::getGroupFirstKey, c::setGroupFirstKey, ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
|
||||||
|
setDefaultIfUnset(c::isNonDestructive, c::setNonDestructive, ActiveMQDefaultConfiguration.getDefaultNonDestructive());
|
||||||
|
setDefaultIfUnset(c::getConsumersBeforeDispatch, c::setConsumersBeforeDispatch, ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
|
||||||
|
setDefaultIfUnset(c::getDelayBeforeDispatch, c::setDelayBeforeDispatch, ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
|
||||||
|
setDefaultIfUnset(c::getFilterString, c::setFilterString, new SimpleString(""));
|
||||||
|
// Defaults to false automatically as per isConfigurationManaged() JavaDoc
|
||||||
|
setDefaultIfUnset(c::isConfigurationManaged, c::setConfigurationManaged, false);
|
||||||
|
// Setting to null might have side effects
|
||||||
|
setDefaultIfUnset(c::getUser, c::setUser, null);
|
||||||
|
setDefaultIfUnset(c::getRingSize, c::setRingSize, ActiveMQDefaultConfiguration.getDefaultRingSize());
|
||||||
|
}
|
||||||
|
|
||||||
private void deployReloadableConfigFromConfiguration() throws Exception {
|
private void deployReloadableConfigFromConfiguration() throws Exception {
|
||||||
if (configurationReloadDeployed.compareAndSet(false, true)) {
|
if (configurationReloadDeployed.compareAndSet(false, true)) {
|
||||||
ActiveMQServerLogger.LOGGER.reloadingConfiguration("security");
|
ActiveMQServerLogger.LOGGER.reloadingConfiguration("security");
|
||||||
|
|
|
@ -39,6 +39,7 @@ import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
|
@ -296,7 +297,7 @@ public class RedeployTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||||
Files.copy(configFile.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
Files.copy(configFile.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
|
||||||
final ReusableLatch latch = new ReusableLatch(1);
|
final ReusableLatch latch = new ReusableLatch(1);
|
||||||
Runnable tick = latch::countDown;
|
Runnable tick = latch::countDown;
|
||||||
server.getActiveMQServer().getReloadManager().setTick(tick);
|
server.getActiveMQServer().getReloadManager().setTick(tick);
|
||||||
|
@ -397,6 +398,40 @@ public class RedeployTest extends ActiveMQTestBase {
|
||||||
doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-removed.xml"));
|
doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-removed.xml"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This one is here just to make sure it's possible to change queue parameters one by one without setting the others
|
||||||
|
* to <code>null</code>.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testQueuePartialReconfiguration() throws Exception {
|
||||||
|
|
||||||
|
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||||
|
URL url = RedeployTest.class.getClassLoader().getResource("reload-empty.xml");
|
||||||
|
Files.copy(url.openStream(), brokerXML);
|
||||||
|
|
||||||
|
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
|
||||||
|
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
|
||||||
|
embeddedActiveMQ.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
embeddedActiveMQ.getActiveMQServer().createQueue(new QueueConfiguration("virtualQueue").setUser("bob"));
|
||||||
|
embeddedActiveMQ.getActiveMQServer().updateQueue(new QueueConfiguration("virtualQueue").setFilterString("foo"));
|
||||||
|
|
||||||
|
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
|
||||||
|
.getBinding(new SimpleString("virtualQueue"));
|
||||||
|
org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
|
||||||
|
|
||||||
|
assertEquals(new SimpleString("bob"), queue.getUser());
|
||||||
|
assertEquals(new SimpleString("foo"), queue.getFilter().getFilterString());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
embeddedActiveMQ.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRedeployQueueDefaults() throws Exception {
|
public void testRedeployQueueDefaults() throws Exception {
|
||||||
|
|
||||||
|
@ -413,37 +448,37 @@ public class RedeployTest extends ActiveMQTestBase {
|
||||||
.getBinding(new SimpleString("myQueue"));
|
.getBinding(new SimpleString("myQueue"));
|
||||||
org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
|
org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
|
||||||
|
|
||||||
assertNotEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers());
|
||||||
assertNotEquals(queue.getRoutingType(), RoutingType.MULTICAST);
|
assertNotEquals(RoutingType.MULTICAST, queue.getRoutingType());
|
||||||
assertNotEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queue.isPurgeOnNoConsumers());
|
||||||
assertNotEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultEnabled(), queue.isEnabled());
|
||||||
assertNotEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultExclusive(), queue.isExclusive());
|
||||||
assertNotEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), queue.isGroupRebalance());
|
||||||
assertNotEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), queue.getGroupBuckets());
|
||||||
assertNotEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupFirstKey(), queue.getGroupFirstKey());
|
||||||
assertNotEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultNonDestructive(), queue.isNonDestructive());
|
||||||
assertNotEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch());
|
||||||
assertNotEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), queue.getDelayBeforeDispatch());
|
||||||
assertNotEquals(queue.getFilter(), null);
|
assertNotNull(queue.getFilter());
|
||||||
assertNotEquals(queue.getUser(), "jdoe");
|
assertEquals(new SimpleString("jdoe"), queue.getUser());
|
||||||
assertNotEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
|
assertNotEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize());
|
||||||
|
|
||||||
deployBrokerConfig(embeddedActiveMQ, newConfig);
|
deployBrokerConfig(embeddedActiveMQ, newConfig);
|
||||||
|
|
||||||
assertEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers());
|
||||||
assertEquals(queue.getRoutingType(), RoutingType.MULTICAST);
|
assertEquals(RoutingType.MULTICAST, queue.getRoutingType());
|
||||||
assertEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queue.isPurgeOnNoConsumers());
|
||||||
assertEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultEnabled(), queue.isEnabled());
|
||||||
assertEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultExclusive(), queue.isExclusive());
|
||||||
assertEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), queue.isGroupRebalance());
|
||||||
assertEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), queue.getGroupBuckets());
|
||||||
assertEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupFirstKey(), queue.getGroupFirstKey());
|
||||||
assertEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultNonDestructive(), queue.isNonDestructive());
|
||||||
assertEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch());
|
||||||
assertEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), queue.getDelayBeforeDispatch());
|
||||||
assertEquals(queue.getFilter(), null);
|
assertNull(queue.getFilter());
|
||||||
assertEquals(queue.getUser(), null);
|
assertNull(queue.getUser());
|
||||||
assertEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
|
assertEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize());
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
embeddedActiveMQ.stop();
|
embeddedActiveMQ.stop();
|
||||||
|
|
|
@ -514,8 +514,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
producer.send(m);
|
producer.send(m);
|
||||||
|
|
||||||
assertNotNull(consumer1.receiveImmediate());
|
assertNotNull(consumer1.receive(1000));
|
||||||
assertNotNull(consumer2.receiveImmediate());
|
assertNotNull(consumer2.receive(1000));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
<?xml version='1.0'?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
or more contributor license agreements. See the NOTICE file
|
||||||
|
distributed with this work for additional information
|
||||||
|
regarding copyright ownership. The ASF licenses this file
|
||||||
|
to you under the Apache License, Version 2.0 (the
|
||||||
|
"License"); you may not use this file except in compliance
|
||||||
|
with the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing,
|
||||||
|
software distributed under the License is distributed on an
|
||||||
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
KIND, either express or implied. See the License for the
|
||||||
|
specific language governing permissions and limitations
|
||||||
|
under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration xmlns="urn:activemq"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||||
|
|
||||||
|
<core xmlns="urn:activemq:core">
|
||||||
|
|
||||||
|
<name>0.0.0.0</name>
|
||||||
|
|
||||||
|
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||||
|
|
||||||
|
<persistence-enabled>false</persistence-enabled>
|
||||||
|
|
||||||
|
<security-enabled>false</security-enabled>
|
||||||
|
|
||||||
|
<!-- this could be ASYNCIO or NIO
|
||||||
|
-->
|
||||||
|
<journal-type>NIO</journal-type>
|
||||||
|
|
||||||
|
<paging-directory>./data/paging</paging-directory>
|
||||||
|
|
||||||
|
<bindings-directory>./data/bindings</bindings-directory>
|
||||||
|
|
||||||
|
<journal-directory>./data/journal</journal-directory>
|
||||||
|
|
||||||
|
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||||
|
|
||||||
|
<journal-min-files>2</journal-min-files>
|
||||||
|
|
||||||
|
<journal-pool-files>-1</journal-pool-files>
|
||||||
|
|
||||||
|
<!--
|
||||||
|
This value was determined through a calculation.
|
||||||
|
Your system could perform 25 writes per millisecond
|
||||||
|
on the current journal configuration.
|
||||||
|
That translates as a sync write every 40000 nanoseconds
|
||||||
|
-->
|
||||||
|
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||||
|
|
||||||
|
|
||||||
|
<acceptors>
|
||||||
|
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||||
|
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||||
|
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||||
|
|
||||||
|
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||||
|
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||||
|
|
||||||
|
<!-- STOMP Acceptor. -->
|
||||||
|
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
|
||||||
|
|
||||||
|
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||||
|
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
|
||||||
|
|
||||||
|
<!-- MQTT Acceptor -->
|
||||||
|
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
|
||||||
|
|
||||||
|
</acceptors>
|
||||||
|
|
||||||
|
</core>
|
||||||
|
</configuration>
|
|
@ -84,6 +84,11 @@ public class FakePostOffice implements PostOffice {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception {
|
public QueueBinding updateQueue(QueueConfiguration queueConfiguration) throws Exception {
|
||||||
|
return updateQueue(queueConfiguration, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean forceUpdate) throws Exception {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue