ARTEMIS-780 Consolodate protocol packets and new Address/Queue commands

This commit is contained in:
jbertram 2016-11-23 13:34:00 -06:00 committed by Martyn Taylor
parent 7a51491c32
commit c480351c11
29 changed files with 251 additions and 243 deletions

View File

@ -799,14 +799,14 @@ public class Create extends InputAbstract {
printWriter.println();
for (String str : getQueueList()) {
printWriter.println(" <address name=\"" + str + "\" type=\"anycast\">");
printWriter.println(" <queues>");
printWriter.println(" <address name=\"" + str + "\">");
printWriter.println(" <anycast>");
printWriter.println(" <queue name=\"" + str + "\" />");
printWriter.println(" </queues>");
printWriter.println(" </anycast>");
printWriter.println(" </address>");
}
for (String str : getAddressList()) {
printWriter.println(" <address name=\"" + str + "\" type=\"multicast\"/>");
printWriter.println(" <address name=\"" + str + "\"/>");
}
filters.put("${address-queue.settings}", writer.toString());
}

View File

@ -98,15 +98,15 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
</address-settings>
<addresses>
<address name="DLQ" type="anycast">
<queues>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</queues>
</anycast>
</address>
<address name="ExpiryQueue" type="anycast">
<queues>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</queues>
</anycast>
</address>${address-queue.settings}
</addresses>

View File

@ -18,10 +18,8 @@ package org.apache.activemq.artemis.api.core.management;
import javax.management.MBeanOperationInfo;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.core.server.RoutingType;
/**
* An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
@ -438,7 +436,7 @@ public interface ActiveMQServerControl {
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception;
@Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception;
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
@ -457,14 +455,21 @@ public interface ActiveMQServerControl {
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param name name of the queue
* @param durable whether the queue is durable
*/
@Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
/**
* Create a queue.
@ -492,13 +497,24 @@ public interface ActiveMQServerControl {
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
* @param name name of the queue
* @param durable whether the queue is durable
* @param filterStr filter of the queue
* @param durable is the queue durable?
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param deleteOnNoConsumers delete this queue when the last consumer disconnects
* @param autoCreateAddress create an address with default values should a matching address not be found
* @throws Exception
*/
@Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
/**
* Deploy a durable queue.
@ -552,7 +568,8 @@ public interface ActiveMQServerControl {
*/
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers, boolean autoDeleteAddress) throws Exception;
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
@Parameter(name = "autoDeleteAddress", desc = "Automatically delete the address if this was the last queue") boolean autoDeleteAddress) throws Exception;
/**

View File

@ -140,6 +140,18 @@ public interface QueueControl {
@Attribute(desc = "dead-letter address associated with this queue")
String getDeadLetterAddress();
/**
*
*/
@Attribute(desc = "maximum number of consumers allowed on this queue at any one time")
int getMaxConsumers();
/**
*
*/
@Attribute(desc = "delete this queue when the last consumer disconnects")
boolean isDeleteOnNoConsumers();
// Operations ----------------------------------------------------
/**

View File

@ -54,7 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
@ -625,7 +625,7 @@ public class ActiveMQSessionContext extends SessionContext {
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreated) throws ActiveMQException {
CreateQueueMessage request = new CreateQueueMessage_V3(address, queueName, routingType, filterString, durable, temp, maxConsumers, deleteOnNoConsumers, autoCreated, true);
CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, routingType, filterString, durable, temp, maxConsumers, deleteOnNoConsumers, autoCreated, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}

View File

@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTop
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
@ -95,7 +94,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CRE
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
@ -255,10 +253,6 @@ public abstract class PacketDecoder implements Serializable {
packet = new CreateQueueMessage_V2();
break;
}
case CREATE_QUEUE_V3: {
packet = new CreateQueueMessage_V3();
break;
}
case CREATE_SHARED_QUEUE: {
packet = new CreateSharedQueueMessage();
break;

View File

@ -253,9 +253,7 @@ public class PacketImpl implements Packet {
public static final byte CREATE_QUEUE_V2 = -12;
public static final byte CREATE_QUEUE_V3 = -13;
public static final byte CREATE_SHARED_QUEUE_V2 = -14;
public static final byte CREATE_SHARED_QUEUE_V2 = -13;
// Static --------------------------------------------------------

View File

@ -134,7 +134,10 @@ public class CreateAddressMessage extends PacketImpl {
return false;
} else if (!address.equals(other.address))
return false;
if (routingTypes.equals(other.routingTypes))
if (routingTypes == null) {
if (other.routingTypes != null)
return false;
} else if (!routingTypes.equals(other.routingTypes))
return false;
if (autoCreated != other.autoCreated)
return false;

View File

@ -18,16 +18,26 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
public class CreateQueueMessage_V2 extends CreateQueueMessage {
protected boolean autoCreated;
private RoutingType routingType;
private int maxConsumers;
private boolean deleteOnNoConsumers;
public CreateQueueMessage_V2(final SimpleString address,
final SimpleString queueName,
final RoutingType routingType,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final int maxConsumers,
final boolean deleteOnNoConsumers,
final boolean autoCreated,
final boolean requiresResponse) {
this();
@ -39,26 +49,52 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.temporary = temporary;
this.autoCreated = autoCreated;
this.requiresResponse = requiresResponse;
this.routingType = routingType;
this.maxConsumers = maxConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
public CreateQueueMessage_V2() {
super(CREATE_QUEUE_V2);
}
public CreateQueueMessage_V2(byte packet) {
super(packet);
}
// Public --------------------------------------------------------
@Override
public String toString() {
StringBuffer buff = new StringBuffer(super.getParentString());
buff.append(", autoCreated=" + autoCreated);
buff.append(", routingType=" + routingType);
buff.append(", maxConsumers=" + maxConsumers);
buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers);
buff.append("]");
return buff.toString();
}
public RoutingType getRoutingType() {
return routingType;
}
public void setRoutingType(RoutingType routingType) {
this.routingType = routingType;
}
public int getMaxConsumers() {
return maxConsumers;
}
public void setMaxConsumers(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
public boolean isDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
public boolean isAutoCreated() {
return autoCreated;
}
@ -71,12 +107,18 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(autoCreated);
buffer.writeByte(routingType.getType());
buffer.writeInt(maxConsumers);
buffer.writeBoolean(deleteOnNoConsumers);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
autoCreated = buffer.readBoolean();
routingType = RoutingType.getType(buffer.readByte());
maxConsumers = buffer.readInt();
deleteOnNoConsumers = buffer.readBoolean();
}
@Override
@ -84,6 +126,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreated ? 1231 : 1237);
result = prime * result + (routingType.getType());
result = prime * result + (maxConsumers);
result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
return result;
}
@ -98,6 +143,17 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
CreateQueueMessage_V2 other = (CreateQueueMessage_V2) obj;
if (autoCreated != other.autoCreated)
return false;
if (maxConsumers != other.maxConsumers)
return false;
if (deleteOnNoConsumers != other.deleteOnNoConsumers)
return false;
if (deleteOnNoConsumers != other.deleteOnNoConsumers)
return false;
if (routingType == null) {
if (other.routingType != null)
return false;
} else if (!routingType.equals(other.routingType))
return false;
return true;
}
}

View File

@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
public class CreateQueueMessage_V3 extends CreateQueueMessage_V2 {
private RoutingType routingType;
private int maxConsumers;
private boolean deleteOnNoConsumers;
public CreateQueueMessage_V3(final SimpleString address,
final SimpleString queueName,
final RoutingType routingType,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final int maxConsumers,
final boolean deleteOnNoConsumers,
final boolean autoCreated,
final boolean requiresResponse) {
this();
this.address = address;
this.queueName = queueName;
this.filterString = filterString;
this.durable = durable;
this.temporary = temporary;
this.autoCreated = autoCreated;
this.requiresResponse = requiresResponse;
this.routingType = routingType;
this.maxConsumers = maxConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
public CreateQueueMessage_V3() {
super(CREATE_QUEUE_V3);
}
// Public --------------------------------------------------------
@Override
public String toString() {
StringBuffer buff = new StringBuffer(super.getParentString());
buff.append(", routingType=" + routingType);
buff.append(", maxConsumers=" + maxConsumers);
buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers);
buff.append("]");
return buff.toString();
}
public RoutingType getRoutingType() {
return routingType;
}
public void setRoutingType(RoutingType routingType) {
this.routingType = routingType;
}
public int getMaxConsumers() {
return maxConsumers;
}
public void setMaxConsumers(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
public boolean isDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeByte(routingType.getType());
buffer.writeInt(maxConsumers);
buffer.writeBoolean(deleteOnNoConsumers);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
routingType = RoutingType.getType(buffer.readByte());
maxConsumers = buffer.readInt();
deleteOnNoConsumers = buffer.readBoolean();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (routingType.getType());
result = prime * result + (maxConsumers);
result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof CreateQueueMessage_V3))
return false;
CreateQueueMessage_V3 other = (CreateQueueMessage_V3) obj;
if (autoCreated != other.autoCreated)
return false;
return true;
}
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.RoutingType;
public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {

View File

@ -16,16 +16,12 @@
*/
package org.apache.activemq.artemis.junit;
import java.util.Collections;
import java.util.HashSet;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
/**
* A JUnit Rule that embeds an ActiveMQ Artemis ClientConsumer into a test.

View File

@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -77,9 +78,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -563,12 +564,16 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception {
@Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
checkStarted();
clearIO();
try {
server.createAddressInfo(new AddressInfo(new SimpleString(name), routingTypes));
Set<RoutingType> set = new HashSet<>();
for (Object routingType : routingTypes) {
set.add(RoutingType.valueOf(routingType.toString()));
}
server.createAddressInfo(new AddressInfo(new SimpleString(name), set));
} finally {
blockOnIO();
}
@ -580,7 +585,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
server.removeAddressInfo(new SimpleString(name));
server.removeAddressInfo(new SimpleString(name), null);
} finally {
blockOnIO();
}
@ -642,14 +647,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception {
public void createQueue(String address,
String routingType,
String name,
String filterStr,
boolean durable,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
checkStarted();
clearIO();
@ -660,7 +665,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
server.createQueue(SimpleString.toSimpleString(address), routingType, new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} finally {
blockOnIO();
}

View File

@ -331,6 +331,30 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public int getMaxConsumers() {
checkStarted();
clearIO();
try {
return queue.getMaxConsumers();
} finally {
blockOnIO();
}
}
@Override
public boolean isDeleteOnNoConsumers() {
checkStarted();
clearIO();
try {
return queue.isDeleteOnNoConsumers();
} finally {
blockOnIO();
}
}
@Override
public Map<String, Object>[] listScheduledMessages() throws Exception {
checkStarted();

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQEx
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
@ -80,7 +79,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
@ -91,7 +89,6 @@ import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
@ -251,23 +248,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_QUEUE_V2: {
CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(),
request.getQueueName(),
RoutingType.MULTICAST,
request.getFilterString(),
request.isTemporary(),
request.isDurable(),
Queue.MAX_CONSUMERS_UNLIMITED,
false,
request.isAutoCreated());
if (requiresResponse) {
response = new NullResponseMessage();
}
break;
}
case CREATE_QUEUE_V3: {
CreateQueueMessage_V3 request = (CreateQueueMessage_V3) packet;
requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isDeleteOnNoConsumers(),
request.isAutoCreated());
if (requiresResponse) {

View File

@ -35,6 +35,12 @@ public enum CheckType {
return role.isCreateAddress();
}
},
DELETE_ADDRESS {
@Override
public boolean hasRole(final Role role) {
return role.isDeleteAddress();
}
},
CREATE_DURABLE_QUEUE {
@Override
public boolean hasRole(final Role role) {

View File

@ -456,7 +456,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
void removeAddressInfo(SimpleString address) throws Exception;
void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception;
String getInternalNamingPrefix();
}

View File

@ -110,7 +110,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -123,6 +122,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
@ -1717,7 +1717,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (autoDeleteAddress && postOffice != null) {
try {
removeAddressInfo(address);
removeAddressInfo(address, session);
} catch (ActiveMQDeleteAddressException e) {
// Could be thrown if the address has bindings or is not deletable.
}
@ -2392,7 +2392,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public void removeAddressInfo(SimpleString address) throws Exception {
public void removeAddressInfo(final SimpleString address,
final SecurityAuth session) throws Exception {
if (session != null) {
securityStore.check(address, CheckType.DELETE_ADDRESS, session);
}
AddressInfo addressInfo = getAddressInfo(address);
if (postOffice.removeAddressInfo(address) == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
@ -2451,7 +2456,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
AddressInfo defaultAddressInfo = new AddressInfo(addressName);
defaultAddressInfo.addRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType());
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
AddressInfo info = postOffice.getAddressInfo(addressName);
if (info == null) {

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

View File

@ -719,7 +719,8 @@ public class ManagementServiceImpl implements ManagementService {
paramTypes[i] == Long.TYPE && params[i].getClass() == Long.class ||
paramTypes[i] == Double.TYPE && params[i].getClass() == Double.class ||
paramTypes[i] == Integer.TYPE && params[i].getClass() == Integer.class ||
paramTypes[i] == Boolean.TYPE && params[i].getClass() == Boolean.class) {
paramTypes[i] == Boolean.TYPE && params[i].getClass() == Boolean.class ||
paramTypes[i] == Object[].class && params[i].getClass() == Object[].class) {
// parameter match
} else {
match = false;

View File

@ -82,7 +82,7 @@ public class AutoDeleteJmsDestinationTest extends JMSTestBase {
@Test
public void testAutoDeleteNegative() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteJmsQueues(false));
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteQueues(false));
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -230,11 +230,11 @@ public class SessionTest extends ActiveMQTestBase {
@Test
public void testQueueQueryNoQ() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false));
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false));
cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
Assert.assertFalse(resp.isExists());
Assert.assertFalse(resp.isAutoCreateJmsQueues());
Assert.assertEquals(null, resp.getAddress());
clientSession.close();
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.AddressSettingsInfo;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.DivertControl;
@ -52,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
@ -251,6 +253,41 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
}
@Test
public void testCreateAndDestroyQueue_4() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString name = RandomUtil.randomSimpleString();
boolean durable = RandomUtil.randomBoolean();
boolean deleteOnNoConsumers = RandomUtil.randomBoolean();
boolean autoCreateAddress = true;
int maxConsumers = RandomUtil.randomInt();
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
Assert.assertEquals(durable, queueControl.isDurable());
Assert.assertEquals(deleteOnNoConsumers, queueControl.isDeleteOnNoConsumers());
Assert.assertEquals(maxConsumers, queueControl.getMaxConsumers());
Assert.assertEquals(false, queueControl.isTemporary());
checkResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mbeanServer);
Assert.assertEquals(address.toString(), addressControl.getAddress());
serverControl.destroyQueue(name.toString(), true, true);
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
}
@Test
public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -17,13 +17,11 @@
package org.apache.activemq.artemis.tests.integration.management;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest {
@ -103,15 +101,15 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception {
public void createQueue(String address,
String routingType,
String name,
String filterStr,
boolean durable,
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@ -158,9 +156,10 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
public void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
public void destroyQueue(String name,
boolean removeConsumers,
boolean autoDeleteAddress) throws Exception {
proxy.invokeOperation("destroyQueue", name, removeConsumers, autoDeleteAddress);
}
@Override
@ -567,7 +566,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception {
@Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
}

View File

@ -81,6 +81,16 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (String) proxy.retrieveAttributeValue("deadLetterAddress");
}
@Override
public int getMaxConsumers() {
return (Integer) proxy.retrieveAttributeValue("maxConsumers");
}
@Override
public boolean isDeleteOnNoConsumers() {
return (Boolean) proxy.retrieveAttributeValue("deleteOnNoConsumers");
}
@Override
public int getDeliveringCount() {
return (Integer) proxy.retrieveAttributeValue("deliveringCount", Integer.class);

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;

View File

@ -83,7 +83,11 @@ public class StompTest extends StompTestBase {
boolean connected = conn != null && conn.isConnected();
log.debug("Connection 1.0 connected: " + connected);
if (connected) {
try {
conn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();

View File

@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class AutoAckMesageListenerTest extends JMSTestCase {
public class AutoAckMessageListenerTest extends JMSTestCase {
// Constants -----------------------------------------------------

View File

@ -34,7 +34,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
import org.objectweb.jtests.jms.admin.Admin;
@ -148,9 +147,8 @@ public class AbstractAdmin implements Admin {
@Override
public void createTopic(final String name) {
Boolean result;
try {
invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, (int) RoutingType.MULTICAST.getType(), false, -1);
invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, new Object[]{"MULTICAST"});
} catch (Exception e) {
throw new IllegalStateException(e);
}