ARTEMIS-5302 use QueueConfiguration more

There are several places across the code-base that repeat all the
configuration parameters of a queue. This commit simplifies the code
and increases readability by using o.a.a.a.a.c.QueueConfiguration as
often as possible.
This commit is contained in:
Justin Bertram 2025-02-11 19:34:47 -06:00 committed by clebertsuconic
parent 6f15e33642
commit 00f69a18d8
15 changed files with 650 additions and 1225 deletions

View File

@ -257,7 +257,7 @@ public class PrintData extends DBOption {
Set<Long> existingQueues = new HashSet<>();
if (bindingsDescribe != null && bindingsDescribe.getBindingEncodings() != null) {
bindingsDescribe.getBindingEncodings().forEach(e -> existingQueues.add(e.getId()));
bindingsDescribe.getBindingEncodings().forEach(e -> existingQueues.add(e.getQueueConfiguration().getId()));
}
Set<Long> pgTXs = cursorACKs.getPgTXs();

View File

@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.tools.DBOption;
@ -67,8 +67,8 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Option;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends DBOption {
@ -325,7 +325,7 @@ public final class XmlDataExporter extends DBOption {
for (RecordInfo info : records) {
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
queueBindings.put(bindingEncoding.getId(), bindingEncoding);
queueBindings.put(bindingEncoding.getQueueConfiguration().getId(), bindingEncoding);
} else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
addressBindings.put(bindingEncoding.getId(), bindingEncoding);
@ -368,17 +368,13 @@ public final class XmlDataExporter extends DBOption {
bindingsPrinted++;
}
for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet()) {
PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
QueueConfiguration queueConfig = queueBindings.get(queueBindingEncodingEntry.getKey()).getQueueConfiguration();
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString());
String filter = "";
if (bindingEncoding.getFilterString() != null) {
filter = bindingEncoding.getFilterString().toString();
}
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter);
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString());
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId()));
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString());
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, queueConfig.getAddress().toString());
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, queueConfig.getFilterString() == null ? "" : queueConfig.getFilterString().toString());
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, queueConfig.getName().toString());
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(queueConfig.getId()));
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, queueConfig.getRoutingType().toString());
bindingsPrinted++;
}
xmlWriter.writeEndElement(); // end BINDINGS_PARENT
@ -466,7 +462,7 @@ public final class XmlDataExporter extends DBOption {
if (!acked) {
PersistentQueueBindingEncoding queueBinding = queueBindings.get(queueID);
if (queueBinding != null) {
SimpleString queueName = queueBinding.getQueueName();
SimpleString queueName = queueBinding.getQueueConfiguration().getName();
queueNames.add(queueName.toString());
}
}
@ -496,17 +492,15 @@ public final class XmlDataExporter extends DBOption {
for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
String queueName;
PersistentQueueBindingEncoding persistentQueueBindingEncoding = queueBindings.get(ref.refEncoding.queueID);
long id = ref.refEncoding.queueID;
PersistentQueueBindingEncoding persistentQueueBindingEncoding = queueBindings.get(id);
if (persistentQueueBindingEncoding == null) {
PersistentQueueBindingEncoding undefinedQueue = new PersistentQueueBindingEncoding();
undefinedQueue.setId(ref.refEncoding.queueID);
undefinedQueue.replaceQueueName(SimpleString.of(undefinedPrefix + ref.refEncoding.queueID));
undefinedQueue.replaceAddress(undefinedQueue.getQueueName());
queueBindings.put(undefinedQueue.getId(), undefinedQueue);
queueName = String.valueOf(undefinedQueue.getQueueName());
getActionContext().err.println("Queue ID " + ref.refEncoding.queueID + " not defined. Exporting it as " + undefinedQueue.getQueueName());
String name = undefinedPrefix + id;
queueBindings.put(id, new PersistentQueueBindingEncoding(QueueConfiguration.of(name).setAddress(name)));
queueName = String.valueOf(name);
getActionContext().err.println("Queue ID " + id + " not defined. Exporting it as " + name);
} else {
queueName = String.valueOf(persistentQueueBindingEncoding.getQueueName());
queueName = String.valueOf(persistentQueueBindingEncoding.getQueueConfiguration().getName());
}
queues.add(queueName);

View File

@ -140,11 +140,6 @@ public class QueueConfiguration implements Serializable {
return new QueueConfiguration(queueConfiguration);
}
/**
* @deprecated
* Use {@link #of(String)} instead.
*/
@Deprecated(forRemoval = true)
public QueueConfiguration() {
}
@ -416,12 +411,16 @@ public class QueueConfiguration implements Serializable {
}
public QueueConfiguration setFilterString(SimpleString filterString) {
this.filterString = filterString;
if (filterString != null && !filterString.isEmpty() && !filterString.isBlank()) {
this.filterString = filterString;
} else if (filterString == null) {
this.filterString = filterString;
}
return this;
}
public QueueConfiguration setFilterString(String filterString) {
return setFilterString(filterString == null ? null : SimpleString.of(filterString));
return setFilterString(SimpleString.of(filterString));
}
/**

View File

@ -195,6 +195,17 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
data[1] = high;
}
public boolean isBlank() {
boolean result = true;
for (int i = 0; i < length(); i++) {
if (!Character.isWhitespace(charAt(i))) {
result = false;
break;
}
}
return result;
}
@Override
public boolean isEmpty() {
return data.length == 0;

View File

@ -21,7 +21,9 @@ import io.netty.buffer.ByteBufAllocator;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SimpleStringTest {
@ -38,4 +40,23 @@ public class SimpleStringTest {
}
assertInstanceOf(IndexOutOfBoundsException.class, e);
}
@Test
public void testBlank() {
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of(" ".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\t".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\n".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\r".repeat(i)).isBlank());
}
for (int i = 1; i <= 10; i++) {
assertFalse(SimpleString.of("x".repeat(i)).isBlank());
}
}
}

View File

@ -16,160 +16,41 @@
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
public class QueueQueryResult {
private SimpleString name;
private QueueConfiguration config;
private boolean exists;
private boolean durable;
private int consumerCount;
private long messageCount;
private SimpleString filterString;
private SimpleString address;
private boolean temporary;
private boolean autoCreateQueues;
private boolean autoCreated;
private boolean purgeOnNoConsumers;
private RoutingType routingType;
private int maxConsumers;
private Boolean exclusive;
private Boolean groupRebalance;
private Boolean groupRebalancePauseDispatch;
private Integer groupBuckets;
private SimpleString groupFirstKey;
private Boolean lastValue;
private SimpleString lastValueKey;
private Boolean nonDestructive;
private Integer consumersBeforeDispatch;
private Long delayBeforeDispatch;
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
private Integer defaultConsumerWindowSize;
private Long ringSize;
private Boolean enabled;
private Boolean configurationManaged;
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
final boolean temporary,
final SimpleString filterString,
public QueueQueryResult(final QueueConfiguration config,
final int consumerCount,
final long messageCount,
final boolean autoCreateQueues,
final boolean exists,
final boolean autoCreated,
final boolean purgeOnNoConsumers,
final RoutingType routingType,
final int maxConsumers,
final Boolean exclusive,
final Boolean groupRebalance,
final Boolean groupRebalancePauseDispatch,
final Integer groupBuckets,
final SimpleString groupFirstKey,
final Boolean lastValue,
final SimpleString lastValueKey,
final Boolean nonDestructive,
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize,
final Boolean enabled,
final Boolean configurationManaged) {
this.durable = durable;
this.temporary = temporary;
final Integer defaultConsumerWindowSize) {
this.config = config;
this.consumerCount = consumerCount;
this.messageCount = messageCount;
this.filterString = filterString;
this.address = address;
this.name = name;
this.autoCreateQueues = autoCreateQueues;
this.exists = exists;
this.autoCreated = autoCreated;
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.routingType = routingType;
this.maxConsumers = maxConsumers;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.groupBuckets = groupBuckets;
this.groupFirstKey = groupFirstKey;
this.lastValue = lastValue;
this.lastValueKey = lastValueKey;
this.nonDestructive = nonDestructive;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
this.enabled = enabled;
this.configurationManaged = configurationManaged;
}
public boolean isExists() {
@ -177,7 +58,7 @@ public class QueueQueryResult {
}
public boolean isDurable() {
return durable;
return config.isDurable();
}
public int getConsumerCount() {
@ -189,19 +70,19 @@ public class QueueQueryResult {
}
public SimpleString getFilterString() {
return filterString;
return config.getFilterString();
}
public SimpleString getAddress() {
return address;
return config.getAddress();
}
public SimpleString getName() {
return name;
return config.getName();
}
public boolean isTemporary() {
return temporary;
return config.isTemporary();
}
public boolean isAutoCreateQueues() {
@ -209,47 +90,47 @@ public class QueueQueryResult {
}
public boolean isAutoCreated() {
return autoCreated;
return config.isAutoCreated();
}
public boolean isPurgeOnNoConsumers() {
return purgeOnNoConsumers;
return config.isPurgeOnNoConsumers();
}
public RoutingType getRoutingType() {
return routingType;
return config.getRoutingType();
}
public int getMaxConsumers() {
return maxConsumers;
return config.getMaxConsumers();
}
public void setAddress(SimpleString address) {
this.address = address;
config.setAddress(address);
}
public Boolean isExclusive() {
return exclusive;
return config.isExclusive();
}
public Boolean isLastValue() {
return lastValue;
return config.isLastValue();
}
public SimpleString getLastValueKey() {
return lastValueKey;
return config.getLastValueKey();
}
public Boolean isNonDestructive() {
return nonDestructive;
return config.isNonDestructive();
}
public Integer getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
return config.getConsumersBeforeDispatch();
}
public Long getDelayBeforeDispatch() {
return delayBeforeDispatch;
return config.getDelayBeforeDispatch();
}
public Integer getDefaultConsumerWindowSize() {
@ -257,42 +138,42 @@ public class QueueQueryResult {
}
public Boolean isGroupRebalance() {
return groupRebalance;
return config.isGroupRebalance();
}
public Boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
return config.isGroupRebalancePauseDispatch();
}
public Integer getGroupBuckets() {
return groupBuckets;
return config.getGroupBuckets();
}
public SimpleString getGroupFirstKey() {
return groupFirstKey;
return config.getGroupFirstKey();
}
public Boolean isAutoDelete() {
return autoDelete;
return config.isAutoDelete();
}
public Long getAutoDeleteDelay() {
return autoDeleteDelay;
return config.getAutoDeleteDelay();
}
public Long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
return config.getAutoDeleteMessageCount();
}
public Long getRingSize() {
return ringSize;
return config.getRingSize();
}
public Boolean isEnabled() {
return enabled;
return config.isEnabled();
}
public Boolean isConfigurationManaged() {
return configurationManaged;
return config.isConfigurationManaged();
}
}

View File

@ -18,93 +18,14 @@ package org.apache.activemq.artemis.core.persistence;
import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
public interface QueueBindingInfo {
long getId();
SimpleString getAddress();
SimpleString getQueueName();
/**
* used to rename the queue in case of a duplication during load time
*
* @param newName
*/
void replaceQueueName(SimpleString newName);
SimpleString getFilterString();
boolean isAutoCreated();
boolean isConfigurationManaged();
void setConfigurationManaged(boolean configurationManaged);
SimpleString getUser();
QueueConfiguration getQueueConfiguration();
void addQueueStatusEncoding(QueueStatusEncoding status);
List<QueueStatusEncoding> getQueueStatusEncodings();
int getMaxConsumers();
void setMaxConsumers(int maxConsumers);
boolean isPurgeOnNoConsumers();
void setPurgeOnNoConsumers(boolean purgeOnNoConsumers);
boolean isEnabled();
void setEnabled(boolean enabled);
boolean isExclusive();
void setExclusive(boolean exclusive);
boolean isLastValue();
void setLastValue(boolean lastValue);
SimpleString getLastValueKey();
void setLastValueKey(SimpleString lastValue);
boolean isNonDestructive();
void setNonDestructive(boolean nonDestructive);
int getConsumersBeforeDispatch();
void setConsumersBeforeDispatch(int consumersBeforeDispatch);
long getDelayBeforeDispatch();
void setDelayBeforeDispatch(long delayBeforeDispatch);
byte getRoutingType();
void setRoutingType(byte routingType);
boolean isGroupRebalance();
boolean isGroupRebalancePauseDispatch();
int getGroupBuckets();
SimpleString getGroupFirstKey();
boolean isAutoDelete();
long getAutoDeleteDelay();
long getAutoDeleteMessageCount();
long getRingSize();
boolean isInternal();
}

View File

@ -16,14 +16,8 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.HashMap;
@ -40,8 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import javax.transaction.xa.Xid;
import java.util.function.Consumer;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -60,20 +53,20 @@ import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.cursor.QueryPagedReferenceImpl;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.QueryPagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
@ -134,8 +127,14 @@ import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.CriticalMeasure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.Consumer;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
/**
* Controls access to the journals and other storage files such as the ones used to store pages and
@ -1052,7 +1051,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final MutableLong recordNumber = new MutableLong();
final CoreMessageObjectPools pools;
if (totalSize > 0) {
final int addresses = (int) Math.max(DEFAULT_POOL_CAPACITY, queueInfos == null ? 0 : queueInfos.values().stream().map(QueueBindingInfo::getAddress).filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH).count() * 2);
final int addresses = (int) Math.max(DEFAULT_POOL_CAPACITY, queueInfos == null ? 0 : queueInfos.values().stream().map(qInfo -> qInfo.getQueueConfiguration().getAddress()).filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH).count() * 2);
pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
} else {
pools = null;
@ -1470,7 +1469,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
QueueBindingInfo queueInfo = queueInfos.get(queueID);
if (queueInfo != null) {
SimpleString address = queueInfo.getAddress();
SimpleString address = queueInfo.getQueueConfiguration().getAddress();
PagingStore store = pagingManager.getPageStore(address);
if (store == null) {
return null;
@ -1518,7 +1517,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize(), queue.isInternalQueue());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getQueueConfiguration());
try (ArtemisCloseable lock = closeableReadLock()) {
if (update) {
@ -1676,7 +1675,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
mapBindings.put(bindingEncoding.getId(), bindingEncoding);
mapBindings.put(bindingEncoding.getQueueConfiguration().getId(), bindingEncoding);
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
idGenerator.loadState(record.id, buffer);
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
@ -2272,7 +2271,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
bindingEncoding.decode(buffer);
bindingEncoding.setId(id);
bindingEncoding.getQueueConfiguration().setId(id);
return bindingEncoding;
}

View File

@ -19,205 +19,37 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.server.impl.QueueConfigurationUtils;
import org.apache.activemq.artemis.utils.DataConstants;
public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo {
private long id;
private SimpleString name;
private SimpleString address;
private SimpleString filterString;
private boolean autoCreated;
private SimpleString user;
private QueueConfiguration config;
private List<QueueStatusEncoding> queueStatusEncodings;
private int maxConsumers;
private boolean purgeOnNoConsumers;
private boolean enabled;
private boolean exclusive;
private boolean lastValue;
private SimpleString lastValueKey;
private boolean nonDestructive;
private int consumersBeforeDispatch;
private long delayBeforeDispatch;
private byte routingType;
private boolean configurationManaged;
private boolean groupRebalance;
private boolean groupRebalancePauseDispatch;
private int groupBuckets;
private SimpleString groupFirstKey;
private boolean autoDelete;
private long autoDeleteDelay;
private long autoDeleteMessageCount;
private long ringSize;
private boolean internal;
public PersistentQueueBindingEncoding() {
config = new QueueConfiguration();
}
@Override
public String toString() {
return "PersistentQueueBindingEncoding [id=" + id +
", name=" + name +
", address=" + address +
", filterString=" + filterString +
", user=" + user +
", autoCreated=" + autoCreated +
", maxConsumers=" + maxConsumers +
", purgeOnNoConsumers=" + purgeOnNoConsumers +
", enabled=" + enabled +
", exclusive=" + exclusive +
", lastValue=" + lastValue +
", lastValueKey=" + lastValueKey +
", nonDestructive=" + nonDestructive +
", consumersBeforeDispatch=" + consumersBeforeDispatch +
", delayBeforeDispatch=" + delayBeforeDispatch +
", routingType=" + routingType +
", configurationManaged=" + configurationManaged +
", groupRebalance=" + groupRebalance +
", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch +
", groupBuckets=" + groupBuckets +
", groupFirstKey=" + groupFirstKey +
", autoDelete=" + autoDelete +
", autoDeleteDelay=" + autoDeleteDelay +
", autoDeleteMessageCount=" + autoDeleteMessageCount +
", internal=" + internal +
"]";
return "PersistentQueueBindingEncoding [queueConfiguration=" + config + "]";
}
public PersistentQueueBindingEncoding(final SimpleString name,
final SimpleString address,
final SimpleString filterString,
final SimpleString user,
final boolean autoCreated,
final int maxConsumers,
final boolean purgeOnNoConsumers,
final boolean enabled,
final boolean exclusive,
final boolean groupRebalance,
final boolean groupRebalancePauseDispatch,
final int groupBuckets,
final SimpleString groupFirstKey,
final boolean lastValue,
final SimpleString lastValueKey,
final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean autoDelete,
final long autoDeleteDelay,
final long autoDeleteMessageCount,
final byte routingType,
final boolean configurationManaged,
final long ringSize,
final boolean internal) {
this.name = name;
this.address = address;
this.filterString = filterString;
this.user = user;
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers;
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
this.enabled = enabled;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
this.groupBuckets = groupBuckets;
this.groupFirstKey = groupFirstKey;
this.lastValue = lastValue;
this.lastValueKey = lastValueKey;
this.nonDestructive = nonDestructive;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.routingType = routingType;
this.configurationManaged = configurationManaged;
this.ringSize = ringSize;
this.internal = internal;
public PersistentQueueBindingEncoding(final QueueConfiguration config) {
this.config = config;
}
@Override
public long getId() {
return id;
}
public void setId(final long id) {
this.id = id;
}
@Override
public SimpleString getAddress() {
return address;
}
@Override
public void replaceQueueName(SimpleString newName) {
this.name = newName;
}
public void replaceAddress(SimpleString address) {
this.address = address;
}
@Override
public SimpleString getFilterString() {
return filterString;
}
@Override
public SimpleString getQueueName() {
return name;
}
@Override
public SimpleString getUser() {
return user;
}
@Override
public boolean isAutoCreated() {
return autoCreated;
}
@Override
public boolean isConfigurationManaged() {
return configurationManaged;
}
@Override
public void setConfigurationManaged(boolean configurationManaged) {
this.configurationManaged = configurationManaged;
public QueueConfiguration getQueueConfiguration() {
return config;
}
@Override
@ -233,156 +65,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
return queueStatusEncodings;
}
@Override
public int getMaxConsumers() {
return maxConsumers;
}
@Override
public void setMaxConsumers(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
@Override
public boolean isPurgeOnNoConsumers() {
return purgeOnNoConsumers;
}
@Override
public void setPurgeOnNoConsumers(boolean purgeOnNoConsumers) {
this.purgeOnNoConsumers = purgeOnNoConsumers;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
@Override
public boolean isExclusive() {
return exclusive;
}
@Override
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
@Override
public boolean isLastValue() {
return lastValue;
}
@Override
public void setLastValue(boolean lastValue) {
this.lastValue = lastValue;
}
@Override
public SimpleString getLastValueKey() {
return lastValueKey;
}
@Override
public void setLastValueKey(SimpleString lastValueKey) {
this.lastValueKey = lastValueKey;
}
@Override
public boolean isNonDestructive() {
return nonDestructive;
}
@Override
public void setNonDestructive(boolean nonDestructive) {
this.nonDestructive = nonDestructive;
}
@Override
public int getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
}
@Override
public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
this.consumersBeforeDispatch = consumersBeforeDispatch;
}
@Override
public long getDelayBeforeDispatch() {
return delayBeforeDispatch;
}
@Override
public void setDelayBeforeDispatch(long delayBeforeDispatch) {
this.delayBeforeDispatch = delayBeforeDispatch;
}
@Override
public byte getRoutingType() {
return routingType;
}
@Override
public void setRoutingType(byte routingType) {
this.routingType = routingType;
}
@Override
public boolean isGroupRebalance() {
return groupRebalance;
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return groupRebalancePauseDispatch;
}
@Override
public int getGroupBuckets() {
return groupBuckets;
}
@Override
public SimpleString getGroupFirstKey() {
return groupFirstKey;
}
@Override
public boolean isAutoDelete() {
return autoDelete;
}
@Override
public long getAutoDeleteDelay() {
return autoDeleteDelay;
}
@Override
public long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
@Override
public long getRingSize() {
return ringSize;
}
@Override
public boolean isInternal() {
return internal;
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
address = buffer.readSimpleString();
filterString = buffer.readNullableSimpleString();
config.setName(buffer.readSimpleString());
config.setAddress(buffer.readSimpleString());
config.setFilterString(buffer.readNullableSimpleString());
String metadata = buffer.readNullableSimpleString().toString();
if (metadata != null) {
@ -391,146 +78,107 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
String[] keyValuePair = element.split("=");
if (keyValuePair.length == 2) {
if (keyValuePair[0].equals("user")) {
user = SimpleString.of(keyValuePair[1]);
config.setUser(SimpleString.of(keyValuePair[1]));
}
}
}
}
autoCreated = buffer.readBoolean();
config.setAutoCreated(buffer.readBoolean());
if (buffer.readableBytes() > 0) {
maxConsumers = buffer.readInt();
purgeOnNoConsumers = buffer.readBoolean();
routingType = buffer.readByte();
} else {
maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType().getType();
if (buffer.readable()) {
config.setMaxConsumers(buffer.readInt());
config.setPurgeOnNoConsumers(buffer.readBoolean());
config.setRoutingType(RoutingType.getType(buffer.readByte()));
}
if (buffer.readableBytes() > 0) {
exclusive = buffer.readBoolean();
} else {
exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive();
if (buffer.readable()) {
config.setExclusive(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
lastValue = buffer.readBoolean();
} else {
lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
if (buffer.readable()) {
config.setLastValue(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
configurationManaged = buffer.readBoolean();
} else {
configurationManaged = false;
if (buffer.readable()) {
config.setConfigurationManaged(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
consumersBeforeDispatch = buffer.readInt();
} else {
consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
if (buffer.readable()) {
config.setConsumersBeforeDispatch(buffer.readInt());
}
if (buffer.readableBytes() > 0) {
delayBeforeDispatch = buffer.readLong();
} else {
delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
if (buffer.readable()) {
config.setDelayBeforeDispatch(buffer.readLong());
}
if (buffer.readableBytes() > 0) {
lastValueKey = buffer.readNullableSimpleString();
} else {
lastValueKey = ActiveMQDefaultConfiguration.getDefaultLastValueKey();
if (buffer.readable()) {
config.setLastValueKey(buffer.readNullableSimpleString());
}
if (buffer.readableBytes() > 0) {
nonDestructive = buffer.readBoolean();
} else {
nonDestructive = ActiveMQDefaultConfiguration.getDefaultNonDestructive();
if (buffer.readable()) {
config.setNonDestructive(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
groupRebalance = buffer.readBoolean();
} else {
groupRebalance = ActiveMQDefaultConfiguration.getDefaultGroupRebalance();
if (buffer.readable()) {
config.setGroupRebalance(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
groupBuckets = buffer.readInt();
} else {
groupBuckets = ActiveMQDefaultConfiguration.getDefaultGroupBuckets();
if (buffer.readable()) {
config.setGroupBuckets(buffer.readInt());
}
if (buffer.readableBytes() > 0) {
autoDelete = buffer.readBoolean();
} else {
autoDelete = ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete(autoCreated);
if (buffer.readable()) {
config.setAutoDelete(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
autoDeleteDelay = buffer.readLong();
} else {
autoDeleteDelay = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay();
if (buffer.readable()) {
config.setAutoDeleteDelay(buffer.readLong());
}
if (buffer.readableBytes() > 0) {
autoDeleteMessageCount = buffer.readLong();
} else {
autoDeleteMessageCount = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteMessageCount();
if (buffer.readable()) {
config.setAutoDeleteMessageCount(buffer.readLong());
}
if (buffer.readableBytes() > 0) {
groupFirstKey = buffer.readNullableSimpleString();
} else {
groupFirstKey = ActiveMQDefaultConfiguration.getDefaultGroupFirstKey();
if (buffer.readable()) {
config.setGroupFirstKey(buffer.readNullableSimpleString());
}
if (buffer.readableBytes() > 0) {
ringSize = buffer.readLong();
} else {
ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
if (buffer.readable()) {
config.setRingSize(buffer.readLong());
}
if (buffer.readableBytes() > 0) {
enabled = buffer.readBoolean();
} else {
enabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
if (buffer.readable()) {
config.setEnabled(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = buffer.readBoolean();
} else {
groupRebalancePauseDispatch = ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch();
if (buffer.readable()) {
config.setGroupRebalancePauseDispatch(buffer.readBoolean());
}
if (buffer.readableBytes() > 0) {
internal = buffer.readBoolean();
} else {
internal = ActiveMQDefaultConfiguration.getDefaultInternal();
if (buffer.readable()) {
config.setInternal(buffer.readBoolean());
}
QueueConfigurationUtils.applyStaticDefaults(config);
}
@Override
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(name);
buffer.writeSimpleString(address);
buffer.writeNullableSimpleString(filterString);
buffer.writeSimpleString(config.getName());
buffer.writeSimpleString(config.getAddress());
buffer.writeNullableSimpleString(config.getFilterString());
buffer.writeNullableSimpleString(createMetadata());
buffer.writeBoolean(autoCreated);
buffer.writeInt(maxConsumers);
buffer.writeBoolean(purgeOnNoConsumers);
buffer.writeByte(routingType);
buffer.writeBoolean(exclusive);
buffer.writeBoolean(lastValue);
buffer.writeBoolean(configurationManaged);
buffer.writeInt(consumersBeforeDispatch);
buffer.writeLong(delayBeforeDispatch);
buffer.writeNullableSimpleString(lastValueKey);
buffer.writeBoolean(nonDestructive);
buffer.writeBoolean(groupRebalance);
buffer.writeInt(groupBuckets);
buffer.writeBoolean(autoDelete);
buffer.writeLong(autoDeleteDelay);
buffer.writeLong(autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
buffer.writeLong(ringSize);
buffer.writeBoolean(enabled);
buffer.writeBoolean(groupRebalancePauseDispatch);
buffer.writeBoolean(internal);
buffer.writeBoolean(config.isAutoCreated());
buffer.writeInt(config.getMaxConsumers());
buffer.writeBoolean(config.isPurgeOnNoConsumers());
buffer.writeByte(config.getRoutingType().getType());
buffer.writeBoolean(config.isExclusive());
buffer.writeBoolean(config.isLastValue());
buffer.writeBoolean(config.isConfigurationManaged());
buffer.writeInt(config.getConsumersBeforeDispatch());
buffer.writeLong(config.getDelayBeforeDispatch());
buffer.writeNullableSimpleString(config.getLastValueKey());
buffer.writeBoolean(config.isNonDestructive());
buffer.writeBoolean(config.isGroupRebalance());
buffer.writeInt(config.getGroupBuckets());
buffer.writeBoolean(config.isAutoDelete());
buffer.writeLong(config.getAutoDeleteDelay());
buffer.writeLong(config.getAutoDeleteMessageCount());
buffer.writeNullableSimpleString(config.getGroupFirstKey());
buffer.writeLong(config.getRingSize());
buffer.writeBoolean(config.isEnabled());
buffer.writeBoolean(config.isGroupRebalancePauseDispatch());
buffer.writeBoolean(config.isInternal());
}
@Override
public int getEncodeSize() {
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
return SimpleString.sizeofString(config.getName()) + SimpleString.sizeofString(config.getAddress()) +
SimpleString.sizeofNullableString(config.getFilterString()) + DataConstants.SIZE_BOOLEAN +
SimpleString.sizeofNullableString(createMetadata()) +
DataConstants.SIZE_INT +
DataConstants.SIZE_BOOLEAN +
@ -540,14 +188,14 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_INT +
DataConstants.SIZE_LONG +
SimpleString.sizeofNullableString(lastValueKey) +
SimpleString.sizeofNullableString(config.getLastValueKey()) +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_INT +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
SimpleString.sizeofNullableString(groupFirstKey) +
SimpleString.sizeofNullableString(config.getGroupFirstKey()) +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN +
@ -556,8 +204,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
private SimpleString createMetadata() {
StringBuilder metadata = new StringBuilder();
if (user != null) {
metadata.append("user=").append(user).append(";");
if (config.getUser() != null) {
metadata.append("user=").append(config.getUser()).append(";");
}
return SimpleString.of(metadata.toString());
}

View File

@ -1116,45 +1116,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Binding binding = getPostOffice().getBinding(realName);
final SimpleString addressName = binding != null && binding.getType() == BindingType.LOCAL_QUEUE
? binding.getAddress() : CompositeAddress.extractAddressName(name);
final SimpleString addressName = binding != null && binding.getType() == BindingType.LOCAL_QUEUE ? binding.getAddress() : CompositeAddress.extractAddressName(name);
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
boolean defaultExclusiveQueue = addressSettings.isDefaultExclusiveQueue();
boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
SimpleString defaultLastValueKey = addressSettings.getDefaultLastValueKey();
boolean defaultNonDestructive = addressSettings.isDefaultNonDestructive();
int defaultConsumersBeforeDispatch = addressSettings.getDefaultConsumersBeforeDispatch();
long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch();
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
boolean defaultGroupRebalance = addressSettings.isDefaultGroupRebalance();
boolean defaultGroupRebalancePauseDispatch = addressSettings.isDefaultGroupRebalancePauseDispatch();
int defaultGroupBuckets = addressSettings.getDefaultGroupBuckets();
SimpleString defaultGroupFirstKey = addressSettings.getDefaultGroupFirstKey();
long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay();
long autoDeleteQueuesMessageCount = addressSettings.getAutoDeleteQueuesMessageCount();
long defaultRingSize = addressSettings.getDefaultRingSize();
boolean defaultEnabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
Filter filter = queue.getFilter();
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled(), queue.isConfigurationManaged());
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null, null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null, null);
if (binding != null && binding.getBindable() instanceof Queue queue) {
response = new QueueQueryResult(queue.getQueueConfiguration(), queue.getConsumerCount(), queue.getMessageCount(), addressSettings.isAutoCreateQueues(), true, addressSettings.getDefaultConsumerWindowSize());
} else {
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled, false);
QueueConfiguration config = QueueConfiguration.of(realName).setAddress(addressName);
QueueConfigurationUtils.applyDynamicDefaults(config, addressSettings);
boolean isManagement = realName.equals(managementAddress);
response = new QueueQueryResult(config, isManagement ? -1 : 0, isManagement ? -1 : 0, addressSettings.isAutoCreateQueues(), isManagement ? true : false, addressSettings.getDefaultConsumerWindowSize());
}
return response;
@ -3797,7 +3771,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListQueueConfiguration(List<QueueConfiguration> queues) throws Exception {
for (QueueConfiguration config : queues) {
try {
QueueConfigurationUtils.applyDynamicQueueDefaults(config, addressSettingsRepository.getMatch(config.getAddress().toString()));
QueueConfigurationUtils.applyDynamicDefaults(config, addressSettingsRepository.getMatch(config.getAddress().toString()));
config.setAutoCreateAddress(true);
@ -3806,7 +3780,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// determine if there is an address::queue match; update it if so
if (locateQueue(config.getName()) != null && locateQueue(config.getName()).getAddress().equals(config.getAddress())) {
config.setConfigurationManaged(true);
setUnsetQueueParamsToDefaults(config);
QueueConfigurationUtils.applyStaticDefaults(config);
updateQueue(config, true);
} else {
// if the address::queue doesn't exist then create it
@ -4107,7 +4081,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(getRuntimeTempQueueNamespace(queueConfiguration.isTemporary()) + queueConfiguration.getAddress().toString()));
QueueConfigurationUtils.applyDynamicDefaults(queueConfiguration, addressSettingsRepository.getMatch(getRuntimeTempQueueNamespace(queueConfiguration.isTemporary()) + queueConfiguration.getAddress().toString()));
AddressInfo info = postOfficeInUse.getAddressInfo(queueConfiguration.getAddress());
if (queueConfiguration.isAutoCreateAddress() || queueConfiguration.isTemporary()) {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@ -28,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
@ -66,7 +66,6 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
public class PostOfficeJournalLoader implements JournalLoader {
@ -121,50 +120,27 @@ public class PostOfficeJournalLoader implements JournalLoader {
List<QueueBindingInfo> queueBindingInfos) throws Exception {
int duplicateID = 0;
for (final QueueBindingInfo queueBindingInfo : queueBindingInfos) {
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
QueueConfiguration queueConfig = queueBindingInfo.getQueueConfiguration();
queueBindingInfosMap.put(queueConfig.getId(), queueBindingInfo);
Filter filter = FilterImpl.createFilter(queueConfig.getFilterString());
if (postOffice.getBinding(queueBindingInfo.getQueueName()) != null) {
if (postOffice.getBinding(queueConfig.getName()) != null) {
if (FilterUtils.isTopicIdentification(filter)) {
final long tx = storageManager.generateID();
storageManager.deleteQueueBinding(tx, queueBindingInfo.getId());
storageManager.deleteQueueBinding(tx, queueConfig.getId());
storageManager.commitBindings(tx);
continue;
} else {
final SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++));
ActiveMQServerLogger.LOGGER.queueDuplicatedRenaming(queueBindingInfo.getQueueName().toString(), newName.toString());
queueBindingInfo.replaceQueueName(newName);
final SimpleString newName = queueConfig.getName().concat("-" + (duplicateID++));
ActiveMQServerLogger.LOGGER.queueDuplicatedRenaming(queueConfig.getName().toString(), newName.toString());
queueConfig.setName(newName);
}
}
final Queue queue = queueFactory.createQueueWith(QueueConfiguration.of(queueBindingInfo.getQueueName())
.setId(queueBindingInfo.getId())
.setAddress(queueBindingInfo.getAddress())
.setFilterString(queueBindingInfo.getFilterString())
.setUser(queueBindingInfo.getUser())
final Queue queue = queueFactory.createQueueWith(QueueConfiguration.of(queueConfig)
.setDurable(true)
.setTemporary(false)
.setAutoCreated(queueBindingInfo.isAutoCreated())
.setPurgeOnNoConsumers(queueBindingInfo.isPurgeOnNoConsumers())
.setEnabled(queueBindingInfo.isEnabled())
.setMaxConsumers(queueBindingInfo.getMaxConsumers())
.setExclusive(queueBindingInfo.isExclusive())
.setGroupRebalance(queueBindingInfo.isGroupRebalance())
.setGroupBuckets(queueBindingInfo.getGroupBuckets())
.setGroupFirstKey(queueBindingInfo.getGroupFirstKey())
.setLastValue(queueBindingInfo.isLastValue())
.setLastValueKey(queueBindingInfo.getLastValueKey())
.setNonDestructive(queueBindingInfo.isNonDestructive())
.setConsumersBeforeDispatch(queueBindingInfo.getConsumersBeforeDispatch())
.setDelayBeforeDispatch(queueBindingInfo.getDelayBeforeDispatch())
.setAutoDelete(queueBindingInfo.isAutoDelete())
.setAutoDeleteDelay(queueBindingInfo.getAutoDeleteDelay())
.setAutoDeleteMessageCount(queueBindingInfo.getAutoDeleteMessageCount())
.setRoutingType(RoutingType.getType(queueBindingInfo.getRoutingType()))
.setConfigurationManaged(queueBindingInfo.isConfigurationManaged())
.setRingSize(queueBindingInfo.getRingSize())
.setInternal(queueBindingInfo.isInternal()),
.setTemporary(false),
pagingManager,
filter);

View File

@ -22,28 +22,147 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
public class QueueConfigurationUtils {
public static void applyDynamicQueueDefaults(QueueConfiguration config, AddressSettings as) {
config.setMaxConsumers(config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers());
config.setExclusive(config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive());
config.setGroupRebalance(config.isGroupRebalance() == null ? as.isDefaultGroupRebalance() : config.isGroupRebalance());
config.setGroupRebalancePauseDispatch(config.isGroupRebalancePauseDispatch() == null ? as.isDefaultGroupRebalancePauseDispatch() : config.isGroupRebalancePauseDispatch());
config.setGroupBuckets(config.getGroupBuckets() == null ? as.getDefaultGroupBuckets() : config.getGroupBuckets());
config.setGroupFirstKey(config.getGroupFirstKey() == null ? as.getDefaultGroupFirstKey() : config.getGroupFirstKey());
config.setLastValue(config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue());
config.setLastValueKey(config.getLastValueKey() == null ? as.getDefaultLastValueKey() : config.getLastValueKey());
config.setNonDestructive(config.isNonDestructive() == null ? as.isDefaultNonDestructive() : config.isNonDestructive());
config.setConsumersBeforeDispatch(config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch());
config.setDelayBeforeDispatch(config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch());
config.setRingSize(config.getRingSize() == null ? as.getDefaultRingSize() : config.getRingSize());
config.setRoutingType(config.getRoutingType() == null ? as.getDefaultQueueRoutingType() : config.getRoutingType());
config.setPurgeOnNoConsumers(config.isPurgeOnNoConsumers() == null ? as.isDefaultPurgeOnNoConsumers() : config.isPurgeOnNoConsumers());
config.setAutoCreateAddress(config.isAutoCreateAddress() == null ? as.isAutoCreateAddresses() : config.isAutoCreateAddress());
// set the default auto-delete
config.setAutoDelete(config.isAutoDelete() == null ? !config.isConfigurationManaged() && ((config.isAutoCreated() && as.isAutoDeleteQueues()) || (!config.isAutoCreated() && as.isAutoDeleteCreatedQueues())) : config.isAutoDelete());
/**
* This method inspects the {@code QueueConfiguration} and applies default values to it based on the {@code
* AddressSettings} as well as {@code static} defaults. The {@code static} values are applied only after the values
* from the {@code AddressSettings} are applied. Values are only changed to defaults if they are {@code null}.
* @param config the {@code QueueConfiguration} to modify with default values
* @param as the {@code AddressSettings} to use when applying dynamic default values
*/
public static void applyDefaults(final QueueConfiguration config, AddressSettings as) {
applyDynamicDefaults(config, as);
applyStaticDefaults(config);
}
config.setAutoDeleteDelay(config.getAutoDeleteDelay() == null ? as.getAutoDeleteQueuesDelay() : config.getAutoDeleteDelay());
config.setAutoDeleteMessageCount(config.getAutoDeleteMessageCount() == null ? as.getAutoDeleteQueuesMessageCount() : config.getAutoDeleteMessageCount());
/**
* This method inspects the {@code QueueConfiguration} and applies default values to it based on the {@code
* AddressSettings}. Values are only changed to defaults if they are {@code null}.
* @param config the {@code QueueConfiguration} to modify with default values
* @param as the {@code AddressSettings} to use when applying dynamic default values
*/
public static void applyDynamicDefaults(final QueueConfiguration config, AddressSettings as) {
if (config.getMaxConsumers() == null) {
config.setMaxConsumers(as.getDefaultMaxConsumers());
}
if (config.isExclusive() == null) {
config.setExclusive(as.isDefaultExclusiveQueue());
}
if (config.isGroupRebalance() == null) {
config.setGroupRebalance(as.isDefaultGroupRebalance());
}
if (config.isGroupRebalancePauseDispatch() == null) {
config.setGroupRebalancePauseDispatch(as.isDefaultGroupRebalancePauseDispatch());
}
if (config.getGroupBuckets() == null) {
config.setGroupBuckets(as.getDefaultGroupBuckets());
}
if (config.getGroupFirstKey() == null) {
config.setGroupFirstKey(as.getDefaultGroupFirstKey());
}
if (config.isLastValue() == null) {
config.setLastValue(as.isDefaultLastValueQueue());
}
if (config.getLastValueKey() == null) {
config.setLastValueKey(as.getDefaultLastValueKey());
}
if (config.isNonDestructive() == null) {
config.setNonDestructive(as.isDefaultNonDestructive());
}
if (config.getConsumersBeforeDispatch() == null) {
config.setConsumersBeforeDispatch(as.getDefaultConsumersBeforeDispatch());
}
if (config.getDelayBeforeDispatch() == null) {
config.setDelayBeforeDispatch(as.getDefaultDelayBeforeDispatch());
}
if (config.getRingSize() == null) {
config.setRingSize(as.getDefaultRingSize());
}
if (config.getRoutingType() == null) {
config.setRoutingType(as.getDefaultQueueRoutingType());
}
if (config.isPurgeOnNoConsumers() == null) {
config.setPurgeOnNoConsumers(as.isDefaultPurgeOnNoConsumers());
}
if (config.isAutoCreateAddress() == null) {
config.setAutoCreateAddress(as.isAutoCreateAddresses());
}
if (config.isAutoDelete() == null) {
config.setAutoDelete(!config.isConfigurationManaged() && ((config.isAutoCreated() && as.isAutoDeleteQueues()) || (!config.isAutoCreated() && as.isAutoDeleteCreatedQueues())));
}
if (config.getAutoDeleteDelay() == null) {
config.setAutoDeleteDelay(as.getAutoDeleteQueuesDelay());
}
if (config.getAutoDeleteMessageCount() == null) {
config.setAutoDeleteMessageCount(as.getAutoDeleteQueuesMessageCount());
}
}
config.setEnabled(config.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : config.isEnabled());
/**
* This method inspects the {@code QueueConfiguration} and applies default values to it based on the {@code static}
* defaults. Values are only changed to defaults if they are {@code null}.
* <br>
* Static defaults are not applied directly in {@code QueueConfiguration} because {@code null} values allow us to
* determine whether the fields have actually been set. This allows us, for example, to omit unset fields from JSON
* payloads during queue-related management operations.
* @param config the {@code QueueConfiguration} to modify with default values
*/
public static void applyStaticDefaults(final QueueConfiguration config) {
if (config.getMaxConsumers() == null) {
config.setMaxConsumers(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
}
if (config.getRoutingType() == null) {
config.setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType());
}
if (config.isExclusive() == null) {
config.setExclusive(ActiveMQDefaultConfiguration.getDefaultExclusive());
}
if (config.isNonDestructive() == null) {
config.setNonDestructive(ActiveMQDefaultConfiguration.getDefaultNonDestructive());
}
if (config.isPurgeOnNoConsumers() == null) {
config.setPurgeOnNoConsumers(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
}
if (config.isEnabled() == null) {
config.setEnabled(ActiveMQDefaultConfiguration.getDefaultEnabled());
}
if (config.getConsumersBeforeDispatch() == null) {
config.setConsumersBeforeDispatch(ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
}
if (config.getDelayBeforeDispatch() == null) {
config.setDelayBeforeDispatch(ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
}
if (config.isGroupRebalance() == null) {
config.setGroupRebalance(ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
}
if (config.isGroupRebalancePauseDispatch() == null) {
config.setGroupRebalancePauseDispatch(ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch());
}
if (config.getGroupBuckets() == null) {
config.setGroupBuckets(ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
}
if (config.getGroupFirstKey() == null) {
config.setGroupFirstKey(ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
}
if (config.isAutoDelete() == null) {
config.setAutoDelete(ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete(config.isAutoCreated()));
}
if (config.getAutoDeleteDelay() == null) {
config.setAutoDeleteDelay(ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay());
}
if (config.getAutoDeleteMessageCount() == null) {
config.setAutoDeleteMessageCount(ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteMessageCount());
}
if (config.getRingSize() == null) {
config.setRingSize(ActiveMQDefaultConfiguration.getDefaultRingSize());
}
if (config.isLastValue() == null) {
config.setLastValue(ActiveMQDefaultConfiguration.getDefaultLastValue());
}
if (config.getLastValueKey() == null) {
config.setLastValueKey(ActiveMQDefaultConfiguration.getDefaultLastValueKey());
}
if (config.isInternal() == null) {
config.setInternal(ActiveMQDefaultConfiguration.getDefaultInternal());
}
}
}

View File

@ -16,101 +16,79 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class QueueBindingEncodingTest {
@Test
public void testEncodeDecode() {
final SimpleString name = RandomUtil.randomUUIDSimpleString();
final SimpleString address = RandomUtil.randomUUIDSimpleString();
final SimpleString filterString = RandomUtil.randomUUIDSimpleString();
final SimpleString user = RandomUtil.randomUUIDSimpleString();
final boolean autoCreated = RandomUtil.randomBoolean();
final int maxConsumers = RandomUtil.randomInt();
final boolean purgeOnNoConsumers = RandomUtil.randomBoolean();
final boolean exclusive = RandomUtil.randomBoolean();
final boolean groupRebalance = RandomUtil.randomBoolean();
final int groupBuckets = RandomUtil.randomInt();
final SimpleString groupFirstKey = RandomUtil.randomUUIDSimpleString();
final boolean lastValue = RandomUtil.randomBoolean();
final SimpleString lastValueKey = RandomUtil.randomUUIDSimpleString();
final boolean nonDestructive = RandomUtil.randomBoolean();
final int consumersBeforeDispatch = RandomUtil.randomInt();
final long delayBeforeDispatch = RandomUtil.randomLong();
final boolean autoDelete = RandomUtil.randomBoolean();
final long autoDeleteDelay = RandomUtil.randomLong();
final long autoDeleteMessageCount = RandomUtil.randomLong();
final byte routingType = RandomUtil.randomByte();
final boolean configurationManaged = RandomUtil.randomBoolean();
final long ringSize = RandomUtil.randomLong();
final boolean enabled = RandomUtil.randomBoolean();
final boolean groupRebalancePauseDispatch = RandomUtil.randomBoolean();
final boolean internal = RandomUtil.randomBoolean();
final QueueConfiguration config = QueueConfiguration.of(RandomUtil.randomUUIDSimpleString())
.setAddress(RandomUtil.randomUUIDSimpleString())
.setFilterString(RandomUtil.randomUUIDSimpleString())
.setUser(RandomUtil.randomUUIDSimpleString())
.setAutoCreated(RandomUtil.randomBoolean())
.setMaxConsumers(RandomUtil.randomInt())
.setPurgeOnNoConsumers(RandomUtil.randomBoolean())
.setEnabled(RandomUtil.randomBoolean())
.setExclusive(RandomUtil.randomBoolean())
.setGroupRebalance(RandomUtil.randomBoolean())
.setGroupRebalancePauseDispatch(RandomUtil.randomBoolean())
.setGroupBuckets(RandomUtil.randomInt())
.setGroupFirstKey(RandomUtil.randomUUIDSimpleString())
.setLastValue(RandomUtil.randomBoolean())
.setLastValueKey(RandomUtil.randomUUIDSimpleString())
.setNonDestructive(RandomUtil.randomBoolean())
.setConsumersBeforeDispatch(RandomUtil.randomInt())
.setDelayBeforeDispatch(RandomUtil.randomLong())
.setAutoDelete(RandomUtil.randomBoolean())
.setAutoDeleteDelay(RandomUtil.randomLong())
.setAutoDeleteMessageCount(RandomUtil.randomLong())
.setRoutingType(RoutingType.getType((byte) RandomUtil.randomInterval(0, 1)))
.setConfigurationManaged(RandomUtil.randomBoolean())
.setRingSize(RandomUtil.randomLong())
.setInternal(RandomUtil.randomBoolean());
PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name,
address,
filterString,
user,
autoCreated,
maxConsumers,
purgeOnNoConsumers,
enabled,
exclusive,
groupRebalance,
groupRebalancePauseDispatch,
groupBuckets,
groupFirstKey,
lastValue,
lastValueKey,
nonDestructive,
consumersBeforeDispatch,
delayBeforeDispatch,
autoDelete,
autoDeleteDelay,
autoDeleteMessageCount,
routingType,
configurationManaged,
ringSize,
internal);
PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(config);
int size = encoding.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(size);
encoding.encode(encodedBuffer);
PersistentQueueBindingEncoding decoding = new PersistentQueueBindingEncoding();
decoding.decode(encodedBuffer);
QueueConfiguration decodedQueueConfig = decoding.getQueueConfiguration();
assertEquals(name, decoding.getQueueName());
assertEquals(address, decoding.getAddress());
assertEquals(filterString, decoding.getFilterString());
assertEquals(user, decoding.getUser());
assertEquals(autoCreated, decoding.isAutoCreated());
assertEquals(maxConsumers, decoding.getMaxConsumers());
assertEquals(purgeOnNoConsumers, decoding.isPurgeOnNoConsumers());
assertEquals(enabled, decoding.isEnabled());
assertEquals(exclusive, decoding.isExclusive());
assertEquals(groupRebalance, decoding.isGroupRebalance());
assertEquals(groupBuckets, decoding.getGroupBuckets());
assertEquals(groupFirstKey, decoding.getGroupFirstKey());
assertEquals(lastValue, decoding.isLastValue());
assertEquals(lastValueKey, decoding.getLastValueKey());
assertEquals(nonDestructive, decoding.isNonDestructive());
assertEquals(consumersBeforeDispatch, decoding.getConsumersBeforeDispatch());
assertEquals(delayBeforeDispatch, decoding.getDelayBeforeDispatch());
assertEquals(autoDelete, decoding.isAutoDelete());
assertEquals(autoDeleteDelay, decoding.getAutoDeleteDelay());
assertEquals(autoDeleteMessageCount, decoding.getAutoDeleteMessageCount());
assertEquals(routingType, decoding.getRoutingType());
assertEquals(configurationManaged, decoding.isConfigurationManaged());
assertEquals(ringSize, decoding.getRingSize());
assertEquals(groupRebalancePauseDispatch, decoding.isGroupRebalancePauseDispatch());
assertEquals(internal, decoding.isInternal());
assertEquals(config.getName(), decodedQueueConfig.getName());
assertEquals(config.getAddress(), decodedQueueConfig.getAddress());
assertEquals(config.getFilterString(), decodedQueueConfig.getFilterString());
assertEquals(config.getUser(), decodedQueueConfig.getUser());
assertEquals(config.isAutoCreated(), decodedQueueConfig.isAutoCreated());
assertEquals(config.getMaxConsumers(), decodedQueueConfig.getMaxConsumers());
assertEquals(config.isPurgeOnNoConsumers(), decodedQueueConfig.isPurgeOnNoConsumers());
assertEquals(config.isEnabled(), decodedQueueConfig.isEnabled());
assertEquals(config.isExclusive(), decodedQueueConfig.isExclusive());
assertEquals(config.isGroupRebalance(), decodedQueueConfig.isGroupRebalance());
assertEquals(config.getGroupBuckets(), decodedQueueConfig.getGroupBuckets());
assertEquals(config.getGroupFirstKey(), decodedQueueConfig.getGroupFirstKey());
assertEquals(config.isLastValue(), decodedQueueConfig.isLastValue());
assertEquals(config.getLastValueKey(), decodedQueueConfig.getLastValueKey());
assertEquals(config.isNonDestructive(), decodedQueueConfig.isNonDestructive());
assertEquals(config.getConsumersBeforeDispatch(), decodedQueueConfig.getConsumersBeforeDispatch());
assertEquals(config.getDelayBeforeDispatch(), decodedQueueConfig.getDelayBeforeDispatch());
assertEquals(config.isAutoDelete(), decodedQueueConfig.isAutoDelete());
assertEquals(config.getAutoDeleteDelay(), decodedQueueConfig.getAutoDeleteDelay());
assertEquals(config.getAutoDeleteMessageCount(), decodedQueueConfig.getAutoDeleteMessageCount());
assertEquals(config.getRoutingType(), decodedQueueConfig.getRoutingType());
assertEquals(config.isConfigurationManaged(), decodedQueueConfig.isConfigurationManaged());
assertEquals(config.getRingSize(), decodedQueueConfig.getRingSize());
assertEquals(config.isGroupRebalancePauseDispatch(), decodedQueueConfig.isGroupRebalancePauseDispatch());
assertEquals(config.isInternal(), decodedQueueConfig.isInternal());
}
}

View File

@ -50,6 +50,32 @@ public class QueueQueryTest extends ActiveMQTestBase {
server.start();
}
@Test
public void testQueueQueryOnManagement() throws Exception {
SimpleString addressName = server.getConfiguration().getManagementAddress();
SimpleString queueName = server.getConfiguration().getManagementAddress();
QueueQueryResult queueQueryResult = server.queueQuery(queueName);
assertTrue(queueQueryResult.isExists());
assertEquals(RoutingType.MULTICAST, queueQueryResult.getRoutingType());
assertEquals(queueName, queueQueryResult.getName());
assertTrue(queueQueryResult.isAutoCreateQueues());
assertNull(queueQueryResult.getFilterString());
assertFalse(queueQueryResult.isAutoCreated());
assertEquals(addressName, queueQueryResult.getAddress());
assertEquals(-1, queueQueryResult.getMessageCount());
assertEquals(-1, queueQueryResult.getConsumerCount());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_MAX_QUEUE_CONSUMERS, queueQueryResult.getMaxConsumers());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_CONSUMERS_BEFORE_DISPATCH, queueQueryResult.getConsumersBeforeDispatch().intValue());
assertEquals(ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, queueQueryResult.getDefaultConsumerWindowSize().intValue());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_DELAY_BEFORE_DISPATCH, queueQueryResult.getDelayBeforeDispatch().longValue());
assertNull(queueQueryResult.getLastValueKey());
assertTrue(queueQueryResult.isDurable());
assertFalse(queueQueryResult.isPurgeOnNoConsumers());
assertFalse(queueQueryResult.isTemporary());
assertFalse(queueQueryResult.isExclusive());
assertFalse(queueQueryResult.isNonDestructive());
}
@Test
public void testQueueQueryDefaultsOnStaticQueue() throws Exception {
SimpleString addressName = SimpleString.of(UUID.randomUUID().toString());