ARTEMIS-787 Update CORE Protocol/Client Packets

This commit is contained in:
Martyn Taylor 2016-12-09 17:59:25 +00:00
parent 279383a798
commit a20b23bf37
31 changed files with 506 additions and 98 deletions

View File

@ -100,7 +100,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* Returns <code>true</code> if auto-creation for this queue is enabled and if the queue queried is a JMS queue,
* <code>false</code> else.
*/
boolean isAutoCreateJmsQueues();
boolean isAutoCreateQueues();
/**
* Returns the number of consumers attached to the queue.
@ -128,6 +128,14 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @return
*/
SimpleString getName();
RoutingType getRoutingType();
int getMaxConsumers();
boolean isDeleteOnNoConsumers();
boolean isAutoCreated();
}
// Lifecycle operations ------------------------------------------

View File

@ -422,7 +422,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public void createTemporaryQueue(final String address, final RoutingType routingType, final String queueName, final String filter) throws ActiveMQException {
createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName));
createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter));
}
/**
@ -560,7 +560,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
routingType,
filter,
durable,
!durable,
false,
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
false);
@ -1823,8 +1823,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
filterString,
durable,
temp,
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
maxConsumers,
deleteOnNoConsumers,
autoCreated);
} finally {
endCall();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.RoutingType;
public class QueueQueryImpl implements ClientSession.QueueQuery {
@ -37,7 +38,15 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final SimpleString name;
private final boolean autoCreateJmsQueues;
private final boolean autoCreateQueues;
private final boolean autoCreated;
private final RoutingType routingType;
private final boolean deleteOnNoConsumers;
private final int maxConsumers;
public QueueQueryImpl(final boolean durable,
final boolean temporary,
@ -58,7 +67,23 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final SimpleString address,
final SimpleString name,
final boolean exists,
final boolean autoCreateJmsQueues) {
final boolean autoCreateQueues) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, -1, false, false, RoutingType.MULTICAST);
}
public QueueQueryImpl(final boolean durable,
final boolean temporary,
final int consumerCount,
final long messageCount,
final SimpleString filterString,
final SimpleString address,
final SimpleString name,
final boolean exists,
final boolean autoCreateQueues,
final int maxConsumers,
final boolean autoCreated,
final boolean deleteOnNoConsumers,
final RoutingType routingType) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@ -67,7 +92,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.address = address;
this.name = name;
this.exists = exists;
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.autoCreateQueues = autoCreateQueues;
this.maxConsumers = maxConsumers;
this.autoCreated = autoCreated;
this.deleteOnNoConsumers = deleteOnNoConsumers;
this.routingType = routingType;
}
@Override
@ -101,8 +130,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
}
@Override
public boolean isAutoCreateJmsQueues() {
return autoCreateJmsQueues;
public boolean isAutoCreateQueues() {
return autoCreateQueues;
}
@Override
@ -115,5 +144,25 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
return exists;
}
@Override
public RoutingType getRoutingType() {
return routingType;
}
@Override
public int getMaxConsumers() {
return maxConsumers;
}
@Override
public boolean isDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
@Override
public boolean isAutoCreated() {
return autoCreated;
}
}

View File

@ -79,7 +79,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@ -265,7 +265,7 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
SessionQueueQueryResponseMessage_V3 response = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
return response.toQueueQuery();
}
@ -290,7 +290,7 @@ public class ActiveMQSessionContext extends SessionContext {
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
@ -710,8 +710,7 @@ public class ActiveMQSessionContext extends SessionContext {
// they are defined in broker.xml
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
if (!queueInfo.isDurable()) {
// TODO (mtaylor) QueueInfo needs updating to include new parameters, this method should pass in del mode
CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false);
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isDeleteOnNoConsumers(), queueInfo.isAutoCreated(), false);
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}

View File

@ -172,6 +172,8 @@ public final class ChannelImpl implements Channel {
return version >= 126;
case PacketImpl.SESS_BINDINGQUERY_RESP_V3:
return version >= 127;
case PacketImpl.SESS_QUEUEQUERY_RESP_V3:
return version >= 129;
default:
return true;
}

View File

@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
@ -127,6 +128,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
@ -241,6 +243,10 @@ public abstract class PacketDecoder implements Serializable {
packet = new SessionQueueQueryResponseMessage_V2();
break;
}
case SESS_QUEUEQUERY_RESP_V3: {
packet = new SessionQueueQueryResponseMessage_V3();
break;
}
case CREATE_ADDRESS: {
packet = new CreateAddressMessage();
break;

View File

@ -255,6 +255,8 @@ public class PacketImpl implements Packet {
public static final byte CREATE_SHARED_QUEUE_V2 = -13;
public static final byte SESS_QUEUEQUERY_RESP_V3 = -14;
// Static --------------------------------------------------------
public PacketImpl(final byte type) {

View File

@ -49,8 +49,8 @@ public class SessionQueueQueryResponseMessage extends PacketImpl {
this(null, null, false, false, null, 0, 0, false);
}
public SessionQueueQueryResponseMessage(byte v2) {
super(v2);
public SessionQueueQueryResponseMessage(byte v) {
super(v);
}
private SessionQueueQueryResponseMessage(final SimpleString name,
@ -159,6 +159,13 @@ public class SessionQueueQueryResponseMessage extends PacketImpl {
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append("]");
return buff.toString();
}
@Override
public String getParentString() {
StringBuffer buff = new StringBuffer(super.getParentString());
buff.append(", address=" + address);
buff.append(", name=" + name);
buff.append(", consumerCount=" + consumerCount);
@ -167,7 +174,6 @@ public class SessionQueueQueryResponseMessage extends PacketImpl {
buff.append(", exists=" + exists);
buff.append(", temporary=" + temporary);
buff.append(", messageCount=" + messageCount);
buff.append("]");
return buff.toString();
}

View File

@ -24,10 +24,10 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage {
private boolean autoCreationEnabled;
protected boolean autoCreateQueues;
public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues());
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues());
}
public SessionQueueQueryResponseMessage_V2() {
@ -42,7 +42,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon
final int consumerCount,
final long messageCount,
final boolean exists,
final boolean autoCreationEnabled) {
final boolean autoCreateQueues) {
super(SESS_QUEUEQUERY_RESP_V2);
this.durable = durable;
@ -61,52 +61,53 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon
this.exists = exists;
this.autoCreationEnabled = autoCreationEnabled;
this.autoCreateQueues = autoCreateQueues;
}
public SessionQueueQueryResponseMessage_V2(byte v) {
super(v);
}
public boolean isAutoCreationEnabled() {
return autoCreationEnabled;
public boolean isAutoCreateQueues() {
return autoCreateQueues;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(autoCreationEnabled);
buffer.writeBoolean(autoCreateQueues);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
autoCreationEnabled = buffer.readBoolean();
autoCreateQueues = buffer.readBoolean();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreationEnabled ? 1231 : 1237);
result = prime * result + (autoCreateQueues ? 1231 : 1237);
return result;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" + address);
buff.append(", name=" + name);
buff.append(", consumerCount=" + consumerCount);
buff.append(", filterString=" + filterString);
buff.append(", durable=" + durable);
buff.append(", exists=" + exists);
buff.append(", temporary=" + temporary);
buff.append(", messageCount=" + messageCount);
buff.append(", autoCreationEnabled=" + autoCreationEnabled);
buff.append("]");
return buff.toString();
}
@Override
public String getParentString() {
StringBuffer buff = new StringBuffer(super.getParentString());
buff.append(", autoCreationEnabled=" + autoCreateQueues);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreationEnabled());
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues());
}
@Override
@ -118,7 +119,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon
if (!(obj instanceof SessionQueueQueryResponseMessage_V2))
return false;
SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2) obj;
if (autoCreationEnabled != other.autoCreationEnabled)
if (autoCreateQueues != other.autoCreateQueues)
return false;
return true;
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.client.impl.QueueQueryImpl;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryResponseMessage_V2 {
protected boolean autoCreated;
protected boolean deleteOnNoConsumers;
protected RoutingType routingType;
protected int maxConsumers;
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isDeleteOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers());
}
public SessionQueueQueryResponseMessage_V3() {
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
final SimpleString address,
final boolean durable,
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
final long messageCount,
final boolean exists,
final boolean autoCreateQueues,
final boolean autoCreated,
final boolean deleteOnNoConsumers,
final RoutingType routingType,
final int maxConsumers) {
super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
this.messageCount = messageCount;
this.filterString = filterString;
this.address = address;
this.name = name;
this.exists = exists;
this.autoCreateQueues = autoCreateQueues;
this.autoCreated = autoCreated;
this.deleteOnNoConsumers = deleteOnNoConsumers;
this.routingType = routingType;
this.maxConsumers = maxConsumers;
}
public boolean isAutoCreated() {
return autoCreated;
}
public void setAutoCreated(boolean autoCreated) {
this.autoCreated = autoCreated;
}
public boolean isDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
public RoutingType getRoutingType() {
return routingType;
}
public void setRoutingType(RoutingType routingType) {
this.routingType = routingType;
}
public int getMaxConsumers() {
return maxConsumers;
}
public void setMaxConsumers(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(autoCreated);
buffer.writeBoolean(deleteOnNoConsumers);
buffer.writeByte(routingType.getType());
buffer.writeInt(maxConsumers);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
autoCreated = buffer.readBoolean();
deleteOnNoConsumers = buffer.readBoolean();
routingType = RoutingType.getType(buffer.readByte());
maxConsumers = buffer.readInt();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreated ? 1231 : 1237);
result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
result = prime * result + routingType.hashCode();
result = prime * result + maxConsumers;
return result;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append("]");
return buff.toString();
}
@Override
public String getParentString() {
StringBuffer buff = new StringBuffer(super.getParentString());
buff.append(", autoCreated=" + autoCreated);
buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers);
buff.append(", routingType=" + routingType);
buff.append(", maxConsumers=" + maxConsumers);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isDeleteOnNoConsumers(), getRoutingType());
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionQueueQueryResponseMessage_V3))
return false;
SessionQueueQueryResponseMessage_V3 other = (SessionQueueQueryResponseMessage_V3) obj;
if (autoCreated != other.autoCreated)
return false;
if (deleteOnNoConsumers != other.deleteOnNoConsumers)
return false;
if (routingType == null) {
if (other.routingType != null)
return false;
} else if (!routingType.equals(other.routingType))
return false;
if (maxConsumers != other.maxConsumers)
return false;
return true;
}
}

View File

@ -36,7 +36,15 @@ public class QueueQueryResult {
private boolean temporary;
private boolean autoCreateJmsQueues;
private boolean autoCreateQueues;
private boolean autoCreated;
private boolean deleteOnNoConsumers;
private RoutingType routingType;
private int maxConsumers;
public QueueQueryResult(final SimpleString name,
final SimpleString address,
@ -45,19 +53,12 @@ public class QueueQueryResult {
final SimpleString filterString,
final int consumerCount,
final long messageCount,
final boolean autoCreateJmsQueues) {
this(name, address, durable, temporary, filterString, consumerCount, messageCount, autoCreateJmsQueues, true);
}
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
final long messageCount,
final boolean autoCreateJmsQueues,
final boolean exists) {
final boolean autoCreateQueues,
final boolean exists,
final boolean autoCreated,
final boolean deleteOnNoConsumers,
final RoutingType routingType,
final int maxConsumers) {
this.durable = durable;
this.temporary = temporary;
@ -72,9 +73,17 @@ public class QueueQueryResult {
this.name = name;
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.autoCreateQueues = autoCreateQueues;
this.exists = exists;
this.autoCreated = autoCreated;
this.deleteOnNoConsumers = deleteOnNoConsumers;
this.routingType = routingType;
this.maxConsumers = maxConsumers;
}
public boolean isExists() {
@ -109,8 +118,23 @@ public class QueueQueryResult {
return temporary;
}
public boolean isAutoCreateJmsQueues() {
return autoCreateJmsQueues;
public boolean isAutoCreateQueues() {
return autoCreateQueues;
}
public boolean isAutoCreated() {
return autoCreated;
}
public boolean isDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
public RoutingType getRoutingType() {
return routingType;
}
public int getMaxConsumers() {
return maxConsumers;
}
}

View File

@ -120,12 +120,19 @@ public class MessageUtil {
}
public static void clearProperties(Message message) {
/**
* JavaDoc for this method states:
* Clears a message's properties.
* The message's header fields and body are not cleared.
*
* Since the {@code Message.HDR_ROUTING_TYPE} is used for the JMSDestination header it isn't cleared
*/
List<SimpleString> toRemove = new ArrayList<>();
for (SimpleString propName : message.getPropertyNames()) {
if (!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
propName.startsWith(JMS_)) {
if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
propName.startsWith(JMS_)) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
toRemove.add(propName);
}
}
@ -140,7 +147,7 @@ public class MessageUtil {
for (SimpleString propName : message.getPropertyNames()) {
if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME)) {
propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
set.add(propName.toString());
}
}

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
@ -201,8 +202,6 @@ public class ActiveMQMessage implements javax.jms.Message {
private long jmsDeliveryTime;
private boolean fromQueue;
// Constructors --------------------------------------------------
/*
@ -399,8 +398,17 @@ public class ActiveMQMessage implements javax.jms.Message {
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString address = message.getAddress();
String prefix = "";
if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
if (routingType.equals(RoutingType.ANYCAST)) {
prefix = QUEUE_QUALIFIED_PREFIX;
} else if (routingType.equals(RoutingType.MULTICAST)) {
prefix = TOPIC_QUALIFIED_PREFIX;
}
}
dest = address == null ? null : ActiveMQDestination.fromPrefixedName((fromQueue ? QUEUE_QUALIFIED_PREFIX : TOPIC_QUALIFIED_PREFIX) + address.toString());
dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
}
return dest;
@ -779,10 +787,6 @@ public class ActiveMQMessage implements javax.jms.Message {
// Public --------------------------------------------------------
public void setFromQueue(boolean fromQueue) {
this.fromQueue = fromQueue;
}
public void setIndividualAcknowledge() {
this.individualAck = true;
}

View File

@ -240,8 +240,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
} else {
coreMessage.acknowledge();
}
jmsMsg.setFromQueue(destination instanceof ActiveMQQueue);
}
return jmsMsg;

View File

@ -41,7 +41,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -493,7 +492,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType);
coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, routingType);
try {
/**

View File

@ -76,6 +76,26 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
return "ActiveMQQueue[" + name + "]";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ActiveMQQueue)) {
return false;
}
ActiveMQQueue that = (ActiveMQQueue) o;
return super.getAddress().equals(that.getAddress());
}
@Override
public int hashCode() {
return super.getAddress().hashCode();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -820,7 +820,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
SimpleString simpleAddress = queue.getSimpleAddress();
session.createTemporaryQueue(simpleAddress, simpleAddress);
session.createTemporaryQueue(simpleAddress, RoutingType.ANYCAST, simpleAddress);
connection.addTemporaryQueue(simpleAddress);
@ -1074,7 +1074,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
if (!response.isExists() && !response.isAutoCreateJmsQueues()) {
if (!response.isExists() && !response.isAutoCreateQueues()) {
return null;
} else {
return queue;

View File

@ -52,6 +52,26 @@ public class ActiveMQTemporaryQueue extends ActiveMQQueue implements TemporaryQu
return "ActiveMQTemporaryQueue[" + name + "]";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ActiveMQTemporaryQueue)) {
return false;
}
ActiveMQTemporaryQueue that = (ActiveMQTemporaryQueue) o;
return super.getAddress().equals(that.getAddress());
}
@Override
public int hashCode() {
return super.getAddress().hashCode();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -36,6 +36,26 @@ public class ActiveMQTemporaryTopic extends ActiveMQTopic implements TemporaryTo
// Public --------------------------------------------------------
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ActiveMQTemporaryTopic)) {
return false;
}
ActiveMQTemporaryTopic that = (ActiveMQTemporaryTopic) o;
return super.getAddress().equals(that.getAddress());
}
@Override
public int hashCode() {
return super.getAddress().hashCode();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -71,6 +71,26 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
return "ActiveMQTopic[" + name + "]";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ActiveMQTopic)) {
return false;
}
ActiveMQTopic that = (ActiveMQTopic) o;
return super.getAddress().equals(that.getAddress());
}
@Override
public int hashCode() {
return super.getAddress().hashCode();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -564,7 +564,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
@Parameter(name = "routingType", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
checkStarted();
clearIO();
@ -665,7 +665,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} finally {
blockOnIO();
}

View File

@ -53,4 +53,8 @@ public interface QueueBindingInfo {
boolean isDeleteOnNoConsumers();
void setDeleteOnNoConsumers(boolean deleteOnNoConsumers);
byte getRoutingType();
void setRoutingType(byte routingType);
}

View File

@ -1221,7 +1221,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers(), queue.getRoutingType().getType());
readLock();
try {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@ -45,6 +46,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public boolean deleteOnNoConsumers;
public byte routingType;
public PersistentQueueBindingEncoding() {
}
@ -65,6 +68,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
maxConsumers +
", deleteOnNoConsumers=" +
deleteOnNoConsumers +
", routingType=" +
routingType +
"]";
}
@ -74,7 +79,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final SimpleString user,
final boolean autoCreated,
final int maxConsumers,
final boolean deleteOnNoConsumers) {
final boolean deleteOnNoConsumers,
final byte routingType) {
this.name = name;
this.address = address;
this.filterString = filterString;
@ -82,6 +88,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers;
this.routingType = routingType;
}
@Override
@ -156,6 +163,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
@Override
public byte getRoutingType() {
return routingType;
}
@Override
public void setRoutingType(byte routingType) {
this.routingType = routingType;
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
@ -180,9 +197,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
if (buffer.readableBytes() > 0) {
maxConsumers = buffer.readInt();
deleteOnNoConsumers = buffer.readBoolean();
routingType = buffer.readByte();
} else {
maxConsumers = -1;
deleteOnNoConsumers = false;
maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
deleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType().getType();
}
}
@ -195,6 +214,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeBoolean(autoCreated);
buffer.writeInt(maxConsumers);
buffer.writeBoolean(deleteOnNoConsumers);
buffer.writeByte(routingType);
}
@Override
@ -203,7 +223,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
SimpleString.sizeofNullableString(createMetadata()) +
DataConstants.SIZE_INT +
DataConstants.SIZE_BOOLEAN;
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BYTE;
}
private SimpleString createMetadata() {

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.Bindable;
@ -131,8 +131,8 @@ public class LocalQueueBinding implements QueueBinding {
}
private boolean isMatchRoutingType(ServerMessage message) {
if (message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) {
return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType();
if (message.containsProperty(Message.HDR_ROUTING_TYPE)) {
return message.getByteProperty(Message.HDR_ROUTING_TYPE).equals(queue.getRoutingType().getType());
}
return true;
}

View File

@ -664,10 +664,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
SimpleString address = message.getAddress();
if (address.toString().equals("testQueue")) {
System.out.println("f");
}
setPagingStore(message);
AtomicBoolean startedTX = new AtomicBoolean(false);

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
@ -218,7 +219,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
} else {
response = new SessionQueueQueryResponseMessage(queueQueryResult);
@ -284,7 +287,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(result);
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
response = new SessionQueueQueryResponseMessage_V2(result);
} else {
response = new SessionQueueQueryResponseMessage(result);

View File

@ -126,7 +126,7 @@ public final class QueueConfig {
return this;
}
public Builder deliveryMode(RoutingType routingType) {
public Builder routingType(RoutingType routingType) {
this.routingType = routingType;
return this;
}

View File

@ -719,7 +719,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
boolean autoCreateJmsQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
QueueQueryResult response;
@ -734,14 +734,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isDeleteOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers());
} else if (name.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
} else if (autoCreateJmsQueues) {
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1);
} else if (autoCreateQueues) {
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, false, RoutingType.MULTICAST, 0);
} else {
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0);
}
return response;
@ -1657,7 +1657,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
// TODO: fix logging here as this could be for a topic or queue
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
return createQueue(address, queueName, routingType, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
@ -2476,14 +2475,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (info == null) {
if (autoCreateAddress) {
postOffice.addAddressInfo(defaultAddressInfo);
postOffice.addAddressInfo(defaultAddressInfo.setAutoCreated(true));
info = postOffice.getAddressInfo(addressName);
} else {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
}
}
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deliveryMode(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
final Queue queue = queueFactory.createQueueWith(queueConfig);

View File

@ -119,6 +119,11 @@ public class AddressInfo {
for (RoutingType routingType : routingTypes) {
buff.append(routingType.toString() + ",");
}
// delete hanging comma
if (buff.charAt(buff.length() - 1) == ',') {
buff.deleteCharAt(buff.length() - 1);
}
buff.append("}");
buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();

View File

@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
@ -149,7 +150,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
.temporary(false)
.autoCreated(queueBindingInfo.isAutoCreated())
.deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers())
.maxConsumers(queueBindingInfo.getMaxConsumers());
.maxConsumers(queueBindingInfo.getMaxConsumers())
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));