ARTEMIS-876 Remove all reliances on JMS prefixing

This commit is contained in:
jbertram 2016-10-21 19:58:01 -05:00 committed by Martyn Taylor
parent 0bcde8140f
commit 84e8a87325
216 changed files with 1258 additions and 850 deletions

View File

@ -82,7 +82,7 @@ public abstract class DestinationAction extends ConnectionAbstract {
ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
session.start();
ClientRequestor requestor = new ClientRequestor(session, "jms.queue.activemq.management");
ClientRequestor requestor = new ClientRequestor(session, "activemq.management");
ClientMessage message = session.createMessage(false);
cb.setUpInvocation(message);

View File

@ -326,7 +326,7 @@ public final class XmlDataImporter extends ActionAbstract {
// Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
// send directly to a queue, we have to send to an address instead but not all the queues related to the
// address may need the message
try (ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management")) {
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
managementSession.start();
@ -825,7 +825,7 @@ public final class XmlDataImporter extends ActionAbstract {
reader.next();
}
try (ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management")) {
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createConnectionFactory", name, Boolean.parseBoolean(ha), discoveryGroupName.length() > 0, Integer.parseInt(type), connectors, entries, clientId, Long.parseLong(clientFailureCheckPeriod), Long.parseLong(connectionTtl), Long.parseLong(callTimeout), Long.parseLong(callFailoverTimeout), Integer.parseInt(minLargeMessageSize), Boolean.parseBoolean(compressLargeMessages), Integer.parseInt(consumerWindowSize), Integer.parseInt(consumerMaxRate), Integer.parseInt(confirmationWindowSize), Integer.parseInt(producerWindowSize), Integer.parseInt(producerMaxRate), Boolean.parseBoolean(blockOnAcknowledge), Boolean.parseBoolean(blockOnDurableSend), Boolean.parseBoolean(blockOnNonDurableSend), Boolean.parseBoolean(autoGroup), Boolean.parseBoolean(preacknowledge), loadBalancingPolicyClassName, Integer.parseInt(transactionBatchSize), Integer.parseInt(dupsOkBatchSize), Boolean.parseBoolean(useGlobalPools), Integer.parseInt(scheduledThreadMaxPoolSize), Integer.parseInt(threadMaxPoolSize), Long.parseLong(retryInterval), Double.parseDouble(retryIntervalMultiplier), Long.parseLong(maxRetryInterval), Integer.parseInt(reconnectAttempts), Boolean.parseBoolean(failoverOnInitialConnection), groupId);
//Boolean.parseBoolean(cacheLargeMessagesClient));
@ -883,7 +883,7 @@ public final class XmlDataImporter extends ActionAbstract {
reader.next();
}
try (ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management")) {
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
if ("Queue".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createQueue", name, entries, selector);

View File

@ -114,7 +114,7 @@ public class FileBrokerTest {
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession("myUser", "myPass", false, true, false, false, 0);
ClientProducer producer = session.createProducer("jms.queue.DLQ");
ClientProducer producer = session.createProducer("DLQ");
producer.send(session.createMessage(true));
replacePatternInFile(path, "guest", "X");

View File

@ -61,8 +61,8 @@ under the License.
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>

View File

@ -64,8 +64,8 @@ under the License.
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>

View File

@ -65,8 +65,8 @@ under the License.
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>

View File

@ -157,8 +157,7 @@ public final class ActiveMQDefaultConfiguration {
// true means that the server supports wild card routing
private static boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
// the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it.
private static SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("jms.queue.activemq.management");
private static SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("activemq.management");
// the name of the address that consumers bind to receive management notifications
private static SimpleString DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS = new SimpleString("activemq.notifications");

View File

@ -198,6 +198,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
*/
int getVersion();
void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException;
// Queue Operations ----------------------------------------------
/**

View File

@ -747,6 +747,18 @@ public interface ActiveMQServerControl {
@Parameter(desc = "a comma-separated list of roles allowed to send management messages messages", name = "manage") String manageRoles,
@Parameter(desc = "a comma-separated list of roles allowed to browse queues", name = "browse") String browseRoles) throws Exception;
@Operation(desc = "Add security settings for addresses matching the addressMatch", impact = MBeanOperationInfo.ACTION)
void addSecuritySettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch,
@Parameter(desc = "a comma-separated list of roles allowed to send messages", name = "send") String sendRoles,
@Parameter(desc = "a comma-separated list of roles allowed to consume messages", name = "consume") String consumeRoles,
@Parameter(desc = "a comma-separated list of roles allowed to create durable queues", name = "createDurableQueueRoles") String createDurableQueueRoles,
@Parameter(desc = "a comma-separated list of roles allowed to delete durable queues", name = "deleteDurableQueueRoles") String deleteDurableQueueRoles,
@Parameter(desc = "a comma-separated list of roles allowed to create non durable queues", name = "createNonDurableQueueRoles") String createNonDurableQueueRoles,
@Parameter(desc = "a comma-separated list of roles allowed to delete non durable queues", name = "deleteNonDurableQueueRoles") String deleteNonDurableQueueRoles,
@Parameter(desc = "a comma-separated list of roles allowed to send management messages messages", name = "manage") String manageRoles,
@Parameter(desc = "a comma-separated list of roles allowed to browse queues", name = "browse") String browseRoles,
@Parameter(desc = "a comma-separated list of roles allowed to create addresses", name = "createAddressRoles") String createAddressRoles) throws Exception;
@Operation(desc = "Remove security settings for an address", impact = MBeanOperationInfo.ACTION)
void removeSecuritySettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch) throws Exception;

View File

@ -67,7 +67,7 @@ public final class AddressSettingsInfo {
public static AddressSettingsInfo from(final String jsonString) {
JsonObject object = JsonUtil.readJsonObject(jsonString);
return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getJsonNumber("maxSizeBytes").longValue(), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getJsonNumber("redeliveryDelay").longValue(), object.getJsonNumber("redeliveryMultiplier").doubleValue(), object.getJsonNumber("maxRedeliveryDelay").longValue(), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getJsonNumber("redistributionDelay").longValue(), object.getBoolean("sendToDLAOnNoRoute"), object.getJsonNumber("slowConsumerThreshold").longValue(), object.getJsonNumber("slowConsumerCheckPeriod").longValue(), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoDeleteJmsQueues"), object.getBoolean("autoCreateJmsTopics"), object.getBoolean("autoDeleteJmsTopics"));
return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getJsonNumber("maxSizeBytes").longValue(), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getJsonNumber("redeliveryDelay").longValue(), object.getJsonNumber("redeliveryMultiplier").doubleValue(), object.getJsonNumber("maxRedeliveryDelay").longValue(), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getJsonNumber("redistributionDelay").longValue(), object.getBoolean("sendToDLAOnNoRoute"), object.getJsonNumber("slowConsumerThreshold").longValue(), object.getJsonNumber("slowConsumerCheckPeriod").longValue(), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoCreateJmsTopics"), object.getBoolean("autoDeleteJmsQueues"), object.getBoolean("autoDeleteJmsTopics"));
}
// Constructors --------------------------------------------------
@ -89,8 +89,8 @@ public final class AddressSettingsInfo {
long slowConsumerCheckPeriod,
String slowConsumerPolicy,
boolean autoCreateJmsQueues,
boolean autoDeleteJmsQueues,
boolean autoCreateJmsTopics,
boolean autoDeleteJmsQueues,
boolean autoDeleteJmsTopics) {
this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes;

View File

@ -44,9 +44,9 @@ public final class ResourceNames {
public static final String JMS_SERVER = "jms.server";
public static final String JMS_QUEUE = "jms.queue.";
// public static final String JMS_QUEUE = "jms.queue.";
public static final String JMS_TOPIC = "jms.topic.";
// public static final String JMS_TOPIC = "jms.topic.";
public static final String JMS_CONNECTION_FACTORY = "jms.connectionfactory.";

View File

@ -45,6 +45,8 @@ public final class RoleInfo {
private final boolean browse;
private final boolean createAddress;
/**
* Returns an array of RoleInfo corresponding to the JSON serialization returned
* by {@link AddressControl#getRolesAsJSON()}.
@ -54,7 +56,7 @@ public final class RoleInfo {
RoleInfo[] roles = new RoleInfo[array.size()];
for (int i = 0; i < array.size(); i++) {
JsonObject r = array.getJsonObject(i);
RoleInfo role = new RoleInfo(r.getString("name"), r.getBoolean("send"), r.getBoolean("consume"), r.getBoolean("createDurableQueue"), r.getBoolean("deleteDurableQueue"), r.getBoolean("createNonDurableQueue"), r.getBoolean("deleteNonDurableQueue"), r.getBoolean("manage"), r.getBoolean("browse"));
RoleInfo role = new RoleInfo(r.getString("name"), r.getBoolean("send"), r.getBoolean("consume"), r.getBoolean("createDurableQueue"), r.getBoolean("deleteDurableQueue"), r.getBoolean("createNonDurableQueue"), r.getBoolean("deleteNonDurableQueue"), r.getBoolean("manage"), r.getBoolean("browse"), r.getBoolean("createAddress"));
roles[i] = role;
}
return roles;
@ -68,7 +70,8 @@ public final class RoleInfo {
final boolean createNonDurableQueue,
final boolean deleteNonDurableQueue,
final boolean manage,
final boolean browse) {
final boolean browse,
final boolean createAddress) {
this.name = name;
this.send = send;
this.consume = consume;
@ -78,6 +81,7 @@ public final class RoleInfo {
this.deleteNonDurableQueue = deleteNonDurableQueue;
this.manage = manage;
this.browse = browse;
this.createAddress = createAddress;
}
/**
@ -142,4 +146,11 @@ public final class RoleInfo {
public boolean isBrowse() {
return browse;
}
/**
* Returns whether this role can create addresses.
*/
public boolean isCreateAddress() {
return createAddress;
}
}

View File

@ -278,6 +278,18 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
@Override
public void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException {
checkClosed();
startCall();
try {
sessionContext.createAddress(address, multicast);
} finally {
endCall();
}
}
@Override
public void createQueue(final SimpleString address,
final SimpleString queueName,

View File

@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
@ -582,6 +583,12 @@ public class ActiveMQSessionContext extends SessionContext {
return response.getTimeoutSeconds();
}
@Override
public void createAddress(SimpleString address, final boolean multicast) throws ActiveMQException {
CreateAddressMessage request = new CreateAddressMessage(address, multicast, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}
@Override
public void createQueue(SimpleString address,
SimpleString queueName,

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailo
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
@ -88,6 +89,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLU
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
@ -235,6 +237,10 @@ public abstract class PacketDecoder implements Serializable {
packet = new SessionQueueQueryResponseMessage_V2();
break;
}
case CREATE_ADDRESS: {
packet = new CreateAddressMessage();
break;
}
case CREATE_QUEUE: {
packet = new CreateQueueMessage();
break;

View File

@ -249,6 +249,8 @@ public class PacketImpl implements Packet {
public static final byte SESS_BINDINGQUERY_RESP_V3 = -10;
public static final byte CREATE_ADDRESS = -11;
// Static --------------------------------------------------------
public PacketImpl(final byte type) {

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class CreateAddressMessage extends PacketImpl {
private SimpleString address;
private boolean multicast;
private boolean requiresResponse;
public CreateAddressMessage(final SimpleString address,
final boolean multicast,
final boolean requiresResponse) {
this();
this.address = address;
this.multicast = multicast;
this.requiresResponse = requiresResponse;
}
public CreateAddressMessage() {
super(CREATE_ADDRESS);
}
// Public --------------------------------------------------------
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" + address);
buff.append(", multicast=" + multicast);
buff.append("]");
return buff.toString();
}
public SimpleString getAddress() {
return address;
}
public boolean isMulticast() {
return multicast;
}
public boolean isRequiresResponse() {
return requiresResponse;
}
public void setAddress(SimpleString address) {
this.address = address;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(address);
buffer.writeBoolean(multicast);
buffer.writeBoolean(requiresResponse);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
address = buffer.readSimpleString();
multicast = buffer.readBoolean();
requiresResponse = buffer.readBoolean();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((address == null) ? 0 : address.hashCode());
result = prime * result + (multicast ? 1231 : 1237);
result = prime * result + (requiresResponse ? 1231 : 1237);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof CreateAddressMessage))
return false;
CreateAddressMessage other = (CreateAddressMessage) obj;
if (address == null) {
if (other.address != null)
return false;
} else if (!address.equals(other.address))
return false;
if (multicast != other.multicast)
return false;
if (requiresResponse != other.requiresResponse)
return false;
return true;
}
}

View File

@ -34,6 +34,8 @@ public class Role implements Serializable {
private final boolean consume;
private final boolean createAddress;
private final boolean createDurableQueue;
private final boolean deleteDurableQueue;
@ -47,7 +49,7 @@ public class Role implements Serializable {
private final boolean browse;
public JsonObject toJson() {
return JsonLoader.createObjectBuilder().add("name", name).add("send", send).add("consume", consume).add("createDurableQueue", createDurableQueue).add("deleteDurableQueue", deleteDurableQueue).add("createNonDurableQueue", createNonDurableQueue).add("deleteNonDurableQueue", deleteNonDurableQueue).add("manage", manage).add("browse", browse).build();
return JsonLoader.createObjectBuilder().add("name", name).add("send", send).add("consume", consume).add("createDurableQueue", createDurableQueue).add("deleteDurableQueue", deleteDurableQueue).add("createNonDurableQueue", createNonDurableQueue).add("deleteNonDurableQueue", deleteNonDurableQueue).add("manage", manage).add("browse", browse).add("createAddress", createAddress).build();
}
/**
@ -84,12 +86,28 @@ public class Role implements Serializable {
final boolean deleteNonDurableQueue,
final boolean manage,
final boolean browse) {
// This constructor exists for version compatibility on the API. If either createDurableQueue or createNonDurableQueue
// is true then createAddress will be true.
this(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue);
}
public Role(final String name,
final boolean send,
final boolean consume,
final boolean createDurableQueue,
final boolean deleteDurableQueue,
final boolean createNonDurableQueue,
final boolean deleteNonDurableQueue,
final boolean manage,
final boolean browse,
final boolean createAddress) {
if (name == null) {
throw new NullPointerException("name is null");
}
this.name = name;
this.send = send;
this.consume = consume;
this.createAddress = createAddress;
this.createDurableQueue = createDurableQueue;
this.deleteDurableQueue = deleteDurableQueue;
this.createNonDurableQueue = createNonDurableQueue;
@ -110,6 +128,10 @@ public class Role implements Serializable {
return consume;
}
public boolean isCreateAddress() {
return createAddress;
}
public boolean isCreateDurableQueue() {
return createDurableQueue;
}
@ -136,6 +158,9 @@ public class Role implements Serializable {
if (consume) {
stringReturn.append(" consume ");
}
if (createAddress) {
stringReturn.append(" createAddress ");
}
if (createDurableQueue) {
stringReturn.append(" createDurableQueue ");
}
@ -174,6 +199,9 @@ public class Role implements Serializable {
if (consume != role.consume) {
return false;
}
if (createAddress != role.createAddress) {
return false;
}
if (createDurableQueue != role.createDurableQueue) {
return false;
}
@ -208,6 +236,7 @@ public class Role implements Serializable {
result = name.hashCode();
result = 31 * result + (send ? 1 : 0);
result = 31 * result + (consume ? 1 : 0);
result = 31 * result + (createAddress ? 1 : 0);
result = 31 * result + (createDurableQueue ? 1 : 0);
result = 31 * result + (deleteDurableQueue ? 1 : 0);
result = 31 * result + (createNonDurableQueue ? 1 : 0);

View File

@ -166,6 +166,8 @@ public abstract class SessionContext {
public abstract void deleteQueue(SimpleString queueName) throws ActiveMQException;
public abstract void createAddress(SimpleString address, boolean multicast) throws ActiveMQException;
public abstract void createQueue(SimpleString address,
SimpleString queueName,
SimpleString filterString,

View File

@ -32,7 +32,8 @@ public class SecurityFormatter {
String createNonDurableQueueRoles,
String deleteNonDurableQueueRoles,
String manageRoles,
String browseRoles) {
String browseRoles,
String createAddressRoles) {
List<String> createDurableQueue = toList(createDurableQueueRoles);
List<String> deleteDurableQueue = toList(deleteDurableQueueRoles);
List<String> createNonDurableQueue = toList(createNonDurableQueueRoles);
@ -41,6 +42,7 @@ public class SecurityFormatter {
List<String> consume = toList(consumeRoles);
List<String> manage = toList(manageRoles);
List<String> browse = toList(browseRoles);
List<String> createAddress = toList(createAddressRoles);
Set<String> allRoles = new HashSet<>();
allRoles.addAll(createDurableQueue);
@ -51,10 +53,11 @@ public class SecurityFormatter {
allRoles.addAll(consume);
allRoles.addAll(manage);
allRoles.addAll(browse);
allRoles.addAll(createAddress);
Set<Role> roles = new HashSet<>(allRoles.size());
for (String role : allRoles) {
roles.add(new Role(role, send.contains(role), consume.contains(role), createDurableQueue.contains(role), deleteDurableQueue.contains(role), createNonDurableQueue.contains(role), deleteNonDurableQueue.contains(role), manageRoles.contains(role), browse.contains(role)));
roles.add(new Role(role, send.contains(role), consume.contains(role), createDurableQueue.contains(role), deleteDurableQueue.contains(role), createNonDurableQueue.contains(role), deleteNonDurableQueue.contains(role), manageRoles.contains(role), browse.contains(role), createAddressRoles.contains(role)));
}
return roles;
}

View File

@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129

View File

@ -43,13 +43,13 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
*/
private static final long serialVersionUID = 5027962425462382883L;
public static final String JMS_QUEUE_ADDRESS_PREFIX = "jms.queue.";
// public static final String JMS_QUEUE_ADDRESS_PREFIX = "jms.queue.";
public static final String JMS_TEMP_QUEUE_ADDRESS_PREFIX = "jms.tempqueue.";
// public static final String JMS_TEMP_QUEUE_ADDRESS_PREFIX = "jms.tempqueue.";
public static final String JMS_TOPIC_ADDRESS_PREFIX = "jms.topic.";
// public static final String JMS_TOPIC_ADDRESS_PREFIX = "jms.topic.";
public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic.";
// public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic.";
public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
@ -98,23 +98,23 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
}
public static Destination fromAddress(final String address) {
if (address.startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX)) {
String name = address.substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length());
public static Destination fromPrefixedName(final String address) {
if (address.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
String name = address.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
return createQueue(name);
} else if (address.startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
String name = address.substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length());
} else if (address.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
String name = address.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
return createTopic(name);
} else if (address.startsWith(ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX)) {
String name = address.substring(ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
} else if (address.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
String name = address.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
return new ActiveMQTemporaryQueue(address, name, null);
} else if (address.startsWith(ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX)) {
String name = address.substring(ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
return new ActiveMQTemporaryQueue(name, name, null);
} else if (address.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
String name = address.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
return new ActiveMQTemporaryTopic(address, name, null);
return new ActiveMQTemporaryTopic(name, name, null);
} else {
throw new JMSRuntimeException("Invalid address " + address);
}
@ -202,11 +202,11 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
public static SimpleString createQueueAddressFromName(final String name) {
return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + name);
return new SimpleString(QUEUE_QUALIFIED_PREFIX + name);
}
public static SimpleString createTopicAddressFromName(final String name) {
return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + name);
return new SimpleString(TOPIC_QUALIFIED_PREFIX + name);
}
public static ActiveMQQueue createQueue(final String name) {
@ -218,11 +218,11 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final String name, final ActiveMQSession session) {
return new ActiveMQTemporaryQueue(JMS_TEMP_QUEUE_ADDRESS_PREFIX.concat(name), name, session);
return new ActiveMQTemporaryQueue(name, name, session);
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final String name) {
return createTemporaryQueue(name, null);
return createTemporaryQueue(/*TEMP_QUEUE_QUALIFED_PREFIX + */name, null);
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final ActiveMQSession session) {
@ -238,7 +238,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
public static ActiveMQTemporaryTopic createTemporaryTopic(String name, final ActiveMQSession session) {
return new ActiveMQTemporaryTopic(JMS_TEMP_TOPIC_ADDRESS_PREFIX.concat(name), name, session);
return new ActiveMQTemporaryTopic(/*TEMP_TOPIC_QUALIFED_PREFIX + */name, name, session);
}
public static ActiveMQTemporaryTopic createTemporaryTopic(String name) {

View File

@ -47,6 +47,11 @@ import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.TOPIC_QUALIFIED_PREFIX;
/**
* ActiveMQ Artemis implementation of a JMS Message.
* <br>
@ -196,6 +201,8 @@ public class ActiveMQMessage implements javax.jms.Message {
private long jmsDeliveryTime;
private boolean fromQueue;
// Constructors --------------------------------------------------
/*
@ -353,7 +360,7 @@ public class ActiveMQMessage implements javax.jms.Message {
SimpleString repl = MessageUtil.getJMSReplyTo(message);
if (repl != null) {
replyTo = ActiveMQDestination.fromAddress(repl.toString());
replyTo = ActiveMQDestination.fromPrefixedName(repl.toString());
}
}
return replyTo;
@ -370,9 +377,19 @@ public class ActiveMQMessage implements javax.jms.Message {
throw new InvalidDestinationException("Foreign destination " + dest);
}
String prefix = "";
if (dest instanceof ActiveMQTemporaryQueue) {
prefix = TEMP_QUEUE_QUALIFED_PREFIX;
} else if (dest instanceof ActiveMQQueue) {
prefix = QUEUE_QUALIFIED_PREFIX;
} else if (dest instanceof ActiveMQTemporaryTopic) {
prefix = TEMP_TOPIC_QUALIFED_PREFIX;
} else if (dest instanceof ActiveMQTopic) {
prefix = TOPIC_QUALIFIED_PREFIX;
}
ActiveMQDestination jbd = (ActiveMQDestination) dest;
MessageUtil.setJMSReplyTo(message, jbd.getSimpleAddress());
MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(prefix + jbd.getAddress()));
replyTo = jbd;
}
@ -381,9 +398,9 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString sdest = message.getAddress();
SimpleString address = message.getAddress();
dest = sdest == null ? null : ActiveMQDestination.fromAddress(sdest.toString());
dest = address == null ? null : ActiveMQDestination.fromPrefixedName((fromQueue ? QUEUE_QUALIFIED_PREFIX : TOPIC_QUALIFIED_PREFIX) + address.toString());
}
return dest;
@ -762,6 +779,10 @@ public class ActiveMQMessage implements javax.jms.Message {
// Public --------------------------------------------------------
public void setFromQueue(boolean fromQueue) {
this.fromQueue = fromQueue;
}
public void setIndividualAcknowledge() {
this.individualAck = true;
}

View File

@ -240,6 +240,8 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
} else {
coreMessage.acknowledge();
}
jmsMsg.setFromQueue(destination instanceof ActiveMQQueue);
}
return jmsMsg;

View File

@ -403,9 +403,19 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
try {
ClientSession.AddressQuery query = clientSession.addressQuery(address);
// if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side
// as that's a more efficient path for such operation
if (!query.isExists() && ((address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !query.isAutoCreateJmsQueues()) || (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !query.isAutoCreateJmsTopics()))) {
if (!query.isExists() && query.isAutoCreateJmsQueues()) {
if (destination.isQueue() && !destination.isTemporary()) {
clientSession.createAddress(address, false);
clientSession.createQueue(address, address, null, true);
} else if (destination.isQueue() && destination.isTemporary()) {
clientSession.createAddress(address, false);
clientSession.createTemporaryQueue(address, address);
} else if (!destination.isQueue() && !destination.isTemporary()) {
clientSession.createAddress(address, true);
} else if (!destination.isQueue() && destination.isTemporary()) {
clientSession.createAddress(address, true);
}
} else if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
throw new InvalidDestinationException("Destination " + address + " does not exist");
} else {
connection.addKnownDestination(address);

View File

@ -33,7 +33,7 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
// Static --------------------------------------------------------
public static SimpleString createAddressFromName(final String name) {
return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + name);
return new SimpleString(name);
}
// Attributes ----------------------------------------------------
@ -41,11 +41,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
// Constructors --------------------------------------------------
public ActiveMQQueue(final String name) {
super(JMS_QUEUE_ADDRESS_PREFIX + name, name, false, true, null);
super(name, name, false, true, null);
}
public ActiveMQQueue(final String name, boolean temporary) {
super(JMS_QUEUE_ADDRESS_PREFIX + name, name, temporary, true, null);
super(name, name, temporary, true, null);
}
/**

View File

@ -299,7 +299,15 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (jbd != null) {
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
if (!response.isExists() && ((jbd.getAddress().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !response.isAutoCreateJmsQueues()) || (jbd.getAddress().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !response.isAutoCreateJmsTopics()))) {
if (!response.isExists() && response.isAutoCreateJmsQueues()) {
if (jbd.isQueue()) {
session.createAddress(jbd.getSimpleAddress(), false);
session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true);
} else {
session.createAddress(jbd.getSimpleAddress(), true);
}
} else if (!response.isExists() && !response.isAutoCreateJmsQueues()) {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
@ -559,7 +567,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
if (!response.isExists()) {
if (!response.isExists() && !response.isAutoCreateJmsTopics()) {
throw ActiveMQJMSClientBundle.BUNDLE.destinationDoesNotExist(dest.getSimpleAddress());
}
@ -652,8 +660,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
} else {
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
if (!response.isExists() && !response.isAutoCreateJmsTopics()) {
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
if (!response.isExists()) {
if (response.isAutoCreateJmsQueues()) {
session.createAddress(dest.getSimpleAddress(), true);
} else {
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
}
}
connection.addKnownDestination(dest.getSimpleAddress());
@ -774,26 +786,26 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw JMSExceptionHelper.convertFromActiveMQException(ActiveMQJMSClientBundle.BUNDLE.invalidFilter(e, new SimpleString(filterString)));
}
ActiveMQDestination jbq = (ActiveMQDestination) queue;
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
if (!jbq.isQueue()) {
if (!activeMQDestination.isQueue()) {
throw new InvalidDestinationException("Cannot create a browser on a topic");
}
try {
AddressQuery response = session.addressQuery(new SimpleString(jbq.getAddress()));
AddressQuery response = session.addressQuery(new SimpleString(activeMQDestination.getAddress()));
if (!response.isExists()) {
if (response.isAutoCreateJmsQueues()) {
session.createQueue(jbq.getSimpleAddress(), jbq.getSimpleAddress(), true);
session.createQueue(activeMQDestination.getSimpleAddress(), activeMQDestination.getSimpleAddress(), true);
} else {
throw new InvalidDestinationException("Destination " + jbq.getName() + " does not exist");
throw new InvalidDestinationException("Destination " + activeMQDestination.getName() + " does not exist");
}
}
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
return new ActiveMQQueueBrowser(options, (ActiveMQQueue) jbq, filterString, session);
return new ActiveMQQueueBrowser(options, (ActiveMQQueue) activeMQDestination, filterString, session);
}
@ -1082,7 +1094,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
AddressQuery query = session.addressQuery(topic.getSimpleAddress());
if (!query.isExists() && !query.isAutoCreateJmsTopics()) {
if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
return null;
} else {
return topic;

View File

@ -32,7 +32,7 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
// Static --------------------------------------------------------
public static SimpleString createAddressFromName(final String name) {
return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + name);
return new SimpleString(name);
}
// Attributes ----------------------------------------------------
@ -44,7 +44,7 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
}
public ActiveMQTopic(final String name, boolean temporary) {
super(JMS_TOPIC_ADDRESS_PREFIX + name, name, temporary, false, null);
super(name, name, temporary, false, null);
}
/**

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.jms.management.impl;
import javax.jms.JMSRuntimeException;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
@ -38,6 +37,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.Parameter;
@ -52,11 +52,12 @@ import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.management.impl.AbstractControl;
import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerLogger;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
@ -101,28 +102,6 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
return trimmed;
}
private static String[] determineJMSDestination(String coreAddress) {
String[] result = new String[2]; // destination name & type
if (coreAddress.startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX)) {
result[0] = coreAddress.substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length());
result[1] = "queue";
} else if (coreAddress.startsWith(ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX)) {
result[0] = coreAddress.substring(ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
result[1] = "tempqueue";
} else if (coreAddress.startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
result[0] = coreAddress.substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length());
result[1] = "topic";
} else if (coreAddress.startsWith(ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX)) {
result[0] = coreAddress.substring(ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
result[1] = "temptopic";
} else {
ActiveMQJMSServerLogger.LOGGER.debug("JMSServerControlImpl.determineJMSDestination()" + coreAddress);
// not related to JMS
return null;
}
return result;
}
public static MBeanNotificationInfo[] getNotificationInfos() {
JMSNotificationType[] values = JMSNotificationType.values();
String[] names = new String[values.length];
@ -822,24 +801,45 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
}
private JsonObject toJSONObject(ServerConsumer consumer) {
String[] destinationInfo = determineJMSDestination(consumer.getQueue().getAddress().toString());
if (destinationInfo == null) {
private String determineJMSDestinationType(Queue queue) {
String result;
if (server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(queue.getAddress().toString())).getRoutingType() == AddressInfo.RoutingType.ANYCAST) {
if (queue.isTemporary()) {
result = "tempqueue";
} else {
result = "queue";
}
} else if (server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(queue.getAddress().toString())).getRoutingType() == AddressInfo.RoutingType.MULTICAST) {
if (queue.isTemporary()) {
result = "temptopic";
} else {
result = "topic";
}
} else {
ActiveMQJMSServerLogger.LOGGER.debug("JMSServerControlImpl.determineJMSDestinationType() " + queue);
// not related to JMS
return null;
}
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", consumer.getID()).add("connectionID", consumer.getConnectionID().toString()).add("sessionID", consumer.getSessionID()).add("queueName", consumer.getQueue().getName().toString()).add("browseOnly", consumer.isBrowseOnly()).add("creationTime", consumer.getCreationTime()).add("destinationName", destinationInfo[0]).add("destinationType", destinationInfo[1]);
return result;
}
private JsonObject toJSONObject(ServerConsumer consumer) {
AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(consumer.getQueue().getAddress().toString()));
if (addressInfo == null) {
return null;
}
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", consumer.getID()).add("connectionID", consumer.getConnectionID().toString()).add("sessionID", consumer.getSessionID()).add("queueName", consumer.getQueue().getName().toString()).add("browseOnly", consumer.isBrowseOnly()).add("creationTime", consumer.getCreationTime()).add("destinationName", consumer.getQueue().getAddress().toString()).add("destinationType", determineJMSDestinationType(consumer.getQueue()));
// JMS consumer with message filter use the queue's filter
Filter queueFilter = consumer.getQueue().getFilter();
if (queueFilter != null) {
obj.add("filter", queueFilter.getFilterString().toString());
}
if (destinationInfo[1].equals("topic")) {
try {
ActiveMQDestination.decomposeQueueNameForDurableSubscription(consumer.getQueue().getName().toString());
obj.add("durable", true);
} catch (IllegalArgumentException | JMSRuntimeException e) {
if (addressInfo.getRoutingType().equals(AddressInfo.RoutingType.MULTICAST)) {
if (consumer.getQueue().isTemporary()) {
obj.add("durable", false);
} else {
obj.add("durable", true);
}
} else {
obj.add("durable", false);

View File

@ -300,16 +300,16 @@ public class JMSTopicControlImpl extends StandardMBean implements TopicControl {
String clientID = null;
String subName = null;
if (queue.isDurable() && !queue.getName().startsWith(ResourceNames.JMS_TOPIC)) {
if (queue.isDurable()) {
Pair<String, String> pair = ActiveMQDestination.decomposeQueueNameForDurableSubscription(queue.getName());
clientID = pair.getA();
subName = pair.getB();
} else if (queue.getName().startsWith(ResourceNames.JMS_TOPIC)) {
} else {
// in the case of heirarchical topics the queue name will not follow the <part>.<part> pattern of normal
// durable subscribers so skip decomposing the name for the client ID and subscription name and just
// hard-code it
clientID = "ActiveMQ";
subName = "ActiveMQ";
clientID = "";
subName = "";
}
String filter = queue.getFilter() != null ? queue.getFilter() : null;

View File

@ -28,7 +28,6 @@ import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -57,12 +56,9 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueDeleter;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
@ -392,15 +388,15 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
return;
}
server.setJMSQueueCreator(new JMSDestinationCreator());
server.setJMSQueueDeleter(new JMSQueueDeleter());
// server.setJMSQueueCreator(new JMSDestinationCreator());
//
// server.setJMSQueueDeleter(new JMSQueueDeleter());
server.registerActivateCallback(this);
server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback());
server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback());
// server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback());
//
// server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback());
/**
* See this method's javadoc.
* <p>
@ -797,11 +793,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
public synchronized boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception {
checkInitialised();
server.destroyQueue(ActiveMQDestination.createQueueAddressFromName(name), null, !removeConsumers, removeConsumers);
server.destroyQueue(SimpleString.toSimpleString(name), null, !removeConsumers, removeConsumers);
// if the queue has consumers and 'removeConsumers' is false then the queue won't actually be removed
// therefore only remove the queue from Bindings, etc. if the queue is actually removed
if (this.server.getPostOffice().getBinding(ActiveMQDestination.createQueueAddressFromName(name)) == null) {
if (this.server.getPostOffice().getBinding(SimpleString.toSimpleString(name)) == null) {
removeFromBindings(queues, queueBindings, name);
queues.remove(name);
@ -826,7 +822,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
@Override
public synchronized boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception {
checkInitialised();
AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + ActiveMQDestination.createTopicAddressFromName(name));
AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + name);
if (addressControl != null) {
for (String queueName : addressControl.getQueueNames()) {
Binding binding = server.getPostOffice().getBinding(new SimpleString(queueName));
@ -1096,6 +1092,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
}
server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQQueue.getName())).setRoutingType(AddressInfo.RoutingType.ANYCAST).setDefaultMaxQueueConsumers(-1));
Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false, autoCreated);
queues.put(queueName, activeMQQueue);
@ -1131,7 +1129,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
// checks when routing messages to a topic that
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated);
// server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated);
server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress())));
topics.put(topicName, activeMQTopic);
@ -1643,95 +1642,95 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* This class is responsible for auto-creating the JMS (and underlying core) resources when a client sends a message
* to a non-existent JMS queue or topic
*/
class JMSDestinationCreator implements QueueCreator {
// class JMSDestinationCreator implements QueueCreator {
//
// @Override
// public boolean create(SimpleString address) throws Exception {
// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) {
// return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true);
// } else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) {
// return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
// } else {
// return false;
// }
// }
// }
@Override
public boolean create(SimpleString address) throws Exception {
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) {
return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true);
} else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) {
return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
} else {
return false;
}
}
}
class JMSQueueDeleter implements QueueDeleter {
@Override
public boolean delete(SimpleString queueName) throws Exception {
Queue queue = server.locateQueue(queueName);
SimpleString address = queue.getAddress();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
long consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount();
if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoDeleteJmsQueues() && queue.getMessageCount() == 0) {
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + settings.isAutoDeleteJmsQueues());
}
return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false);
} else {
return false;
}
}
}
// class JMSQueueDeleter implements QueueDeleter {
//
// @Override
// public boolean delete(SimpleString queueName) throws Exception {
// Queue queue = server.locateQueue(queueName);
// SimpleString address = queue.getAddress();
// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
// long consumerCount = queue.getConsumerCount();
// long messageCount = queue.getMessageCount();
//
// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.getAutoDeleteJmsQueues() && queue.getMessageCount() == 0) {
// if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
// ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues());
// }
//
// return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false);
// } else {
// return false;
// }
// }
// }
/**
* When a core queue is created with a jms.topic prefix this class will create the associated JMS resources
* retroactively. This would happen if, for example, a client created a subscription a non-existent JMS topic and
* autoCreateJmsTopics = true.
*/
class JMSPostQueueCreationCallback implements PostQueueCreationCallback {
@Override
public void callback(SimpleString queueName) throws Exception {
Queue queue = server.locateQueue(queueName);
String address = queue.getAddress().toString();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
/* When a topic is created a dummy subscription is created which never receives any messages; when the queue
* for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the
* queue name doesn't start with the topic prefix.
*/
if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
}
}
}
// class JMSPostQueueCreationCallback implements PostQueueCreationCallback {
//
// @Override
// public void callback(SimpleString queueName) throws Exception {
// Queue queue = server.locateQueue(queueName);
// String address = queue.getAddress().toString();
//
// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
// /* When a topic is created a dummy subscription is created which never receives any messages; when the queue
// * for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the
// * queue name doesn't start with the topic prefix.
// */
// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
// createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
// }
// }
// }
/**
* When a core queue representing a JMS topic subscription is deleted this class will check to see if that was the
* last subscription on the topic and if so and autoDeleteJmsTopics = true then it will delete the JMS resources
* for that topic.
*/
class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback {
@Override
public void callback(SimpleString address, SimpleString queueName) throws Exception {
Queue queue = server.locateQueue(address);
Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(address).getBindings();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) {
try {
destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()));
} catch (IllegalStateException e) {
/*
* During shutdown the callback can be invoked after the JMSServerManager is already shut down so we just
* ignore the exception in that case
*/
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQJMSServerLogger.LOGGER.debug("Failed to destroy topic", e);
}
}
}
}
}
// class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback {
//
// @Override
// public void callback(SimpleString address, SimpleString queueName) throws Exception {
// Queue queue = server.locateQueue(address);
// Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(address).getBindings();
//
// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
//
// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) {
// try {
// destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()));
// } catch (IllegalStateException e) {
// /*
// * During shutdown the callback can be invoked after the JMSServerManager is already shut down so we just
// * ignore the exception in that case
// */
// if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
// ActiveMQJMSServerLogger.LOGGER.debug("Failed to destroy topic", e);
// }
// }
// }
// }
// }
private final class JMSReloader implements ReloadCallback {

View File

@ -89,14 +89,14 @@ public class JMSManagementServiceImpl implements JMSManagementService {
ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName());
JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jmsServerManager, counter);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control);
managementService.registerInRegistry(queue.getQueueName(), control);
}
@Override
public synchronized void unregisterQueue(final String name) throws Exception {
ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(name);
managementService.unregisterFromJMX(objectName);
managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
managementService.unregisterFromRegistry(name);
}
@Override
@ -105,14 +105,14 @@ public class JMSManagementServiceImpl implements JMSManagementService {
AddressControl addressControl = (AddressControl) managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
JMSTopicControlImpl control = new JMSTopicControlImpl(topic, jmsServerManager, addressControl, managementService);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control);
managementService.registerInRegistry(topic.getTopicName(), control);
}
@Override
public synchronized void unregisterTopic(final String name) throws Exception {
ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(name);
managementService.unregisterFromJMX(objectName);
managementService.unregisterFromRegistry(ResourceNames.JMS_TOPIC + name);
managementService.unregisterFromRegistry(name);
}
@Override

View File

@ -76,7 +76,7 @@ public class EmbeddedJMSResourceMultipleFileConfigurationTest {
List<Queue> boundQueues = jmsServer.getTopicQueues(TEST_TOPIC);
assertNotNull("List should never be null", boundQueues);
assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size());
assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 1, boundQueues.size());
}
}

View File

@ -76,7 +76,7 @@ public class EmbeddedJMSResourceSingleFileConfigurationTest {
List<Queue> boundQueues = jmsServer.getTopicQueues(TEST_TOPIC);
assertNotNull("List should never be null", boundQueues);
assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size());
assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 1, boundQueues.size());
}
}

View File

@ -58,7 +58,8 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
* the address. This can be changed on the acceptor.
* */
private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
// TODO fix this
private String pubSubPrefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX;
private int maxFrameSize = AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;

View File

@ -248,7 +248,7 @@ public class TestConversions extends Assert {
}
private void simulatePersistence(ServerMessage serverMessage) {
serverMessage.setAddress(new SimpleString("jms.queue.SomeAddress"));
serverMessage.setAddress(new SimpleString("SomeAddress"));
// This is just to simulate what would happen during the persistence of the message
// We need to still be able to recover the message when we read it back
((EncodingSupport) serverMessage).encode(new EmptyBuffer());

View File

@ -792,8 +792,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public void tempQueueDeleted(SimpleString bindingName) {
String amqName = OpenWireUtil.toAMQAddress(bindingName.toString());
ActiveMQDestination dest = new ActiveMQTempQueue(amqName);
ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = getContext();

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
@ -84,9 +85,9 @@ public class AMQConsumer {
if (openwireDestination.isTopic()) {
if (openwireDestination.isTemporary()) {
address = new SimpleString("jms.temptopic." + physicalName);
address = new SimpleString(physicalName);
} else {
address = new SimpleString("jms.topic." + physicalName);
address = new SimpleString(physicalName);
}
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
@ -95,7 +96,11 @@ public class AMQConsumer {
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
} else {
SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
session.getCoreServer().getJMSDestinationCreator().create(queueName);
try {
session.getCoreServer().createQueue(queueName, queueName, null, true, false);
} catch (ActiveMQQueueExistsException e) {
// ignore
}
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@ -145,7 +146,11 @@ public class AMQSession implements SessionCallback {
for (ActiveMQDestination openWireDest : dests) {
if (openWireDest.isQueue()) {
SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
getCoreServer().getJMSDestinationCreator().create(queueName);
try {
getCoreServer().createQueue(queueName, queueName, null, true, false);
} catch (ActiveMQQueueExistsException e) {
// ignore
}
}
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);

View File

@ -28,11 +28,6 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.util.ByteSequence;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX;
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
public class OpenWireUtil {
public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
@ -45,23 +40,19 @@ public class OpenWireUtil {
public static SimpleString toCoreAddress(ActiveMQDestination dest) {
if (dest.isQueue()) {
if (dest.isTemporary()) {
return new SimpleString(JMS_TEMP_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName());
return new SimpleString(dest.getPhysicalName());
} else {
return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName());
return new SimpleString(dest.getPhysicalName());
}
} else {
if (dest.isTemporary()) {
return new SimpleString(JMS_TEMP_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName());
return new SimpleString(dest.getPhysicalName());
} else {
return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName());
return new SimpleString(dest.getPhysicalName());
}
}
}
public static String toAMQAddress(String coreAddress) {
return coreAddress.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
}
/**
* We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
* destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
@ -70,7 +61,7 @@ public class OpenWireUtil {
*/
public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
String address = message.getAddress().toString();
String strippedAddress = toAMQAddress(address);
String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
if (actualDestination.isQueue()) {
return new ActiveMQQueue(strippedAddress);
} else {

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
@ -37,8 +38,8 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@ -253,11 +254,12 @@ public final class StompConnection implements RemotingConnection {
}
public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException {
// TODO: STOMP clients will have to prefix their destination with queue:// or topic:// so we can determine what to do here
try {
QueueCreator queueCreator = manager.getServer().getJMSDestinationCreator();
if (queueCreator != null) {
queueCreator.create(SimpleString.toSimpleString(queue));
}
manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST));
manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, true, false);
} catch (ActiveMQQueueExistsException e) {
// ignore
} catch (Exception e) {
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -285,7 +286,7 @@ public class StompSession implements SessionCallback {
receiveCredits = -1;
}
if (destination.startsWith("jms.topic")) {
if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType().equals(AddressInfo.RoutingType.MULTICAST)) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {

View File

@ -65,7 +65,7 @@ public class EmbeddedRestActiveMQJMSTest {
List<String> connectors = createInVmConnector();
server.getEmbeddedJMS().getJMSServerManager().createConnectionFactory("ConnectionFactory", false, JMSFactoryType.CF, connectors, "ConnectionFactory");
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/jms.queue.exampleQueue"));
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/exampleQueue"));
ClientResponse<?> response = request.head();
response.releaseConnection();

View File

@ -94,7 +94,7 @@ public class EmbeddedTest {
@Test
public void testTransform() throws Exception {
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/jms.queue.exampleQueue"));
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/exampleQueue"));
ClientResponse<?> response = request.head();
response.releaseConnection();

View File

@ -106,7 +106,7 @@ public class JMSTest extends MessageTestBase {
}
public static Destination createDestination(String dest) {
ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromAddress(dest);
ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(dest);
System.out.println("SimpleAddress: " + destination.getSimpleAddress());
return destination;
}
@ -150,8 +150,9 @@ public class JMSTest extends MessageTestBase {
@Test
public void testJmsConsumer() throws Exception {
String queueName = ActiveMQDestination.createQueueAddressFromName("testQueue2").toString();
System.out.println("Queue name: " + queueName);
String queueName = "testQueue2";
String prefixedQueueName = ActiveMQDestination.createQueueAddressFromName(queueName).toString();
System.out.println("Queue name: " + prefixedQueueName);
QueueDeployment deployment = new QueueDeployment();
deployment.setDuplicatesAllowed(true);
deployment.setDurableSend(false);
@ -160,7 +161,7 @@ public class JMSTest extends MessageTestBase {
Connection conn = connectionFactory.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = createDestination(queueName);
Destination destination = createDestination(prefixedQueueName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new Listener());
conn.start();
@ -196,8 +197,9 @@ public class JMSTest extends MessageTestBase {
@Test
public void testJmsProducer() throws Exception {
String queueName = ActiveMQDestination.createQueueAddressFromName("testQueue").toString();
System.out.println("Queue name: " + queueName);
String queueName = "testQueue";
String prefixedQueueName = ActiveMQDestination.createQueueAddressFromName(queueName).toString();
System.out.println("Queue name: " + prefixedQueueName);
QueueDeployment deployment = new QueueDeployment();
deployment.setDuplicatesAllowed(true);
deployment.setDurableSend(false);
@ -221,7 +223,7 @@ public class JMSTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
publish(queueName, order, null);
publish(prefixedQueueName, order, null);
ClientResponse<?> res = consumeNext.request().header("Accept-Wait", "2").accept("application/xml").post(String.class);
Assert.assertEquals(200, res.getStatus());
@ -238,7 +240,7 @@ public class JMSTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
publish(queueName, order, null);
publish(prefixedQueueName, order, null);
ClientResponse<?> res = consumeNext.request().header("Accept-Wait", "2").accept("application/json").post(String.class);
Assert.assertEquals(200, res.getStatus());
@ -255,7 +257,7 @@ public class JMSTest extends MessageTestBase {
Order order = new Order();
order.setName("2");
order.setAmount("$15.00");
publish(queueName, order, "application/xml");
publish(prefixedQueueName, order, "application/xml");
ClientResponse<?> res = consumeNext.request().header("Accept-Wait", "2").post(String.class);
Assert.assertEquals(200, res.getStatus());

View File

@ -45,12 +45,13 @@ import static org.jboss.resteasy.test.TestPortProvider.generateURL;
public class SelectorTest extends MessageTestBase {
public static ConnectionFactory connectionFactory;
public static String topicName = ActiveMQDestination.createQueueAddressFromName("testTopic").toString();
public static String topicName = "testTopic";
public static String prefixedTopicName = ActiveMQDestination.createQueueAddressFromName(topicName).toString();
@BeforeClass
public static void setup() throws Exception {
connectionFactory = new ActiveMQJMSConnectionFactory(manager.getQueueManager().getServerLocator());
System.out.println("Queue name: " + topicName);
System.out.println("Queue name: " + prefixedTopicName);
TopicDeployment deployment = new TopicDeployment();
deployment.setDuplicatesAllowed(true);
deployment.setDurableSend(false);
@ -118,7 +119,7 @@ public class SelectorTest extends MessageTestBase {
}
public static Destination createDestination(String dest) {
ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromAddress(dest);
ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(dest);
System.out.println("SimpleAddress: " + destination.getSimpleAddress());
return destination;
}
@ -203,32 +204,32 @@ public class SelectorTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
order.setName("2");
publish(topicName, order, null, "2");
publish(prefixedTopicName, order, null, "2");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.twoOrder);
order.setName("3");
publish(topicName, order, null, "2");
publish(prefixedTopicName, order, null, "2");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.twoOrder);
order.setName("4");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
order.setName("5");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
order.setName("6");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
@ -262,17 +263,17 @@ public class SelectorTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
order.setName("2");
publish(topicName, order, null, "2");
publish(prefixedTopicName, order, null, "2");
order.setName("3");
publish(topicName, order, null, "2");
publish(prefixedTopicName, order, null, "2");
order.setName("4");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
order.setName("5");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
order.setName("6");
publish(topicName, order, null, "1");
publish(prefixedTopicName, order, null, "1");
{
order.setName("1");

View File

@ -27,7 +27,7 @@ public class XmlTest {
@Test
public void testPush() throws Exception {
String xml = "<push-registration id=\"111\">\n" +
" <destination>jms.queue.bar</destination>\n" +
" <destination>bar</destination>\n" +
" <durable>true</durable>\n" +
" <session-count>10</session-count>\n" +
" <link rel=\"template\" href=\"http://somewhere.com/resources/{id}/messages\" method=\"PUT\"/>\n" +

View File

@ -39,7 +39,7 @@
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.exampleQueue">
<security-setting match="exampleQueue">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>

View File

@ -1505,15 +1505,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final String deleteNonDurableQueueRoles,
final String manageRoles,
final String browseRoles) throws Exception {
addSecuritySettings(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, "");
}
@Override
public void addSecuritySettings(final String addressMatch,
final String sendRoles,
final String consumeRoles,
final String createDurableQueueRoles,
final String deleteDurableQueueRoles,
final String createNonDurableQueueRoles,
final String deleteNonDurableQueueRoles,
final String manageRoles,
final String browseRoles,
final String createAddressRoles) throws Exception {
checkStarted();
clearIO();
try {
Set<Role> roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles);
Set<Role> roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, createAddressRoles);
server.getSecurityRepository().addMatch(addressMatch, roles);
PersistedRoles persistedRoles = new PersistedRoles(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles);
PersistedRoles persistedRoles = new PersistedRoles(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, createAddressRoles);
storageManager.storeSecurityRoles(persistedRoles);
} finally {
@ -1588,7 +1602,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (addressSettings.getExpiryAddress() != null) {
settings.add("expiryAddress", addressSettings.getExpiryAddress().toString());
}
return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()).add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsTopics()).build().toString();
return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsQueues", addressSettings.getAutoDeleteJmsQueues()).add("autoDeleteJmsTopics", addressSettings.getAutoDeleteJmsQueues()).build().toString();
}
@Override
@ -1661,8 +1675,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues);
addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues);
addressSettings.setAutoCreateJmsTopics(autoCreateJmsTopics);
addressSettings.setAutoDeleteJmsTopics(autoDeleteJmsTopics);
server.getAddressSettingsRepository().addMatch(address, addressSettings);
storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));

View File

@ -27,4 +27,6 @@ public interface AddressBindingInfo {
AddressInfo.RoutingType getRoutingType();
int getDefaultMaxConsumers();
}

View File

@ -52,5 +52,5 @@ public interface QueueBindingInfo {
boolean isDeleteOnNoConsumers();
void setDeleteOnNoConsumers();
void setDeleteOnNoConsumers(boolean deleteOnNoConsumers);
}

View File

@ -46,6 +46,8 @@ public class PersistedRoles implements EncodingSupport {
private SimpleString browseRoles;
private SimpleString createAddressRoles;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -72,7 +74,8 @@ public class PersistedRoles implements EncodingSupport {
final String createNonDurableQueueRoles,
final String deleteNonDurableQueueRoles,
final String manageRoles,
final String browseRoles) {
final String browseRoles,
final String createAddressRoles) {
super();
this.addressMatch = SimpleString.toSimpleString(addressMatch);
this.sendRoles = SimpleString.toSimpleString(sendRoles);
@ -83,6 +86,7 @@ public class PersistedRoles implements EncodingSupport {
this.deleteNonDurableQueueRoles = SimpleString.toSimpleString(deleteNonDurableQueueRoles);
this.manageRoles = SimpleString.toSimpleString(manageRoles);
this.browseRoles = SimpleString.toSimpleString(browseRoles);
this.createAddressRoles = SimpleString.toSimpleString(createAddressRoles);
}
// Public --------------------------------------------------------
@ -158,6 +162,13 @@ public class PersistedRoles implements EncodingSupport {
return browseRoles.toString();
}
/**
* @return the createAddressRoles
*/
public String getCreateAddressRoles() {
return createAddressRoles.toString();
}
@Override
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(addressMatch);
@ -169,6 +180,7 @@ public class PersistedRoles implements EncodingSupport {
buffer.writeNullableSimpleString(deleteNonDurableQueueRoles);
buffer.writeNullableSimpleString(manageRoles);
buffer.writeNullableSimpleString(browseRoles);
buffer.writeNullableSimpleString(createAddressRoles);
}
@Override
@ -180,7 +192,8 @@ public class PersistedRoles implements EncodingSupport {
SimpleString.sizeofNullableString(createNonDurableQueueRoles) +
SimpleString.sizeofNullableString(deleteNonDurableQueueRoles) +
SimpleString.sizeofNullableString(manageRoles) +
SimpleString.sizeofNullableString(browseRoles);
SimpleString.sizeofNullableString(browseRoles) +
SimpleString.sizeofNullableString(createAddressRoles);
}
@ -195,6 +208,7 @@ public class PersistedRoles implements EncodingSupport {
deleteNonDurableQueueRoles = buffer.readNullableSimpleString();
manageRoles = buffer.readNullableSimpleString();
browseRoles = buffer.readNullableSimpleString();
createAddressRoles = buffer.readNullableSimpleString();
}
/* (non-Javadoc)
@ -212,6 +226,7 @@ public class PersistedRoles implements EncodingSupport {
result = prime * result + ((deleteNonDurableQueueRoles == null) ? 0 : deleteNonDurableQueueRoles.hashCode());
result = prime * result + ((manageRoles == null) ? 0 : manageRoles.hashCode());
result = prime * result + ((browseRoles == null) ? 0 : browseRoles.hashCode());
result = prime * result + ((createAddressRoles == null) ? 0 : createAddressRoles.hashCode());
result = prime * result + ((sendRoles == null) ? 0 : sendRoles.hashCode());
result = prime * result + (int) (storeId ^ (storeId >>> 32));
return result;
@ -269,6 +284,11 @@ public class PersistedRoles implements EncodingSupport {
return false;
} else if (!browseRoles.equals(other.browseRoles))
return false;
if (createAddressRoles == null) {
if (other.createAddressRoles != null)
return false;
} else if (!createAddressRoles.equals(other.createAddressRoles))
return false;
if (sendRoles == null) {
if (other.sendRoles != null)
return false;
@ -303,6 +323,8 @@ public class PersistedRoles implements EncodingSupport {
manageRoles +
", browseRoles=" +
browseRoles +
", createAddressRoles=" +
createAddressRoles +
"]";
}

View File

@ -1221,7 +1221,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers());
readLock();
try {
@ -1268,7 +1268,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType());
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType(), addressInfo.getDefaultMaxQueueConsumers());
readLock();
try {

View File

@ -29,6 +29,8 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
public SimpleString name;
public int defaultMaxConsumers;
public AddressInfo.RoutingType routingType;
public PersistentAddressBindingEncoding() {
@ -41,13 +43,17 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
name +
", routingType=" +
routingType +
", defaultMaxConsumers=" +
defaultMaxConsumers +
"]";
}
public PersistentAddressBindingEncoding(final SimpleString name,
final AddressInfo.RoutingType routingType) {
final AddressInfo.RoutingType routingType,
final int defaultMaxConsumers) {
this.name = name;
this.routingType = routingType;
this.defaultMaxConsumers = defaultMaxConsumers;
}
@Override
@ -69,20 +75,27 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
return routingType;
}
@Override
public int getDefaultMaxConsumers() {
return defaultMaxConsumers;
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
routingType = AddressInfo.RoutingType.getType(buffer.readByte());
defaultMaxConsumers = buffer.readInt();
}
@Override
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(name);
buffer.writeByte(routingType.getType());
buffer.writeInt(defaultMaxConsumers);
}
@Override
public int getEncodeSize() {
return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE;
return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT;
}
}

View File

@ -72,12 +72,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final SimpleString address,
final SimpleString filterString,
final SimpleString user,
final boolean autoCreated) {
final boolean autoCreated,
final int maxConsumers,
final boolean deleteOnNoConsumers) {
this.name = name;
this.address = address;
this.filterString = filterString;
this.user = user;
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
@Override
@ -134,12 +138,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
@Override
public int getMaxConsumers() {
return 0;
return maxConsumers;
}
@Override
public void setMaxConsumers(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
@Override
@ -148,8 +152,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
public void setDeleteOnNoConsumers() {
public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
@Override

View File

@ -421,11 +421,21 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo addAddressInfo(AddressInfo addressInfo) {
try {
getServer().getManagementService().registerAddress(addressInfo.getName());
} catch (Exception e) {
e.printStackTrace();
}
return addressManager.addAddressInfo(addressInfo);
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
try {
getServer().getManagementService().registerAddress(addressInfo.getName());
} catch (Exception e) {
e.printStackTrace();
}
return addressManager.addOrUpdateAddressInfo(addressInfo);
}
@ -490,6 +500,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw new ActiveMQNonExistentQueueException();
}
// TODO: see whether we still want to do this or not
if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
pagingManager.deletePageStore(binding.getAddress());

View File

@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -39,7 +38,7 @@ import org.jboss.logging.Logger;
*/
public class SimpleAddressManager implements AddressManager {
private static final Logger logger = Logger.getLogger(Page.class);
private static final Logger logger = Logger.getLogger(SimpleAddressManager.class);
private final ConcurrentMap<SimpleString, AddressInfo> addressInfoMap = new ConcurrentHashMap<>();
@ -196,7 +195,7 @@ public class SimpleAddressManager implements AddressManager {
private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) {
synchronized (from) {
from.setRoutingType(to.getRoutingType());
from.setDefaultMaxConsumers(to.getDefaultMaxConsumers());
from.setDefaultMaxQueueConsumers(to.getDefaultMaxQueueConsumers());
from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers());
return from;
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
@ -82,6 +83,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
@ -220,6 +222,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
break;
}
case CREATE_ADDRESS: {
CreateAddressMessage request = (CreateAddressMessage) packet;
requiresResponse = request.isRequiresResponse();
session.createAddress(request.getAddress(), request.isMulticast());
if (requiresResponse) {
response = new NullResponseMessage();
}
break;
}
case CREATE_QUEUE: {
CreateQueueMessage request = (CreateQueueMessage) packet;
requiresResponse = request.isRequiresResponse();

View File

@ -29,6 +29,12 @@ public enum CheckType {
return role.isConsume();
}
},
CREATE_ADDRESS {
@Override
public boolean hasRole(final Role role) {
return role.isCreateAddress();
}
},
CREATE_DURABLE_QUEUE {
@Override
public boolean hasRole(final Role role) {

View File

@ -473,7 +473,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
void removeClientConnection(String clientId);
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
AddressInfo removeAddressInfo(SimpleString address);
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
}

View File

@ -33,7 +33,7 @@ public final class QueueConfig {
private final boolean durable;
private final boolean temporary;
private final boolean autoCreated;
private final int maxConsumers;
private final Integer maxConsumers;
private final boolean deleteOnNoConsumers;
public static final class Builder {
@ -47,7 +47,7 @@ public final class QueueConfig {
private boolean durable;
private boolean temporary;
private boolean autoCreated;
private int maxConsumers;
private Integer maxConsumers;
private boolean deleteOnNoConsumers;
private Builder(final long id, final SimpleString name) {
@ -112,7 +112,7 @@ public final class QueueConfig {
return this;
}
public Builder maxConsumers(final int maxConsumers) {
public Builder maxConsumers(final Integer maxConsumers) {
this.maxConsumers = maxConsumers;
return this;
}
@ -185,7 +185,7 @@ public final class QueueConfig {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final int maxConsumers,
final Integer maxConsumers,
final boolean deleteOnNoConsumers) {
this.id = id;
this.address = address;
@ -240,7 +240,7 @@ public final class QueueConfig {
return deleteOnNoConsumers;
}
public int maxConsumers() {
public Integer maxConsumers() {
return maxConsumers;
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -109,6 +110,8 @@ public interface ServerSession extends SecurityAuth {
boolean temporary,
boolean durable) throws Exception;
AddressInfo createAddress(final SimpleString address, final boolean multicast) throws Exception;
void deleteQueue(SimpleString name) throws Exception;
ServerConsumer createConsumer(long consumerID,

View File

@ -78,10 +78,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// Attributes ----------------------------------------------------
private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue.");
private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
protected final ServerLocatorInternal serverLocator;
protected final Executor executor;
@ -879,16 +875,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return;
}
if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX)) {
if (!query.isExists()) {
ActiveMQServerLogger.LOGGER.errorQueryingBridge(forwardingAddress, retryCount);
scheduleRetryConnect();
return;
}
} else {
if (!query.isExists()) {
ActiveMQServerLogger.LOGGER.bridgeNoBindings(getName(), getForwardingAddress(), getForwardingAddress());
}
if (!query.isExists()) {
ActiveMQServerLogger.LOGGER.errorQueryingBridge(forwardingAddress, retryCount);
scheduleRetryConnect();
return;
}
}

View File

@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
@ -692,14 +691,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return postOffice.isAddressBound(SimpleString.toSimpleString(address));
}
// TODO: this should probably look at the addresses too, not just queue bindings
@Override
public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
if (address == null) {
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
}
boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
boolean autoCreateJmsTopics = address.toString().startsWith(ResourceNames.JMS_TOPIC) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsTopics();
boolean autoCreateJmsQueues = getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
boolean autoCreateJmsTopics = getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsTopics();
List<SimpleString> names = new ArrayList<>();
@ -728,7 +728,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
boolean autoCreateJmsQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
QueueQueryResult response;
@ -1628,11 +1628,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception {
if (resourceName.toString().toLowerCase().startsWith("jms.topic")) {
ActiveMQServerLogger.LOGGER.deployTopic(resourceName);
} else {
ActiveMQServerLogger.LOGGER.deployQueue(resourceName);
}
// TODO: fix logging here as this could be for a topic or queue
ActiveMQServerLogger.LOGGER.deployQueue(resourceName);
return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
}
@ -1662,6 +1659,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final SecurityAuth session,
final boolean checkConsumerCount,
final boolean removeConsumers) throws Exception {
if (postOffice == null) {
return;
}
addressSettingsRepository.clearCache();
Binding binding = postOffice.getBinding(queueName);
@ -2210,7 +2211,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()));
info.setRoutingType(config.getRoutingType());
info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers());
info.setDefaultMaxConsumers(config.getDefaultMaxConsumers());
info.setDefaultMaxQueueConsumers(config.getDefaultMaxConsumers());
createOrUpdateAddressInfo(info);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
@ -2323,20 +2324,34 @@ public class ActiveMQServerImpl implements ActiveMQServer {
List<PersistedRoles> roles = storageManager.recoverPersistedRoles();
for (PersistedRoles roleItem : roles) {
Set<Role> setRoles = SecurityFormatter.createSecurity(roleItem.getSendRoles(), roleItem.getConsumeRoles(), roleItem.getCreateDurableQueueRoles(), roleItem.getDeleteDurableQueueRoles(), roleItem.getCreateNonDurableQueueRoles(), roleItem.getDeleteNonDurableQueueRoles(), roleItem.getManageRoles(), roleItem.getBrowseRoles());
Set<Role> setRoles = SecurityFormatter.createSecurity(roleItem.getSendRoles(), roleItem.getConsumeRoles(), roleItem.getCreateDurableQueueRoles(), roleItem.getDeleteDurableQueueRoles(), roleItem.getCreateNonDurableQueueRoles(), roleItem.getDeleteNonDurableQueueRoles(), roleItem.getManageRoles(), roleItem.getBrowseRoles(), roleItem.getCreateAddressRoles());
securityRepository.addMatch(roleItem.getAddressMatch().toString(), setRoles);
}
}
@Override
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) {
return postOffice.addOrUpdateAddressInfo(addressInfo);
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
AddressInfo result = postOffice.addOrUpdateAddressInfo(addressInfo);
// TODO: is this the right way to do this?
long txID = storageManager.generateID();
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);
return result;
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) {
return postOffice.removeAddressInfo(address);
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
AddressInfo result = postOffice.removeAddressInfo(address);
// TODO: is this the right way to do this?
// long txID = storageManager.generateID();
// storageManager.deleteAddressBinding(txID, getAddressInfo(address).getID());
// storageManager.commitBindings(txID);
return result;
}
@Override
@ -2394,30 +2409,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final long queueID = storageManager.generateID();
final QueueConfig.Builder queueConfigBuilder;
final SimpleString address;
if (addressName == null) {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
address = queueName;
} else {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName);
address = addressName;
}
AddressInfo defaultAddressInfo = new AddressInfo(address);
AddressInfo defaultAddressInfo = new AddressInfo(addressName);
// FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API.
AddressInfo info = postOffice.addAddressInfo(defaultAddressInfo);
AddressInfo info = postOffice.getAddressInfo(addressName);
boolean addressExists = true;
if (info == null) {
info = defaultAddressInfo;
addressExists = false;
}
final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers;
final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxQueueConsumers() : maxConsumers;
final QueueConfig queueConfig = queueConfigBuilder.filter(filter)
final QueueConfig queueConfig = queueConfigBuilder
.filter(filter)
.pagingManager(pagingManager)
.user(user)
.durable(durable)
@ -2428,6 +2438,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
.build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
boolean addressAlreadyExists = true;
if (postOffice.getAddressInfo(queue.getAddress()) == null) {
postOffice.addAddressInfo(new AddressInfo(queue.getAddress())
.setRoutingType(AddressInfo.RoutingType.MULTICAST)
.setDefaultMaxQueueConsumers(maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers));
addressAlreadyExists = false;
}
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
@ -2437,10 +2456,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
if (queue.isDurable()) {
if (!addressExists) {
storageManager.addAddressBinding(txID, getAddressInfo(address));
}
storageManager.addQueueBinding(txID, localQueueBinding);
if (!addressAlreadyExists) {
storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
}
}
try {
@ -2467,7 +2486,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw e;
}
managementService.registerAddress(queue.getAddress());
if (!addressAlreadyExists) {
managementService.registerAddress(queue.getAddress());
}
managementService.registerQueue(queue, queue.getAddress(), storageManager);
callPostQueueCreationCallbacks(queue.getName());

View File

@ -27,7 +27,7 @@ public class AddressInfo {
private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
private int defaultMaxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
private int defaultMaxQueueConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
public AddressInfo(SimpleString name) {
this.name = name;
@ -51,12 +51,12 @@ public class AddressInfo {
return this;
}
public int getDefaultMaxConsumers() {
return defaultMaxConsumers;
public int getDefaultMaxQueueConsumers() {
return defaultMaxQueueConsumers;
}
public AddressInfo setDefaultMaxConsumers(int defaultMaxConsumers) {
this.defaultMaxConsumers = defaultMaxConsumers;
public AddressInfo setDefaultMaxQueueConsumers(int defaultMaxQueueConsumers) {
this.defaultMaxQueueConsumers = defaultMaxQueueConsumers;
return this;
}
@ -64,6 +64,17 @@ public class AddressInfo {
return name;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer();
buff.append("AddressInfo [name=" + name);
buff.append(", routingType=" + routingType);
buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers);
buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers);
buff.append("]");
return buff.toString();
}
public enum RoutingType {
MULTICAST, ANYCAST;

View File

@ -180,7 +180,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
// TODO: figure out what else to set here
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName())
.setRoutingType(addressBindingInfo.getRoutingType());
.setRoutingType(addressBindingInfo.getRoutingType())
.setDefaultMaxQueueConsumers(addressBindingInfo.getDefaultMaxConsumers());
postOffice.addAddressInfo(addressInfo);
managementService.registerAddress(addressInfo.getName());

View File

@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@ -380,9 +381,9 @@ public class QueueImpl implements Queue {
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers == null ? addressInfo.getDefaultMaxConsumers() : maxConsumers;
this.maxConsumers = maxConsumers == null ? (addressInfo == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : addressInfo.getDefaultMaxQueueConsumers()) : maxConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers == null ? addressInfo.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers == null ? (addressInfo == null ? ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers() : addressInfo.isDefaultDeleteOnNoConsumers()) : deleteOnNoConsumers;
this.postOffice = postOffice;
@ -1883,7 +1884,7 @@ public class QueueImpl implements Queue {
@Override
public String toString() {
return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + "]@" + Integer.toHexString(System.identityHashCode(this));
return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
}
private synchronized void internalAddTail(final MessageReference ref) {

View File

@ -439,7 +439,7 @@ public class ScaleDownHandler {
private Integer getQueueID(ClientSession session, SimpleString queueName) throws Exception {
Integer queueID = -1;
Object result;
try (ClientRequestor requestor = new ClientRequestor(session, "jms.queue.activemq.management")) {
try (ClientRequestor requestor = new ClientRequestor(session, "activemq.management")) {
ClientMessage managementMessage = session.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queueName, "ID");
session.start();

View File

@ -41,7 +41,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.filter.Filter;
@ -521,14 +520,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.checkQueueCreationLimit(getUsername());
Queue queue;
// any non-temporary JMS destination created via this method should be marked as auto-created
if (!temporary && ((address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) || address.toString().startsWith(ResourceNames.JMS_TOPIC))) {
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true, maxConsumers, deleteOnNoConsumers);
} else {
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers);
}
Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers);
if (temporary) {
// Temporary queue in core simply means the queue will be deleted if
@ -558,6 +550,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public AddressInfo createAddress(final SimpleString address, final boolean multicast) throws Exception {
// make sure the user has privileges to create this queue
securityCheck(address, CheckType.CREATE_ADDRESS, this);
AddressInfo.RoutingType routingType = multicast ? AddressInfo.RoutingType.MULTICAST : AddressInfo.RoutingType.ANYCAST;
AddressInfo addressInfo = server.createOrUpdateAddressInfo(new AddressInfo(address).setRoutingType(routingType));
return addressInfo;
}
@Override
public void createSharedQueue(final SimpleString address,
final SimpleString name,
@ -1516,6 +1519,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
SimpleString replyTo = message.getSimpleStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
if (replyTo != null) {
// TODO: move this check somewhere else? this is a JMS-specific bit of logic in the core impl
if (replyTo.toString().startsWith("queue://") || replyTo.toString().startsWith("topic://")) {
replyTo = SimpleString.toSimpleString(replyTo.toString().substring(8));
} else if (replyTo.toString().startsWith("temp-queue://") || replyTo.toString().startsWith("temp-topic://")) {
replyTo = SimpleString.toSimpleString(replyTo.toString().substring(13));
}
reply.setAddress(replyTo);
doSend(tx, reply, direct, false);

View File

@ -464,11 +464,13 @@ public class ManagementServiceImpl implements ManagementService {
public synchronized void registerInRegistry(final String resourceName, final Object managedResource) {
unregisterFromRegistry(resourceName);
ActiveMQServerLogger.LOGGER.info("Registering: " + resourceName);
registry.put(resourceName, managedResource);
}
@Override
public synchronized void unregisterFromRegistry(final String resourceName) {
ActiveMQServerLogger.LOGGER.info("Unregistering: " + resourceName, new Exception());
registry.remove(resourceName);
}

View File

@ -52,9 +52,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
public static final boolean DEFAULT_AUTO_CREATE_QUEUES = true;
public static final boolean DEFAULT_AUTO_CREATE_JMS_QUEUES = true;
public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;
public static final boolean DEFAULT_AUTO_DELETE_JMS_QUEUES = true;
public static final boolean DEFAULT_AUTO_CREATE_TOPICS = true;
@ -166,7 +166,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
public boolean isAutoCreateJmsQueues() {
return autoCreateJmsQueues != null ? autoCreateJmsQueues : AddressSettings.DEFAULT_AUTO_CREATE_QUEUES;
return autoCreateJmsQueues != null ? autoCreateJmsQueues : AddressSettings.DEFAULT_AUTO_CREATE_JMS_QUEUES;
}
public AddressSettings setAutoCreateJmsQueues(final boolean autoCreateJmsQueues) {
@ -174,8 +174,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public boolean isAutoDeleteJmsQueues() {
return autoDeleteJmsQueues != null ? autoDeleteJmsQueues : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES;
public boolean getAutoDeleteJmsQueues() {
return autoDeleteJmsQueues != null ? autoDeleteJmsQueues : AddressSettings.DEFAULT_AUTO_DELETE_JMS_QUEUES;
}
public AddressSettings setAutoDeleteJmsQueues(final boolean autoDeleteJmsQueues) {
@ -193,7 +193,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
public boolean isAutoDeleteJmsTopics() {
return autoDeleteJmsTopics != null ? autoDeleteJmsTopics : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES;
return autoDeleteJmsTopics != null ? autoDeleteJmsTopics : AddressSettings.DEFAULT_AUTO_DELETE_TOPICS;
}
public AddressSettings setAutoDeleteJmsTopics(final boolean autoDeleteJmsTopics) {
@ -459,9 +459,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (autoDeleteJmsQueues == null) {
autoDeleteJmsQueues = merged.autoDeleteJmsQueues;
}
if (autoCreateJmsTopics == null) {
autoCreateJmsTopics = merged.autoCreateJmsTopics;
}
// if (autoCreateJmsTopics == null) {
// autoCreateJmsTopics = merged.autoCreateJmsTopics;
// }
if (autoDeleteJmsTopics == null) {
autoDeleteJmsTopics = merged.autoDeleteJmsTopics;
}
@ -532,7 +532,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer);
autoCreateJmsTopics = BufferHelper.readNullableBoolean(buffer);
// autoCreateJmsTopics = BufferHelper.readNullableBoolean(buffer);
autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer);
@ -565,7 +565,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) +
BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
// BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold);
@ -615,7 +615,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues);
BufferHelper.writeNullableBoolean(buffer, autoCreateJmsTopics);
// BufferHelper.writeNullableBoolean(buffer, autoCreateJmsTopics);
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics);
@ -652,7 +652,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode());
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
result = prime * result + ((autoCreateJmsTopics == null) ? 0 : autoCreateJmsTopics.hashCode());
// result = prime * result + ((autoCreateJmsTopics == null) ? 0 : autoCreateJmsTopics.hashCode());
result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
@ -778,11 +778,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues))
return false;
if (autoCreateJmsTopics == null) {
if (other.autoCreateJmsTopics != null)
return false;
} else if (!autoCreateJmsTopics.equals(other.autoCreateJmsTopics))
return false;
// if (autoCreateJmsTopics == null) {
// if (other.autoCreateJmsTopics != null)
// return false;
// } else if (!autoCreateJmsTopics.equals(other.autoCreateJmsTopics))
// return false;
if (autoDeleteJmsTopics == null) {
if (other.autoDeleteJmsTopics != null)
return false;
@ -854,11 +854,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
slowConsumerPolicy +
", autoCreateJmsQueues=" +
autoCreateJmsQueues +
", autoDeleteJmsQueues=" +
autoDeleteJmsQueues +
", autoDeleteJmsQueues=" + autoDeleteJmsQueues +
", autoCreateJmsTopics=" +
autoCreateJmsTopics +
", autoDeleteJmsTopics=" +
// autoCreateJmsTopics +
// ", autoDeleteJmsTopics=" +
autoDeleteJmsTopics +
", managementBrowsePageSize=" +
managementBrowsePageSize +

View File

@ -170,7 +170,7 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
"</security-setting>" + "\n" +
"</security-settings>" + "\n" +
"<address-settings>" + "\n" +
"<address-setting match=\"#\">" + "\n" + "<dead-letter-address>jms.queue.DLQ\n</dead-letter-address>" + "\n" + "<expiry-address>jms.queue.ExpiryQueue\n</expiry-address>" + "\n" + "<redelivery-delay>0\n</redelivery-delay>" + "\n" + "<max-size-bytes>10485760\n</max-size-bytes>" + "\n" + "<message-counter-history-day-limit>10</message-counter-history-day-limit>" + "\n" + "<address-full-policy>BLOCK</address-full-policy>" + "\n" +
"<address-setting match=\"#\">" + "\n" + "<dead-letter-address>DLQ\n</dead-letter-address>" + "\n" + "<expiry-address>ExpiryQueue\n</expiry-address>" + "\n" + "<redelivery-delay>0\n</redelivery-delay>" + "\n" + "<max-size-bytes>10485760\n</max-size-bytes>" + "\n" + "<message-counter-history-day-limit>10</message-counter-history-day-limit>" + "\n" + "<address-full-policy>BLOCK</address-full-policy>" + "\n" +
"</address-setting>" + "\n" +
"</address-settings>" + "\n";

View File

@ -299,7 +299,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues());
assertEquals(true, conf.getAddressesSettings().get("a1").getAutoDeleteJmsQueues());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsTopics());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsTopics());
@ -314,7 +314,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
assertEquals(false, conf.getAddressesSettings().get("a2").getAutoDeleteJmsQueues());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsTopics());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsTopics());

View File

@ -89,7 +89,7 @@ public class WrongRoleFileConfigurationParserTest extends ActiveMQTestBase {
"</security-setting>" + "\n" +
"</security-settings>" + "\n" +
"<address-settings>" + "\n" +
"<address-setting match=\"#\">" + "\n" + "<dead-letter-address>jms.queue.DLQ\n</dead-letter-address>" + "\n" + "<expiry-address>jms.queue.ExpiryQueue\n</expiry-address>" + "\n" + "<redelivery-delay>0\n</redelivery-delay>" + "\n" + "<max-size-bytes>10485760\n</max-size-bytes>" + "\n" + "<message-counter-history-day-limit>10</message-counter-history-day-limit>" + "\n" + "<address-full-policy>BLOCK</address-full-policy>" + "\n" +
"<address-setting match=\"#\">" + "\n" + "<dead-letter-address>DLQ\n</dead-letter-address>" + "\n" + "<expiry-address>ExpiryQueue\n</expiry-address>" + "\n" + "<redelivery-delay>0\n</redelivery-delay>" + "\n" + "<max-size-bytes>10485760\n</max-size-bytes>" + "\n" + "<message-counter-history-day-limit>10</message-counter-history-day-limit>" + "\n" + "<address-full-policy>BLOCK</address-full-policy>" + "\n" +
"</address-setting>" + "\n" +
"</address-settings>" + "\n" +
"</configuration>";

View File

@ -21,6 +21,7 @@ import org.junit.Test;
import static org.apache.activemq.artemis.core.security.CheckType.BROWSE;
import static org.apache.activemq.artemis.core.security.CheckType.CONSUME;
import static org.apache.activemq.artemis.core.security.CheckType.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.security.CheckType.CREATE_DURABLE_QUEUE;
import static org.apache.activemq.artemis.core.security.CheckType.CREATE_NON_DURABLE_QUEUE;
import static org.apache.activemq.artemis.core.security.CheckType.DELETE_DURABLE_QUEUE;
@ -41,7 +42,7 @@ public class RoleTest extends Assert {
@Test
public void testWriteRole() throws Exception {
Role role = new Role("testWriteRole", true, false, false, false, false, false, false, false);
Role role = new Role("testWriteRole", true, false, false, false, false, false, false, false, false);
Assert.assertTrue(SEND.hasRole(role));
Assert.assertFalse(CONSUME.hasRole(role));
Assert.assertFalse(CREATE_DURABLE_QUEUE.hasRole(role));
@ -50,11 +51,12 @@ public class RoleTest extends Assert {
Assert.assertFalse(DELETE_NON_DURABLE_QUEUE.hasRole(role));
Assert.assertFalse(MANAGE.hasRole(role));
Assert.assertFalse(BROWSE.hasRole(role));
Assert.assertFalse(CREATE_ADDRESS.hasRole(role));
}
@Test
public void testReadRole() throws Exception {
Role role = new Role("testReadRole", false, true, false, false, false, false, false, true);
Role role = new Role("testReadRole", false, true, false, false, false, false, false, true, false);
Assert.assertFalse(SEND.hasRole(role));
Assert.assertTrue(CONSUME.hasRole(role));
Assert.assertFalse(CREATE_DURABLE_QUEUE.hasRole(role));
@ -63,11 +65,12 @@ public class RoleTest extends Assert {
Assert.assertFalse(DELETE_NON_DURABLE_QUEUE.hasRole(role));
Assert.assertFalse(MANAGE.hasRole(role));
Assert.assertTrue(BROWSE.hasRole(role));
Assert.assertFalse(CREATE_ADDRESS.hasRole(role));
}
@Test
public void testCreateRole() throws Exception {
Role role = new Role("testCreateRole", false, false, true, false, false, false, false, false);
Role role = new Role("testCreateRole", false, false, true, false, false, false, false, false, false);
Assert.assertFalse(SEND.hasRole(role));
Assert.assertFalse(CONSUME.hasRole(role));
Assert.assertTrue(CREATE_DURABLE_QUEUE.hasRole(role));
@ -76,11 +79,12 @@ public class RoleTest extends Assert {
Assert.assertFalse(DELETE_NON_DURABLE_QUEUE.hasRole(role));
Assert.assertFalse(MANAGE.hasRole(role));
Assert.assertFalse(BROWSE.hasRole(role));
Assert.assertFalse(CREATE_ADDRESS.hasRole(role));
}
@Test
public void testManageRole() throws Exception {
Role role = new Role("testManageRole", false, false, false, false, false, false, true, false);
Role role = new Role("testManageRole", false, false, false, false, false, false, true, false, false);
Assert.assertFalse(SEND.hasRole(role));
Assert.assertFalse(CONSUME.hasRole(role));
Assert.assertFalse(CREATE_DURABLE_QUEUE.hasRole(role));
@ -89,16 +93,17 @@ public class RoleTest extends Assert {
Assert.assertFalse(DELETE_NON_DURABLE_QUEUE.hasRole(role));
Assert.assertTrue(MANAGE.hasRole(role));
Assert.assertFalse(BROWSE.hasRole(role));
Assert.assertFalse(CREATE_ADDRESS.hasRole(role));
}
@Test
public void testEqualsAndHashcode() throws Exception {
Role role = new Role("testEquals", true, true, true, false, false, false, false, false);
Role sameRole = new Role("testEquals", true, true, true, false, false, false, false, false);
Role roleWithDifferentName = new Role("notEquals", true, true, true, false, false, false, false, false);
Role roleWithDifferentRead = new Role("testEquals", false, true, true, false, false, false, false, false);
Role roleWithDifferentWrite = new Role("testEquals", true, false, true, false, false, false, false, false);
Role roleWithDifferentCreate = new Role("testEquals", true, true, false, false, false, false, false, false);
Role role = new Role("testEquals", true, true, true, false, false, false, false, false, false);
Role sameRole = new Role("testEquals", true, true, true, false, false, false, false, false, false);
Role roleWithDifferentName = new Role("notEquals", true, true, true, false, false, false, false, false, false);
Role roleWithDifferentRead = new Role("testEquals", false, true, true, false, false, false, false, false, false);
Role roleWithDifferentWrite = new Role("testEquals", true, false, true, false, false, false, false, false, false);
Role roleWithDifferentCreate = new Role("testEquals", true, true, false, false, false, false, false, false, false);
Assert.assertTrue(role.equals(role));

View File

@ -39,10 +39,10 @@ public class AddressSettingsTest extends ActiveMQTestBase {
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_QUEUES, addressSettings.isAutoCreateJmsQueues());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteJmsQueues());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_TOPICS, addressSettings.isAutoCreateJmsTopics());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_TOPICS, addressSettings.isAutoDeleteJmsTopics());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_JMS_QUEUES, addressSettings.isAutoCreateJmsQueues());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_JMS_QUEUES, addressSettings.getAutoDeleteJmsQueues());
// Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_TOPICS, addressSettings.isAutoCreateJmsTopics());
// Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_TOPICS, addressSettings.isAutoDeleteJmsTopics());
}
@Test

View File

@ -72,13 +72,13 @@ public class RepositoryTest extends ActiveMQTestBase {
public void testSingletwo() {
securityRepository.addMatch("queues.another.aq.*", new HashSet<Role>());
HashSet<Role> roles = new HashSet<>(2);
roles.add(new Role("test1", true, true, true, true, true, true, true, true));
roles.add(new Role("test2", true, true, true, true, true, true, true, true));
roles.add(new Role("test1", true, true, true, true, true, true, true, true, true));
roles.add(new Role("test2", true, true, true, true, true, true, true, true, true));
securityRepository.addMatch("queues.aq", roles);
HashSet<Role> roles2 = new HashSet<>(2);
roles2.add(new Role("test1", true, true, true, true, true, true, true, true));
roles2.add(new Role("test2", true, true, true, true, true, true, true, true));
roles2.add(new Role("test3", true, true, true, true, true, true, true, true));
roles2.add(new Role("test1", true, true, true, true, true, true, true, true, true));
roles2.add(new Role("test2", true, true, true, true, true, true, true, true, true));
roles2.add(new Role("test3", true, true, true, true, true, true, true, true, true));
securityRepository.addMatch("queues.another.andanother", roles2);
HashSet<Role> hashSet = securityRepository.getMatch("queues.another.andanother");
@ -89,8 +89,8 @@ public class RepositoryTest extends ActiveMQTestBase {
public void testWithoutWildcard() {
securityRepository.addMatch("queues.1.*", new HashSet<Role>());
HashSet<Role> roles = new HashSet<>(2);
roles.add(new Role("test1", true, true, true, true, true, true, true, true));
roles.add(new Role("test2", true, true, true, true, true, true, true, true));
roles.add(new Role("test1", true, true, true, true, true, true, true, true, true));
roles.add(new Role("test2", true, true, true, true, true, true, true, true, true));
securityRepository.addMatch("queues.2.aq", roles);
HashSet<Role> hashSet = securityRepository.getMatch("queues.2.aq");
Assert.assertEquals(hashSet.size(), 2);

View File

@ -81,7 +81,7 @@ public class ManagementExample {
// Step 13. Use a helper class to fill the JMS message with management information:
// * the name of the resource to manage
// * in this case, we want to retrieve the value of the messageCount of the queue
JMSManagementHelper.putAttribute(m, "jms.queue.exampleQueue", "messageCount");
JMSManagementHelper.putAttribute(m, "exampleQueue", "messageCount");
// Step 14. Use the requestor to send the request and wait for the reply
Message reply = requestor.request(m);
@ -97,7 +97,7 @@ public class ManagementExample {
// * the object name of the resource to manage (i.e. the queue)
// * in this case, we want to call the "removeMessage" operation with the JMS MessageID
// of the message sent to the queue in step 8.
JMSManagementHelper.putOperationInvocation(m, "jms.queue.exampleQueue", "removeMessage", message.getJMSMessageID());
JMSManagementHelper.putOperationInvocation(m, "exampleQueue", "removeMessage", message.getJMSMessageID());
// Step 18 Use the requestor to send the request and wait for the reply
reply = requestor.request(m);

View File

@ -113,7 +113,7 @@ public class PreacknowledgeExample {
Message m = session.createMessage();
JMSManagementHelper.putAttribute(m, "jms.queue.exampleQueue", "messageCount");
JMSManagementHelper.putAttribute(m, "exampleQueue", "messageCount");
Message response = requestor.request(m);

View File

@ -23,7 +23,7 @@ public class PostOrder {
public static void main(String[] args) throws Exception {
// first get the create URL for the shipping queue
ClientRequest request = new ClientRequest("http://localhost:8080/queues/jms.queue.orders");
ClientRequest request = new ClientRequest("http://localhost:8080/queues/orders");
ClientResponse res = request.head();
Link create = res.getHeaderAsLink("msg-create");

View File

@ -26,7 +26,7 @@ public class PostOrderWithId {
throw new RuntimeException("You must pass in a parameter");
// first get the create URL for the shipping queue
ClientRequest request = new ClientRequest("http://localhost:8080/queues/jms.queue.orders");
ClientRequest request = new ClientRequest("http://localhost:8080/queues/orders");
ClientResponse res = request.head();
Link create = res.getHeaderAsLink("msg-create-with-id");

View File

@ -23,7 +23,7 @@ public class ReceiveOrder {
public static void main(String[] args) throws Exception {
// first get the create URL for the shipping queue
ClientRequest request = new ClientRequest("http://localhost:8080/queues/jms.queue.orders");
ClientRequest request = new ClientRequest("http://localhost:8080/queues/orders");
ClientResponse res = request.head();
Link pullConsumers = res.getHeaderAsLink("msg-pull-consumers");
res.releaseConnection();

View File

@ -31,7 +31,7 @@ public class AutoAckTopicTest {
//todo fix
//@Test
public void testSuccessFirst() throws Exception {
ClientRequest request = new ClientRequest("http://localhost:8080/topics/jms.topic.chat");
ClientRequest request = new ClientRequest("http://localhost:8080/topics/chat");
ClientResponse response = request.head();
Assert.assertEquals("*****", 200, response.getStatus());

View File

@ -32,7 +32,7 @@ public class JmsReceive {
public static void main(String[] args) throws Exception {
System.out.println("Receive Setup...");
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
Destination destination = ActiveMQDestination.fromPrefixedName("queue://orders");
try (Connection conn = factory.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -29,7 +29,7 @@ public class JmsSend {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
Destination destination = ActiveMQDestination.fromPrefixedName("queue://orders");
try (Connection conn = factory.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -23,7 +23,7 @@ public class RestReceive {
public static void main(String[] args) throws Exception {
// first get the create URL for the shipping queue
ClientRequest request = new ClientRequest("http://localhost:8080/queues/jms.queue.orders");
ClientRequest request = new ClientRequest("http://localhost:8080/queues/orders");
ClientResponse res = request.head();
Link pullConsumers = res.getHeaderAsLink("msg-pull-consumers");
res = pullConsumers.request().formParameter("autoAck", "false").post();

View File

@ -23,7 +23,7 @@ public class RestSend {
public static void main(String[] args) throws Exception {
// first get the create URL for the shipping queue
ClientRequest request = new ClientRequest("http://localhost:8080/queues/jms.queue.orders");
ClientRequest request = new ClientRequest("http://localhost:8080/queues/orders");
ClientResponse res = request.head();
Link create = res.getHeaderAsLink("msg-create");

View File

@ -29,7 +29,7 @@ public class PostOrder {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
Destination destination = ActiveMQDestination.fromPrefixedName("queue://orders");
try (Connection conn = factory.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -25,7 +25,7 @@ public class PushReg {
public static void main(String[] args) throws Exception {
// get the push consumers factory resource
ClientRequest request = new ClientRequest("http://localhost:8080/queues/jms.queue.orders");
ClientRequest request = new ClientRequest("http://localhost:8080/queues/orders");
ClientResponse res = request.head();
Link pushConsumers = res.getHeaderAsLink("msg-push-consumers");
@ -33,7 +33,7 @@ public class PushReg {
// Really, just create a link with the shipping URL and the type you want posted
PushRegistration reg = new PushRegistration();
XmlLink target = new XmlLink();
target.setHref("http://localhost:8080/queues/jms.queue.shipping");
target.setHref("http://localhost:8080/queues/shipping");
target.setType("application/xml");
target.setRelationship("destination");
reg.setTarget(target);

View File

@ -31,7 +31,7 @@ public class ReceiveShipping {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.shipping");
Destination destination = ActiveMQDestination.fromPrefixedName("queue://shipping");
try (Connection conn = factory.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -107,7 +107,7 @@ public class ProtonCPPExample {
Message m = session.createMessage();
JMSManagementHelper.putAttribute(m, "jms.queue.exampleQueue", "messageCount");
JMSManagementHelper.putAttribute(m, "exampleQueue", "messageCount");
Message response = requestor.request(m);

View File

@ -41,7 +41,7 @@ public class AMQPQueueExample {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 3. Create a sender
Queue queue = session.createQueue("jms.queue.exampleQueue");
Queue queue = session.createQueue("exampleQueue");
MessageProducer sender = session.createProducer(queue);
// Step 4. send a few simple message

View File

@ -70,7 +70,7 @@ public class StompDualAuthenticationExample {
// jms.queue.exampleQueue address with a text body
String text = "Hello, world from Stomp!";
String message = "SEND\n" +
"destination: jms.queue.exampleQueue\n" +
"destination: exampleQueue\n" +
"\n" +
text +
END_OF_FRAME;

View File

@ -52,7 +52,7 @@ public class StompEmbeddedWithInterceptorExample {
// jms.queue.exampleQueue address with a text body
String text = "Hello World from Stomp 1.2 !";
String message = "SEND\n" +
"destination:jms.queue.exampleQueue" +
"destination:exampleQueue" +
"\n" +
text +
END_OF_FRAME;

View File

@ -32,9 +32,7 @@ public class StompExample {
public static void main(final String[] args) throws Exception {
StompJmsConnectionFactory factory = new StompJmsConnectionFactory();
factory.setQueuePrefix("jms.queue.");
factory.setDisconnectTimeout(5000);
factory.setTopicPrefix("jms.topic.");
factory.setBrokerURI("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -59,7 +59,7 @@ public class StompExample {
// jms.queue.exampleQueue address with a text body
String text = "Hello, world from Stomp!";
String message = "SEND\n" +
"destination: jms.queue.exampleQueue\n" +
"destination: exampleQueue\n" +
"\n" +
text +
END_OF_FRAME;

View File

@ -62,7 +62,7 @@ public class StompExample {
// jms.queue.exampleQueue address with a text body
String text = "Hello World from Stomp 1.1 !";
String message = "SEND\n" +
"destination:jms.queue.exampleQueue\n" +
"destination:exampleQueue\n" +
"\n" +
text +
END_OF_FRAME;

View File

@ -62,7 +62,7 @@ public class StompExample {
// jms.queue.exampleQueue address with a text body
String text = "Hello World from Stomp 1.2 !";
String message = "SEND\n" +
"destination:jms.queue.exampleQueue\n" +
"destination:exampleQueue\n" +
"\n" +
text +
END_OF_FRAME;

View File

@ -110,7 +110,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>

View File

@ -81,13 +81,13 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
translatePolicyMap(serverConfig, policyMap);
}
String match = "jms.queue.#";
String match = "#";
AddressSettings commonSettings = addressSettingsMap.get(match);
if (commonSettings == null) {
commonSettings = new AddressSettings();
addressSettingsMap.put(match, commonSettings);
}
SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
SimpleString dla = new SimpleString("ActiveMQ.DLQ");
commonSettings.setDeadLetterAddress(dla);
commonSettings.setAutoCreateJmsQueues(true);
@ -222,11 +222,11 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest) {
String physicalName = dest.getPhysicalName();
String pattern = physicalName.replace(">", "#");
if (dest.isTopic()) {
pattern = "jms.topic." + pattern;
} else {
pattern = "jms.queue." + pattern;
}
// if (dest.isTopic()) {
// pattern = pattern;
// } else {
// pattern = pattern;
// }
return pattern;
}
@ -248,7 +248,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
synchronized (testQueues) {
SimpleString coreQ = testQueues.get(qname);
if (coreQ == null) {
coreQ = new SimpleString("jms.queue." + qname);
coreQ = new SimpleString(qname);
try {
this.server.createQueue(coreQ, coreQ, null, false, false);
testQueues.put(qname, coreQ);
@ -266,9 +266,9 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
long count = 0;
String qname = null;
if (amq5Dest.isTemporary()) {
qname = "jms.tempqueue." + amq5Dest.getPhysicalName();
qname = amq5Dest.getPhysicalName();
} else {
qname = "jms.queue." + amq5Dest.getPhysicalName();
qname = amq5Dest.getPhysicalName();
}
Binding binding = server.getPostOffice().getBinding(new SimpleString(qname));
if (binding != null) {

Some files were not shown because too many files have changed in this diff Show More