diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index a414f95638..c8d483cb21 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -100,7 +100,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* Returns true
if auto-creation for this queue is enabled and if the queue queried is a JMS queue,
* false
else.
*/
- boolean isAutoCreateJmsQueues();
+ boolean isAutoCreateQueues();
/**
* Returns the number of consumers attached to the queue.
@@ -128,6 +128,14 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @return
*/
SimpleString getName();
+
+ RoutingType getRoutingType();
+
+ int getMaxConsumers();
+
+ boolean isDeleteOnNoConsumers();
+
+ boolean isAutoCreated();
}
// Lifecycle operations ------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 1ed825b0bb..dd10e5bcba 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -422,7 +422,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public void createTemporaryQueue(final String address, final RoutingType routingType, final String queueName, final String filter) throws ActiveMQException {
- createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName));
+ createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter));
}
/**
@@ -560,7 +560,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
routingType,
filter,
durable,
- !durable,
+ false,
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
false);
@@ -1823,8 +1823,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
filterString,
durable,
temp,
- ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
- ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ maxConsumers,
+ deleteOnNoConsumers,
autoCreated);
} finally {
endCall();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 40ea86a591..5afdd8d56d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.server.RoutingType;
public class QueueQueryImpl implements ClientSession.QueueQuery {
@@ -37,7 +38,15 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final SimpleString name;
- private final boolean autoCreateJmsQueues;
+ private final boolean autoCreateQueues;
+
+ private final boolean autoCreated;
+
+ private final RoutingType routingType;
+
+ private final boolean deleteOnNoConsumers;
+
+ private final int maxConsumers;
public QueueQueryImpl(final boolean durable,
final boolean temporary,
@@ -58,7 +67,23 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final SimpleString address,
final SimpleString name,
final boolean exists,
- final boolean autoCreateJmsQueues) {
+ final boolean autoCreateQueues) {
+ this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, -1, false, false, RoutingType.MULTICAST);
+ }
+
+ public QueueQueryImpl(final boolean durable,
+ final boolean temporary,
+ final int consumerCount,
+ final long messageCount,
+ final SimpleString filterString,
+ final SimpleString address,
+ final SimpleString name,
+ final boolean exists,
+ final boolean autoCreateQueues,
+ final int maxConsumers,
+ final boolean autoCreated,
+ final boolean deleteOnNoConsumers,
+ final RoutingType routingType) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@@ -67,7 +92,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.address = address;
this.name = name;
this.exists = exists;
- this.autoCreateJmsQueues = autoCreateJmsQueues;
+ this.autoCreateQueues = autoCreateQueues;
+ this.maxConsumers = maxConsumers;
+ this.autoCreated = autoCreated;
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
+ this.routingType = routingType;
}
@Override
@@ -101,8 +130,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
}
@Override
- public boolean isAutoCreateJmsQueues() {
- return autoCreateJmsQueues;
+ public boolean isAutoCreateQueues() {
+ return autoCreateQueues;
}
@Override
@@ -115,5 +144,25 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
return exists;
}
+ @Override
+ public RoutingType getRoutingType() {
+ return routingType;
+ }
+
+ @Override
+ public int getMaxConsumers() {
+ return maxConsumers;
+ }
+
+ @Override
+ public boolean isDeleteOnNoConsumers() {
+ return deleteOnNoConsumers;
+ }
+
+ @Override
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index ed0814249c..57076458d7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -79,7 +79,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@@ -265,7 +265,7 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
- SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
+ SessionQueueQueryResponseMessage_V3 response = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
return response.toQueueQuery();
}
@@ -290,7 +290,7 @@ public class ActiveMQSessionContext extends SessionContext {
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
- SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
+ SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
@@ -710,8 +710,7 @@ public class ActiveMQSessionContext extends SessionContext {
// they are defined in broker.xml
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
if (!queueInfo.isDurable()) {
- // TODO (mtaylor) QueueInfo needs updating to include new parameters, this method should pass in del mode
- CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false);
+ CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isDeleteOnNoConsumers(), queueInfo.isAutoCreated(), false);
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 41be0804c4..d1b17bf7a8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -172,6 +172,8 @@ public final class ChannelImpl implements Channel {
return version >= 126;
case PacketImpl.SESS_BINDINGQUERY_RESP_V3:
return version >= 127;
+ case PacketImpl.SESS_QUEUEQUERY_RESP_V3:
+ return version >= 129;
default:
return true;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index 15629c8c11..89a6c9abd6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
@@ -127,6 +128,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V2;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
@@ -241,6 +243,10 @@ public abstract class PacketDecoder implements Serializable {
packet = new SessionQueueQueryResponseMessage_V2();
break;
}
+ case SESS_QUEUEQUERY_RESP_V3: {
+ packet = new SessionQueueQueryResponseMessage_V3();
+ break;
+ }
case CREATE_ADDRESS: {
packet = new CreateAddressMessage();
break;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index a65bdfcac5..5bdf727345 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -255,6 +255,8 @@ public class PacketImpl implements Packet {
public static final byte CREATE_SHARED_QUEUE_V2 = -13;
+ public static final byte SESS_QUEUEQUERY_RESP_V3 = -14;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
index b8313b2eae..7d9c184e20 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
@@ -49,8 +49,8 @@ public class SessionQueueQueryResponseMessage extends PacketImpl {
this(null, null, false, false, null, 0, 0, false);
}
- public SessionQueueQueryResponseMessage(byte v2) {
- super(v2);
+ public SessionQueueQueryResponseMessage(byte v) {
+ super(v);
}
private SessionQueueQueryResponseMessage(final SimpleString name,
@@ -159,6 +159,13 @@ public class SessionQueueQueryResponseMessage extends PacketImpl {
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
+ buff.append("]");
+ return buff.toString();
+ }
+
+ @Override
+ public String getParentString() {
+ StringBuffer buff = new StringBuffer(super.getParentString());
buff.append(", address=" + address);
buff.append(", name=" + name);
buff.append(", consumerCount=" + consumerCount);
@@ -167,7 +174,6 @@ public class SessionQueueQueryResponseMessage extends PacketImpl {
buff.append(", exists=" + exists);
buff.append(", temporary=" + temporary);
buff.append(", messageCount=" + messageCount);
- buff.append("]");
return buff.toString();
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java
index 77ad0f32c8..667ce6ea95 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java
@@ -24,10 +24,10 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage {
- private boolean autoCreationEnabled;
+ protected boolean autoCreateQueues;
public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result) {
- this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues());
+ this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues());
}
public SessionQueueQueryResponseMessage_V2() {
@@ -42,7 +42,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon
final int consumerCount,
final long messageCount,
final boolean exists,
- final boolean autoCreationEnabled) {
+ final boolean autoCreateQueues) {
super(SESS_QUEUEQUERY_RESP_V2);
this.durable = durable;
@@ -61,52 +61,53 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon
this.exists = exists;
- this.autoCreationEnabled = autoCreationEnabled;
+ this.autoCreateQueues = autoCreateQueues;
+ }
+ public SessionQueueQueryResponseMessage_V2(byte v) {
+ super(v);
}
- public boolean isAutoCreationEnabled() {
- return autoCreationEnabled;
+ public boolean isAutoCreateQueues() {
+ return autoCreateQueues;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
- buffer.writeBoolean(autoCreationEnabled);
+ buffer.writeBoolean(autoCreateQueues);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
- autoCreationEnabled = buffer.readBoolean();
+ autoCreateQueues = buffer.readBoolean();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
- result = prime * result + (autoCreationEnabled ? 1231 : 1237);
+ result = prime * result + (autoCreateQueues ? 1231 : 1237);
return result;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
- buff.append(", address=" + address);
- buff.append(", name=" + name);
- buff.append(", consumerCount=" + consumerCount);
- buff.append(", filterString=" + filterString);
- buff.append(", durable=" + durable);
- buff.append(", exists=" + exists);
- buff.append(", temporary=" + temporary);
- buff.append(", messageCount=" + messageCount);
- buff.append(", autoCreationEnabled=" + autoCreationEnabled);
buff.append("]");
return buff.toString();
}
+ @Override
+ public String getParentString() {
+ StringBuffer buff = new StringBuffer(super.getParentString());
+ buff.append(", autoCreationEnabled=" + autoCreateQueues);
+ return buff.toString();
+ }
+
@Override
public ClientSession.QueueQuery toQueueQuery() {
- return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreationEnabled());
+ return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues());
}
@Override
@@ -118,7 +119,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon
if (!(obj instanceof SessionQueueQueryResponseMessage_V2))
return false;
SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2) obj;
- if (autoCreationEnabled != other.autoCreationEnabled)
+ if (autoCreateQueues != other.autoCreateQueues)
return false;
return true;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
new file mode 100644
index 0000000000..b3664da601
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.client.impl.QueueQueryImpl;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
+
+public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryResponseMessage_V2 {
+
+ protected boolean autoCreated;
+
+ protected boolean deleteOnNoConsumers;
+
+ protected RoutingType routingType;
+
+ protected int maxConsumers;
+
+ public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
+ this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isDeleteOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers());
+ }
+
+ public SessionQueueQueryResponseMessage_V3() {
+ this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1);
+ }
+
+ private SessionQueueQueryResponseMessage_V3(final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final long messageCount,
+ final boolean exists,
+ final boolean autoCreateQueues,
+ final boolean autoCreated,
+ final boolean deleteOnNoConsumers,
+ final RoutingType routingType,
+ final int maxConsumers) {
+ super(SESS_QUEUEQUERY_RESP_V3);
+
+ this.durable = durable;
+
+ this.temporary = temporary;
+
+ this.consumerCount = consumerCount;
+
+ this.messageCount = messageCount;
+
+ this.filterString = filterString;
+
+ this.address = address;
+
+ this.name = name;
+
+ this.exists = exists;
+
+ this.autoCreateQueues = autoCreateQueues;
+
+ this.autoCreated = autoCreated;
+
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
+
+ this.routingType = routingType;
+
+ this.maxConsumers = maxConsumers;
+ }
+
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
+ public void setAutoCreated(boolean autoCreated) {
+ this.autoCreated = autoCreated;
+ }
+
+ public boolean isDeleteOnNoConsumers() {
+ return deleteOnNoConsumers;
+ }
+
+ public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
+ }
+
+ public RoutingType getRoutingType() {
+ return routingType;
+ }
+
+ public void setRoutingType(RoutingType routingType) {
+ this.routingType = routingType;
+ }
+
+ public int getMaxConsumers() {
+ return maxConsumers;
+ }
+
+ public void setMaxConsumers(int maxConsumers) {
+ this.maxConsumers = maxConsumers;
+ }
+
+ @Override
+ public void encodeRest(final ActiveMQBuffer buffer) {
+ super.encodeRest(buffer);
+ buffer.writeBoolean(autoCreated);
+ buffer.writeBoolean(deleteOnNoConsumers);
+ buffer.writeByte(routingType.getType());
+ buffer.writeInt(maxConsumers);
+ }
+
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ super.decodeRest(buffer);
+ autoCreated = buffer.readBoolean();
+ deleteOnNoConsumers = buffer.readBoolean();
+ routingType = RoutingType.getType(buffer.readByte());
+ maxConsumers = buffer.readInt();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + (autoCreated ? 1231 : 1237);
+ result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
+ result = prime * result + routingType.hashCode();
+ result = prime * result + maxConsumers;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buff = new StringBuffer(getParentString());
+ buff.append("]");
+ return buff.toString();
+ }
+
+ @Override
+ public String getParentString() {
+ StringBuffer buff = new StringBuffer(super.getParentString());
+ buff.append(", autoCreated=" + autoCreated);
+ buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers);
+ buff.append(", routingType=" + routingType);
+ buff.append(", maxConsumers=" + maxConsumers);
+ return buff.toString();
+ }
+
+ @Override
+ public ClientSession.QueueQuery toQueueQuery() {
+ return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isDeleteOnNoConsumers(), getRoutingType());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (!super.equals(obj))
+ return false;
+ if (!(obj instanceof SessionQueueQueryResponseMessage_V3))
+ return false;
+ SessionQueueQueryResponseMessage_V3 other = (SessionQueueQueryResponseMessage_V3) obj;
+ if (autoCreated != other.autoCreated)
+ return false;
+ if (deleteOnNoConsumers != other.deleteOnNoConsumers)
+ return false;
+ if (routingType == null) {
+ if (other.routingType != null)
+ return false;
+ } else if (!routingType.equals(other.routingType))
+ return false;
+ if (maxConsumers != other.maxConsumers)
+ return false;
+ return true;
+ }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index f9740de024..de14888cc3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -36,7 +36,15 @@ public class QueueQueryResult {
private boolean temporary;
- private boolean autoCreateJmsQueues;
+ private boolean autoCreateQueues;
+
+ private boolean autoCreated;
+
+ private boolean deleteOnNoConsumers;
+
+ private RoutingType routingType;
+
+ private int maxConsumers;
public QueueQueryResult(final SimpleString name,
final SimpleString address,
@@ -45,19 +53,12 @@ public class QueueQueryResult {
final SimpleString filterString,
final int consumerCount,
final long messageCount,
- final boolean autoCreateJmsQueues) {
- this(name, address, durable, temporary, filterString, consumerCount, messageCount, autoCreateJmsQueues, true);
- }
-
- public QueueQueryResult(final SimpleString name,
- final SimpleString address,
- final boolean durable,
- final boolean temporary,
- final SimpleString filterString,
- final int consumerCount,
- final long messageCount,
- final boolean autoCreateJmsQueues,
- final boolean exists) {
+ final boolean autoCreateQueues,
+ final boolean exists,
+ final boolean autoCreated,
+ final boolean deleteOnNoConsumers,
+ final RoutingType routingType,
+ final int maxConsumers) {
this.durable = durable;
this.temporary = temporary;
@@ -72,9 +73,17 @@ public class QueueQueryResult {
this.name = name;
- this.autoCreateJmsQueues = autoCreateJmsQueues;
+ this.autoCreateQueues = autoCreateQueues;
this.exists = exists;
+
+ this.autoCreated = autoCreated;
+
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
+
+ this.routingType = routingType;
+
+ this.maxConsumers = maxConsumers;
}
public boolean isExists() {
@@ -109,8 +118,23 @@ public class QueueQueryResult {
return temporary;
}
- public boolean isAutoCreateJmsQueues() {
- return autoCreateJmsQueues;
+ public boolean isAutoCreateQueues() {
+ return autoCreateQueues;
}
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
+ public boolean isDeleteOnNoConsumers() {
+ return deleteOnNoConsumers;
+ }
+
+ public RoutingType getRoutingType() {
+ return routingType;
+ }
+
+ public int getMaxConsumers() {
+ return maxConsumers;
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 09b190254b..9d37cd3d96 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -120,12 +120,19 @@ public class MessageUtil {
}
public static void clearProperties(Message message) {
+ /**
+ * JavaDoc for this method states:
+ * Clears a message's properties.
+ * The message's header fields and body are not cleared.
+ *
+ * Since the {@code Message.HDR_ROUTING_TYPE} is used for the JMSDestination header it isn't cleared
+ */
List toRemove = new ArrayList<>();
for (SimpleString propName : message.getPropertyNames()) {
- if (!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
- propName.startsWith(JMS_)) {
+ if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
+ propName.startsWith(JMS_)) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
toRemove.add(propName);
}
}
@@ -140,7 +147,7 @@ public class MessageUtil {
for (SimpleString propName : message.getPropertyNames()) {
if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
- propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME)) {
+ propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
set.add(propName.toString());
}
}
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 283f95832f..4f0be81b78 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
@@ -201,8 +202,6 @@ public class ActiveMQMessage implements javax.jms.Message {
private long jmsDeliveryTime;
- private boolean fromQueue;
-
// Constructors --------------------------------------------------
/*
@@ -399,8 +398,17 @@ public class ActiveMQMessage implements javax.jms.Message {
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString address = message.getAddress();
+ String prefix = "";
+ if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
+ RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
+ if (routingType.equals(RoutingType.ANYCAST)) {
+ prefix = QUEUE_QUALIFIED_PREFIX;
+ } else if (routingType.equals(RoutingType.MULTICAST)) {
+ prefix = TOPIC_QUALIFIED_PREFIX;
+ }
+ }
- dest = address == null ? null : ActiveMQDestination.fromPrefixedName((fromQueue ? QUEUE_QUALIFIED_PREFIX : TOPIC_QUALIFIED_PREFIX) + address.toString());
+ dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
}
return dest;
@@ -779,10 +787,6 @@ public class ActiveMQMessage implements javax.jms.Message {
// Public --------------------------------------------------------
- public void setFromQueue(boolean fromQueue) {
- this.fromQueue = fromQueue;
- }
-
public void setIndividualAcknowledge() {
this.individualAck = true;
}
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index b449aeaacc..8bc1fd8181 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -240,8 +240,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
} else {
coreMessage.acknowledge();
}
-
- jmsMsg.setFromQueue(destination instanceof ActiveMQQueue);
}
return jmsMsg;
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index aa4754b733..4c1d33564f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -41,7 +41,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -493,7 +492,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
- coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType);
+ coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, routingType);
try {
/**
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
index 2632daed1c..a6d047a227 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
@@ -76,6 +76,26 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
return "ActiveMQQueue[" + name + "]";
}
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof ActiveMQQueue)) {
+ return false;
+ }
+
+ ActiveMQQueue that = (ActiveMQQueue) o;
+
+ return super.getAddress().equals(that.getAddress());
+ }
+
+ @Override
+ public int hashCode() {
+ return super.getAddress().hashCode();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index fe2a1a0f6d..3e9b76f06c 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -820,7 +820,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
SimpleString simpleAddress = queue.getSimpleAddress();
- session.createTemporaryQueue(simpleAddress, simpleAddress);
+ session.createTemporaryQueue(simpleAddress, RoutingType.ANYCAST, simpleAddress);
connection.addTemporaryQueue(simpleAddress);
@@ -1074,7 +1074,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
- if (!response.isExists() && !response.isAutoCreateJmsQueues()) {
+ if (!response.isExists() && !response.isAutoCreateQueues()) {
return null;
} else {
return queue;
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java
index fa01409ed6..88a822ade7 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java
@@ -52,6 +52,26 @@ public class ActiveMQTemporaryQueue extends ActiveMQQueue implements TemporaryQu
return "ActiveMQTemporaryQueue[" + name + "]";
}
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof ActiveMQTemporaryQueue)) {
+ return false;
+ }
+
+ ActiveMQTemporaryQueue that = (ActiveMQTemporaryQueue) o;
+
+ return super.getAddress().equals(that.getAddress());
+ }
+
+ @Override
+ public int hashCode() {
+ return super.getAddress().hashCode();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java
index 07c3ec9dcb..98b5ba6c36 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java
@@ -36,6 +36,26 @@ public class ActiveMQTemporaryTopic extends ActiveMQTopic implements TemporaryTo
// Public --------------------------------------------------------
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof ActiveMQTemporaryTopic)) {
+ return false;
+ }
+
+ ActiveMQTemporaryTopic that = (ActiveMQTemporaryTopic) o;
+
+ return super.getAddress().equals(that.getAddress());
+ }
+
+ @Override
+ public int hashCode() {
+ return super.getAddress().hashCode();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
index 5ffd918619..941b4408d0 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
@@ -71,6 +71,26 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
return "ActiveMQTopic[" + name + "]";
}
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof ActiveMQTopic)) {
+ return false;
+ }
+
+ ActiveMQTopic that = (ActiveMQTopic) o;
+
+ return super.getAddress().equals(that.getAddress());
+ }
+
+ @Override
+ public int hashCode() {
+ return super.getAddress().hashCode();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 9e103f492b..e6c32c8248 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -564,7 +564,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
- @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
+ @Parameter(name = "routingType", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
checkStarted();
clearIO();
@@ -665,7 +665,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
- server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+ server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} finally {
blockOnIO();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 3a0c240594..29f42773f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -53,4 +53,8 @@ public interface QueueBindingInfo {
boolean isDeleteOnNoConsumers();
void setDeleteOnNoConsumers(boolean deleteOnNoConsumers);
+
+ byte getRoutingType();
+
+ void setRoutingType(byte routingType);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index ee03aa9754..15083e8fe9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1221,7 +1221,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
SimpleString filterString = filter == null ? null : filter.getFilterString();
- PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers());
+ PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers(), queue.getRoutingType().getType());
readLock();
try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 88bc1cff43..36a0ae6d8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.util.LinkedList;
import java.util.List;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -45,6 +46,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public boolean deleteOnNoConsumers;
+ public byte routingType;
+
public PersistentQueueBindingEncoding() {
}
@@ -65,6 +68,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
maxConsumers +
", deleteOnNoConsumers=" +
deleteOnNoConsumers +
+ ", routingType=" +
+ routingType +
"]";
}
@@ -74,7 +79,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final SimpleString user,
final boolean autoCreated,
final int maxConsumers,
- final boolean deleteOnNoConsumers) {
+ final boolean deleteOnNoConsumers,
+ final byte routingType) {
this.name = name;
this.address = address;
this.filterString = filterString;
@@ -82,6 +88,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers;
+ this.routingType = routingType;
}
@Override
@@ -156,6 +163,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
+ @Override
+ public byte getRoutingType() {
+ return routingType;
+ }
+
+ @Override
+ public void setRoutingType(byte routingType) {
+ this.routingType = routingType;
+ }
+
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
@@ -180,9 +197,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
if (buffer.readableBytes() > 0) {
maxConsumers = buffer.readInt();
deleteOnNoConsumers = buffer.readBoolean();
+ routingType = buffer.readByte();
} else {
- maxConsumers = -1;
- deleteOnNoConsumers = false;
+ maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
+ deleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
+ routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType().getType();
}
}
@@ -195,6 +214,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeBoolean(autoCreated);
buffer.writeInt(maxConsumers);
buffer.writeBoolean(deleteOnNoConsumers);
+ buffer.writeByte(routingType);
}
@Override
@@ -203,7 +223,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
SimpleString.sizeofNullableString(createMetadata()) +
DataConstants.SIZE_INT +
- DataConstants.SIZE_BOOLEAN;
+ DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_BYTE;
}
private SimpleString createMetadata() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index d02f0f0668..e09d108418 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.Bindable;
@@ -131,8 +131,8 @@ public class LocalQueueBinding implements QueueBinding {
}
private boolean isMatchRoutingType(ServerMessage message) {
- if (message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) {
- return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType();
+ if (message.containsProperty(Message.HDR_ROUTING_TYPE)) {
+ return message.getByteProperty(Message.HDR_ROUTING_TYPE).equals(queue.getRoutingType().getType());
}
return true;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2fc34093aa..e060542075 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -664,10 +664,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
SimpleString address = message.getAddress();
- if (address.toString().equals("testQueue")) {
- System.out.println("f");
- }
-
setPagingStore(message);
AtomicBoolean startedTX = new AtomicBoolean(false);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 65ffc69676..d3cc6176c0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
@@ -218,7 +219,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
- if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
+ if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
+ response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
+ } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
} else {
response = new SessionQueueQueryResponseMessage(queueQueryResult);
@@ -284,7 +287,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
- if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
+ if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
+ response = new SessionQueueQueryResponseMessage_V3(result);
+ } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
response = new SessionQueueQueryResponseMessage_V2(result);
} else {
response = new SessionQueueQueryResponseMessage(result);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 3435ca0ae3..6e0d5afaa6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -126,7 +126,7 @@ public final class QueueConfig {
return this;
}
- public Builder deliveryMode(RoutingType routingType) {
+ public Builder routingType(RoutingType routingType) {
this.routingType = routingType;
return this;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index d6e626cef1..06852ceaba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -719,7 +719,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
- boolean autoCreateJmsQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
+ boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
QueueQueryResult response;
@@ -734,14 +734,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
- response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
+ response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isDeleteOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers());
} else if (name.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
- response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
- } else if (autoCreateJmsQueues) {
- response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
+ response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1);
+ } else if (autoCreateQueues) {
+ response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, false, RoutingType.MULTICAST, 0);
} else {
- response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
+ response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0);
}
return response;
@@ -1657,7 +1657,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
- // TODO: fix logging here as this could be for a topic or queue
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
return createQueue(address, queueName, routingType, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
@@ -2476,14 +2475,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (info == null) {
if (autoCreateAddress) {
- postOffice.addAddressInfo(defaultAddressInfo);
+ postOffice.addAddressInfo(defaultAddressInfo.setAutoCreated(true));
info = postOffice.getAddressInfo(addressName);
} else {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
}
}
- final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deliveryMode(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
+ final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 7816cde3b4..6384ae98da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -119,6 +119,11 @@ public class AddressInfo {
for (RoutingType routingType : routingTypes) {
buff.append(routingType.toString() + ",");
}
+ // delete hanging comma
+ if (buff.charAt(buff.length() - 1) == ',') {
+ buff.deleteCharAt(buff.length() - 1);
+ }
+ buff.append("}");
buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 20ef545b0c..f52b5cc945 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
@@ -149,7 +150,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
.temporary(false)
.autoCreated(queueBindingInfo.isAutoCreated())
.deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers())
- .maxConsumers(queueBindingInfo.getMaxConsumers());
+ .maxConsumers(queueBindingInfo.getMaxConsumers())
+ .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));