From a20b23bf370c9c011f64f722c318498c9c6dfc15 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Fri, 9 Dec 2016 17:59:25 +0000 Subject: [PATCH] ARTEMIS-787 Update CORE Protocol/Client Packets --- .../api/core/client/ClientSession.java | 10 +- .../core/client/impl/ClientSessionImpl.java | 8 +- .../core/client/impl/QueueQueryImpl.java | 59 +++++- .../core/impl/ActiveMQSessionContext.java | 9 +- .../core/protocol/core/impl/ChannelImpl.java | 2 + .../protocol/core/impl/PacketDecoder.java | 6 + .../core/protocol/core/impl/PacketImpl.java | 2 + .../SessionQueueQueryResponseMessage.java | 12 +- .../SessionQueueQueryResponseMessage_V2.java | 41 ++-- .../SessionQueueQueryResponseMessage_V3.java | 191 ++++++++++++++++++ .../artemis/core/server/QueueQueryResult.java | 58 ++++-- .../activemq/artemis/reader/MessageUtil.java | 13 +- .../artemis/jms/client/ActiveMQMessage.java | 18 +- .../jms/client/ActiveMQMessageConsumer.java | 2 - .../jms/client/ActiveMQMessageProducer.java | 3 +- .../artemis/jms/client/ActiveMQQueue.java | 20 ++ .../artemis/jms/client/ActiveMQSession.java | 4 +- .../jms/client/ActiveMQTemporaryQueue.java | 20 ++ .../jms/client/ActiveMQTemporaryTopic.java | 20 ++ .../artemis/jms/client/ActiveMQTopic.java | 20 ++ .../impl/ActiveMQServerControlImpl.java | 4 +- .../core/persistence/QueueBindingInfo.java | 4 + .../AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 29 ++- .../postoffice/impl/LocalQueueBinding.java | 6 +- .../core/postoffice/impl/PostOfficeImpl.java | 4 - .../core/ServerSessionPacketHandler.java | 9 +- .../artemis/core/server/QueueConfig.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 17 +- .../artemis/core/server/impl/AddressInfo.java | 5 + .../server/impl/PostOfficeJournalLoader.java | 4 +- 31 files changed, 506 insertions(+), 98 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index a414f95638..c8d483cb21 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -100,7 +100,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * Returns true if auto-creation for this queue is enabled and if the queue queried is a JMS queue, * false else. */ - boolean isAutoCreateJmsQueues(); + boolean isAutoCreateQueues(); /** * Returns the number of consumers attached to the queue. @@ -128,6 +128,14 @@ public interface ClientSession extends XAResource, AutoCloseable { * @return */ SimpleString getName(); + + RoutingType getRoutingType(); + + int getMaxConsumers(); + + boolean isDeleteOnNoConsumers(); + + boolean isAutoCreated(); } // Lifecycle operations ------------------------------------------ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 1ed825b0bb..dd10e5bcba 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -422,7 +422,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public void createTemporaryQueue(final String address, final RoutingType routingType, final String queueName, final String filter) throws ActiveMQException { - createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName)); + createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter)); } /** @@ -560,7 +560,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi routingType, filter, durable, - !durable, + false, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false); @@ -1823,8 +1823,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi filterString, durable, temp, - ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), - ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + maxConsumers, + deleteOnNoConsumers, autoCreated); } finally { endCall(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java index 40ea86a591..5afdd8d56d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.client.impl; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.server.RoutingType; public class QueueQueryImpl implements ClientSession.QueueQuery { @@ -37,7 +38,15 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { private final SimpleString name; - private final boolean autoCreateJmsQueues; + private final boolean autoCreateQueues; + + private final boolean autoCreated; + + private final RoutingType routingType; + + private final boolean deleteOnNoConsumers; + + private final int maxConsumers; public QueueQueryImpl(final boolean durable, final boolean temporary, @@ -58,7 +67,23 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final SimpleString address, final SimpleString name, final boolean exists, - final boolean autoCreateJmsQueues) { + final boolean autoCreateQueues) { + this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, -1, false, false, RoutingType.MULTICAST); + } + + public QueueQueryImpl(final boolean durable, + final boolean temporary, + final int consumerCount, + final long messageCount, + final SimpleString filterString, + final SimpleString address, + final SimpleString name, + final boolean exists, + final boolean autoCreateQueues, + final int maxConsumers, + final boolean autoCreated, + final boolean deleteOnNoConsumers, + final RoutingType routingType) { this.durable = durable; this.temporary = temporary; this.consumerCount = consumerCount; @@ -67,7 +92,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { this.address = address; this.name = name; this.exists = exists; - this.autoCreateJmsQueues = autoCreateJmsQueues; + this.autoCreateQueues = autoCreateQueues; + this.maxConsumers = maxConsumers; + this.autoCreated = autoCreated; + this.deleteOnNoConsumers = deleteOnNoConsumers; + this.routingType = routingType; } @Override @@ -101,8 +130,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { } @Override - public boolean isAutoCreateJmsQueues() { - return autoCreateJmsQueues; + public boolean isAutoCreateQueues() { + return autoCreateQueues; } @Override @@ -115,5 +144,25 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { return exists; } + @Override + public RoutingType getRoutingType() { + return routingType; + } + + @Override + public int getMaxConsumers() { + return maxConsumers; + } + + @Override + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + @Override + public boolean isAutoCreated() { + return autoCreated; + } + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index ed0814249c..57076458d7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -79,7 +79,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; @@ -265,7 +265,7 @@ public class ActiveMQSessionContext extends SessionContext { @Override public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); - SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); + SessionQueueQueryResponseMessage_V3 response = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3); return response.toQueueQuery(); } @@ -290,7 +290,7 @@ public class ActiveMQSessionContext extends SessionContext { SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); - SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); + SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3); // The actual windows size that gets used is determined by the user since // could be overridden on the queue settings @@ -710,8 +710,7 @@ public class ActiveMQSessionContext extends SessionContext { // they are defined in broker.xml // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover if (!queueInfo.isDurable()) { - // TODO (mtaylor) QueueInfo needs updating to include new parameters, this method should pass in del mode - CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false); + CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isDeleteOnNoConsumers(), queueInfo.isAutoCreated(), false); sendPacketWithoutLock(sessionChannel, createQueueRequest); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 41be0804c4..d1b17bf7a8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -172,6 +172,8 @@ public final class ChannelImpl implements Channel { return version >= 126; case PacketImpl.SESS_BINDINGQUERY_RESP_V3: return version >= 127; + case PacketImpl.SESS_QUEUEQUERY_RESP_V3: + return version >= 129; default: return true; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 15629c8c11..89a6c9abd6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; @@ -127,6 +128,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V2; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V3; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION; @@ -241,6 +243,10 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionQueueQueryResponseMessage_V2(); break; } + case SESS_QUEUEQUERY_RESP_V3: { + packet = new SessionQueueQueryResponseMessage_V3(); + break; + } case CREATE_ADDRESS: { packet = new CreateAddressMessage(); break; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index a65bdfcac5..5bdf727345 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -255,6 +255,8 @@ public class PacketImpl implements Packet { public static final byte CREATE_SHARED_QUEUE_V2 = -13; + public static final byte SESS_QUEUEQUERY_RESP_V3 = -14; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java index b8313b2eae..7d9c184e20 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java @@ -49,8 +49,8 @@ public class SessionQueueQueryResponseMessage extends PacketImpl { this(null, null, false, false, null, 0, 0, false); } - public SessionQueueQueryResponseMessage(byte v2) { - super(v2); + public SessionQueueQueryResponseMessage(byte v) { + super(v); } private SessionQueueQueryResponseMessage(final SimpleString name, @@ -159,6 +159,13 @@ public class SessionQueueQueryResponseMessage extends PacketImpl { @Override public String toString() { StringBuffer buff = new StringBuffer(getParentString()); + buff.append("]"); + return buff.toString(); + } + + @Override + public String getParentString() { + StringBuffer buff = new StringBuffer(super.getParentString()); buff.append(", address=" + address); buff.append(", name=" + name); buff.append(", consumerCount=" + consumerCount); @@ -167,7 +174,6 @@ public class SessionQueueQueryResponseMessage extends PacketImpl { buff.append(", exists=" + exists); buff.append(", temporary=" + temporary); buff.append(", messageCount=" + messageCount); - buff.append("]"); return buff.toString(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java index 77ad0f32c8..667ce6ea95 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java @@ -24,10 +24,10 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage { - private boolean autoCreationEnabled; + protected boolean autoCreateQueues; public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result) { - this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues()); + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues()); } public SessionQueueQueryResponseMessage_V2() { @@ -42,7 +42,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon final int consumerCount, final long messageCount, final boolean exists, - final boolean autoCreationEnabled) { + final boolean autoCreateQueues) { super(SESS_QUEUEQUERY_RESP_V2); this.durable = durable; @@ -61,52 +61,53 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon this.exists = exists; - this.autoCreationEnabled = autoCreationEnabled; + this.autoCreateQueues = autoCreateQueues; + } + public SessionQueueQueryResponseMessage_V2(byte v) { + super(v); } - public boolean isAutoCreationEnabled() { - return autoCreationEnabled; + public boolean isAutoCreateQueues() { + return autoCreateQueues; } @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); - buffer.writeBoolean(autoCreationEnabled); + buffer.writeBoolean(autoCreateQueues); } @Override public void decodeRest(final ActiveMQBuffer buffer) { super.decodeRest(buffer); - autoCreationEnabled = buffer.readBoolean(); + autoCreateQueues = buffer.readBoolean(); } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = prime * result + (autoCreationEnabled ? 1231 : 1237); + result = prime * result + (autoCreateQueues ? 1231 : 1237); return result; } @Override public String toString() { StringBuffer buff = new StringBuffer(getParentString()); - buff.append(", address=" + address); - buff.append(", name=" + name); - buff.append(", consumerCount=" + consumerCount); - buff.append(", filterString=" + filterString); - buff.append(", durable=" + durable); - buff.append(", exists=" + exists); - buff.append(", temporary=" + temporary); - buff.append(", messageCount=" + messageCount); - buff.append(", autoCreationEnabled=" + autoCreationEnabled); buff.append("]"); return buff.toString(); } + @Override + public String getParentString() { + StringBuffer buff = new StringBuffer(super.getParentString()); + buff.append(", autoCreationEnabled=" + autoCreateQueues); + return buff.toString(); + } + @Override public ClientSession.QueueQuery toQueueQuery() { - return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreationEnabled()); + return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues()); } @Override @@ -118,7 +119,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon if (!(obj instanceof SessionQueueQueryResponseMessage_V2)) return false; SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2) obj; - if (autoCreationEnabled != other.autoCreationEnabled) + if (autoCreateQueues != other.autoCreateQueues) return false; return true; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java new file mode 100644 index 0000000000..b3664da601 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.client.impl.QueueQueryImpl; +import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; + +public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryResponseMessage_V2 { + + protected boolean autoCreated; + + protected boolean deleteOnNoConsumers; + + protected RoutingType routingType; + + protected int maxConsumers; + + public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) { + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isDeleteOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers()); + } + + public SessionQueueQueryResponseMessage_V3() { + this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1); + } + + private SessionQueueQueryResponseMessage_V3(final SimpleString name, + final SimpleString address, + final boolean durable, + final boolean temporary, + final SimpleString filterString, + final int consumerCount, + final long messageCount, + final boolean exists, + final boolean autoCreateQueues, + final boolean autoCreated, + final boolean deleteOnNoConsumers, + final RoutingType routingType, + final int maxConsumers) { + super(SESS_QUEUEQUERY_RESP_V3); + + this.durable = durable; + + this.temporary = temporary; + + this.consumerCount = consumerCount; + + this.messageCount = messageCount; + + this.filterString = filterString; + + this.address = address; + + this.name = name; + + this.exists = exists; + + this.autoCreateQueues = autoCreateQueues; + + this.autoCreated = autoCreated; + + this.deleteOnNoConsumers = deleteOnNoConsumers; + + this.routingType = routingType; + + this.maxConsumers = maxConsumers; + } + + public boolean isAutoCreated() { + return autoCreated; + } + + public void setAutoCreated(boolean autoCreated) { + this.autoCreated = autoCreated; + } + + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) { + this.deleteOnNoConsumers = deleteOnNoConsumers; + } + + public RoutingType getRoutingType() { + return routingType; + } + + public void setRoutingType(RoutingType routingType) { + this.routingType = routingType; + } + + public int getMaxConsumers() { + return maxConsumers; + } + + public void setMaxConsumers(int maxConsumers) { + this.maxConsumers = maxConsumers; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeBoolean(autoCreated); + buffer.writeBoolean(deleteOnNoConsumers); + buffer.writeByte(routingType.getType()); + buffer.writeInt(maxConsumers); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + autoCreated = buffer.readBoolean(); + deleteOnNoConsumers = buffer.readBoolean(); + routingType = RoutingType.getType(buffer.readByte()); + maxConsumers = buffer.readInt(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (autoCreated ? 1231 : 1237); + result = prime * result + (deleteOnNoConsumers ? 1231 : 1237); + result = prime * result + routingType.hashCode(); + result = prime * result + maxConsumers; + return result; + } + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append("]"); + return buff.toString(); + } + + @Override + public String getParentString() { + StringBuffer buff = new StringBuffer(super.getParentString()); + buff.append(", autoCreated=" + autoCreated); + buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers); + buff.append(", routingType=" + routingType); + buff.append(", maxConsumers=" + maxConsumers); + return buff.toString(); + } + + @Override + public ClientSession.QueueQuery toQueueQuery() { + return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isDeleteOnNoConsumers(), getRoutingType()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionQueueQueryResponseMessage_V3)) + return false; + SessionQueueQueryResponseMessage_V3 other = (SessionQueueQueryResponseMessage_V3) obj; + if (autoCreated != other.autoCreated) + return false; + if (deleteOnNoConsumers != other.deleteOnNoConsumers) + return false; + if (routingType == null) { + if (other.routingType != null) + return false; + } else if (!routingType.equals(other.routingType)) + return false; + if (maxConsumers != other.maxConsumers) + return false; + return true; + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java index f9740de024..de14888cc3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java @@ -36,7 +36,15 @@ public class QueueQueryResult { private boolean temporary; - private boolean autoCreateJmsQueues; + private boolean autoCreateQueues; + + private boolean autoCreated; + + private boolean deleteOnNoConsumers; + + private RoutingType routingType; + + private int maxConsumers; public QueueQueryResult(final SimpleString name, final SimpleString address, @@ -45,19 +53,12 @@ public class QueueQueryResult { final SimpleString filterString, final int consumerCount, final long messageCount, - final boolean autoCreateJmsQueues) { - this(name, address, durable, temporary, filterString, consumerCount, messageCount, autoCreateJmsQueues, true); - } - - public QueueQueryResult(final SimpleString name, - final SimpleString address, - final boolean durable, - final boolean temporary, - final SimpleString filterString, - final int consumerCount, - final long messageCount, - final boolean autoCreateJmsQueues, - final boolean exists) { + final boolean autoCreateQueues, + final boolean exists, + final boolean autoCreated, + final boolean deleteOnNoConsumers, + final RoutingType routingType, + final int maxConsumers) { this.durable = durable; this.temporary = temporary; @@ -72,9 +73,17 @@ public class QueueQueryResult { this.name = name; - this.autoCreateJmsQueues = autoCreateJmsQueues; + this.autoCreateQueues = autoCreateQueues; this.exists = exists; + + this.autoCreated = autoCreated; + + this.deleteOnNoConsumers = deleteOnNoConsumers; + + this.routingType = routingType; + + this.maxConsumers = maxConsumers; } public boolean isExists() { @@ -109,8 +118,23 @@ public class QueueQueryResult { return temporary; } - public boolean isAutoCreateJmsQueues() { - return autoCreateJmsQueues; + public boolean isAutoCreateQueues() { + return autoCreateQueues; } + public boolean isAutoCreated() { + return autoCreated; + } + + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + public RoutingType getRoutingType() { + return routingType; + } + + public int getMaxConsumers() { + return maxConsumers; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java index 09b190254b..9d37cd3d96 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java @@ -120,12 +120,19 @@ public class MessageUtil { } public static void clearProperties(Message message) { + /** + * JavaDoc for this method states: + * Clears a message's properties. + * The message's header fields and body are not cleared. + * + * Since the {@code Message.HDR_ROUTING_TYPE} is used for the JMSDestination header it isn't cleared + */ List toRemove = new ArrayList<>(); for (SimpleString propName : message.getPropertyNames()) { - if (!propName.startsWith(JMS) || propName.startsWith(JMSX) || - propName.startsWith(JMS_)) { + if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || + propName.startsWith(JMS_)) && !propName.equals(Message.HDR_ROUTING_TYPE)) { toRemove.add(propName); } } @@ -140,7 +147,7 @@ public class MessageUtil { for (SimpleString propName : message.getPropertyNames()) { if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || - propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME)) { + propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE)) { set.add(propName.toString()); } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 283f95832f..4f0be81b78 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; @@ -201,8 +202,6 @@ public class ActiveMQMessage implements javax.jms.Message { private long jmsDeliveryTime; - private boolean fromQueue; - // Constructors -------------------------------------------------- /* @@ -399,8 +398,17 @@ public class ActiveMQMessage implements javax.jms.Message { public Destination getJMSDestination() throws JMSException { if (dest == null) { SimpleString address = message.getAddress(); + String prefix = ""; + if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) { + RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)); + if (routingType.equals(RoutingType.ANYCAST)) { + prefix = QUEUE_QUALIFIED_PREFIX; + } else if (routingType.equals(RoutingType.MULTICAST)) { + prefix = TOPIC_QUALIFIED_PREFIX; + } + } - dest = address == null ? null : ActiveMQDestination.fromPrefixedName((fromQueue ? QUEUE_QUALIFIED_PREFIX : TOPIC_QUALIFIED_PREFIX) + address.toString()); + dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString()); } return dest; @@ -779,10 +787,6 @@ public class ActiveMQMessage implements javax.jms.Message { // Public -------------------------------------------------------- - public void setFromQueue(boolean fromQueue) { - this.fromQueue = fromQueue; - } - public void setIndividualAcknowledge() { this.individualAck = true; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index b449aeaacc..8bc1fd8181 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -240,8 +240,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr } else { coreMessage.acknowledge(); } - - jmsMsg.setFromQueue(destination instanceof ActiveMQQueue); } return jmsMsg; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index aa4754b733..4c1d33564f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -41,7 +41,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -493,7 +492,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID); byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType(); - coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType); + coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, routingType); try { /** diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java index 2632daed1c..a6d047a227 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java @@ -76,6 +76,26 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue { return "ActiveMQQueue[" + name + "]"; } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof ActiveMQQueue)) { + return false; + } + + ActiveMQQueue that = (ActiveMQQueue) o; + + return super.getAddress().equals(that.getAddress()); + } + + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index fe2a1a0f6d..3e9b76f06c 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -820,7 +820,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { SimpleString simpleAddress = queue.getSimpleAddress(); - session.createTemporaryQueue(simpleAddress, simpleAddress); + session.createTemporaryQueue(simpleAddress, RoutingType.ANYCAST, simpleAddress); connection.addTemporaryQueue(simpleAddress); @@ -1074,7 +1074,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { QueueQuery response = session.queueQuery(queue.getSimpleAddress()); - if (!response.isExists() && !response.isAutoCreateJmsQueues()) { + if (!response.isExists() && !response.isAutoCreateQueues()) { return null; } else { return queue; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java index fa01409ed6..88a822ade7 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java @@ -52,6 +52,26 @@ public class ActiveMQTemporaryQueue extends ActiveMQQueue implements TemporaryQu return "ActiveMQTemporaryQueue[" + name + "]"; } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof ActiveMQTemporaryQueue)) { + return false; + } + + ActiveMQTemporaryQueue that = (ActiveMQTemporaryQueue) o; + + return super.getAddress().equals(that.getAddress()); + } + + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java index 07c3ec9dcb..98b5ba6c36 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java @@ -36,6 +36,26 @@ public class ActiveMQTemporaryTopic extends ActiveMQTopic implements TemporaryTo // Public -------------------------------------------------------- + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof ActiveMQTemporaryTopic)) { + return false; + } + + ActiveMQTemporaryTopic that = (ActiveMQTemporaryTopic) o; + + return super.getAddress().equals(that.getAddress()); + } + + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java index 5ffd918619..941b4408d0 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java @@ -71,6 +71,26 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic { return "ActiveMQTopic[" + name + "]"; } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof ActiveMQTopic)) { + return false; + } + + ActiveMQTopic that = (ActiveMQTopic) o; + + return super.getAddress().equals(that.getAddress()); + } + + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 9e103f492b..e6c32c8248 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -564,7 +564,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception { + @Parameter(name = "routingType", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception { checkStarted(); clearIO(); @@ -665,7 +665,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } finally { blockOnIO(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 3a0c240594..29f42773f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -53,4 +53,8 @@ public interface QueueBindingInfo { boolean isDeleteOnNoConsumers(); void setDeleteOnNoConsumers(boolean deleteOnNoConsumers); + + byte getRoutingType(); + + void setRoutingType(byte routingType); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index ee03aa9754..15083e8fe9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1221,7 +1221,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { SimpleString filterString = filter == null ? null : filter.getFilterString(); - PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers()); + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers(), queue.getRoutingType().getType()); readLock(); try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 88bc1cff43..36a0ae6d8c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec; import java.util.LinkedList; import java.util.List; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.EncodingSupport; @@ -45,6 +46,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin public boolean deleteOnNoConsumers; + public byte routingType; + public PersistentQueueBindingEncoding() { } @@ -65,6 +68,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin maxConsumers + ", deleteOnNoConsumers=" + deleteOnNoConsumers + + ", routingType=" + + routingType + "]"; } @@ -74,7 +79,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin final SimpleString user, final boolean autoCreated, final int maxConsumers, - final boolean deleteOnNoConsumers) { + final boolean deleteOnNoConsumers, + final byte routingType) { this.name = name; this.address = address; this.filterString = filterString; @@ -82,6 +88,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin this.autoCreated = autoCreated; this.maxConsumers = maxConsumers; this.deleteOnNoConsumers = deleteOnNoConsumers; + this.routingType = routingType; } @Override @@ -156,6 +163,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin this.deleteOnNoConsumers = deleteOnNoConsumers; } + @Override + public byte getRoutingType() { + return routingType; + } + + @Override + public void setRoutingType(byte routingType) { + this.routingType = routingType; + } + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); @@ -180,9 +197,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin if (buffer.readableBytes() > 0) { maxConsumers = buffer.readInt(); deleteOnNoConsumers = buffer.readBoolean(); + routingType = buffer.readByte(); } else { - maxConsumers = -1; - deleteOnNoConsumers = false; + maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); + deleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); + routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType().getType(); } } @@ -195,6 +214,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin buffer.writeBoolean(autoCreated); buffer.writeInt(maxConsumers); buffer.writeBoolean(deleteOnNoConsumers); + buffer.writeByte(routingType); } @Override @@ -203,7 +223,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN + SimpleString.sizeofNullableString(createMetadata()) + DataConstants.SIZE_INT + - DataConstants.SIZE_BOOLEAN; + DataConstants.SIZE_BOOLEAN + + DataConstants.SIZE_BYTE; } private SimpleString createMetadata() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index d02f0f0668..e09d108418 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.Bindable; @@ -131,8 +131,8 @@ public class LocalQueueBinding implements QueueBinding { } private boolean isMatchRoutingType(ServerMessage message) { - if (message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) { - return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType(); + if (message.containsProperty(Message.HDR_ROUTING_TYPE)) { + return message.getByteProperty(Message.HDR_ROUTING_TYPE).equals(queue.getRoutingType().getType()); } return true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 2fc34093aa..e060542075 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -664,10 +664,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding SimpleString address = message.getAddress(); - if (address.toString().equals("testQueue")) { - System.out.println("f"); - } - setPagingStore(message); AtomicBoolean startedTX = new AtomicBoolean(false); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 65ffc69676..d3cc6176c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; @@ -218,7 +219,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { // We send back queue information on the queue as a response- this allows the queue to // be automatically recreated on failover QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName()); - if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { + response = new SessionQueueQueryResponseMessage_V3(queueQueryResult); + } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { response = new SessionQueueQueryResponseMessage_V2(queueQueryResult); } else { response = new SessionQueueQueryResponseMessage(queueQueryResult); @@ -284,7 +287,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet; QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); - if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { + response = new SessionQueueQueryResponseMessage_V3(result); + } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { response = new SessionQueueQueryResponseMessage_V2(result); } else { response = new SessionQueueQueryResponseMessage(result); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index 3435ca0ae3..6e0d5afaa6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -126,7 +126,7 @@ public final class QueueConfig { return this; } - public Builder deliveryMode(RoutingType routingType) { + public Builder routingType(RoutingType routingType) { this.routingType = routingType; return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index d6e626cef1..06852ceaba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -719,7 +719,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); } - boolean autoCreateJmsQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues(); + boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues(); QueueQueryResult response; @@ -734,14 +734,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { SimpleString filterString = filter == null ? null : filter.getFilterString(); - response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues); + response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isDeleteOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers()); } else if (name.equals(managementAddress)) { // make an exception for the management address (see HORNETQ-29) - response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues); - } else if (autoCreateJmsQueues) { - response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false); + response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1); + } else if (autoCreateQueues) { + response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, false, RoutingType.MULTICAST, 0); } else { - response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false); + response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0); } return response; @@ -1657,7 +1657,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean deleteOnNoConsumers, final boolean autoCreateAddress) throws Exception { - // TODO: fix logging here as this could be for a topic or queue ActiveMQServerLogger.LOGGER.deployQueue(queueName); return createQueue(address, queueName, routingType, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); @@ -2476,14 +2475,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (info == null) { if (autoCreateAddress) { - postOffice.addAddressInfo(defaultAddressInfo); + postOffice.addAddressInfo(defaultAddressInfo.setAutoCreated(true)); info = postOffice.getAddressInfo(addressName); } else { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); } } - final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deliveryMode(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build(); + final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build(); final Queue queue = queueFactory.createQueueWith(queueConfig); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 7816cde3b4..6384ae98da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -119,6 +119,11 @@ public class AddressInfo { for (RoutingType routingType : routingTypes) { buff.append(routingType.toString() + ","); } + // delete hanging comma + if (buff.charAt(buff.length() - 1) == ',') { + buff.deleteCharAt(buff.length() - 1); + } + buff.append("}"); buff.append(", autoCreated=" + autoCreated); buff.append("]"); return buff.toString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 20ef545b0c..f52b5cc945 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; @@ -149,7 +150,8 @@ public class PostOfficeJournalLoader implements JournalLoader { .temporary(false) .autoCreated(queueBindingInfo.isAutoCreated()) .deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers()) - .maxConsumers(queueBindingInfo.getMaxConsumers()); + .maxConsumers(queueBindingInfo.getMaxConsumers()) + .routingType(RoutingType.getType(queueBindingInfo.getRoutingType())); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));