ARTEMIS-322 auto-create/delete JMS topic
Implements a new feature for the broker whereby it may automatically create and delete JMS topics which are not explicitly defined through the management API or file-based configuration. A JMS topic is created in response to a sent message or connected subscriber. The topic may subsequently be deleted when it no longer has any subscribers. Auto-creation and auto-deletion can both be turned on/off via address-setting.
This commit is contained in:
parent
25316e4232
commit
e53649a6b9
|
@ -64,6 +64,12 @@ public interface ClientSession extends XAResource, AutoCloseable {
|
||||||
* queue, <code>false</code> else.
|
* queue, <code>false</code> else.
|
||||||
*/
|
*/
|
||||||
boolean isAutoCreateJmsQueues();
|
boolean isAutoCreateJmsQueues();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns <code>true</code> if auto-creation for this address is enabled and if the address queried is for a JMS
|
||||||
|
* topic, <code>false</code> else.
|
||||||
|
*/
|
||||||
|
boolean isAutoCreateJmsTopics();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -656,7 +656,9 @@ public interface ActiveMQServerControl {
|
||||||
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
|
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
|
||||||
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
|
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
|
||||||
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
|
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
|
||||||
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception;
|
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
|
||||||
|
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
|
||||||
|
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
|
||||||
|
|
||||||
void removeAddressSettings(String addressMatch) throws Exception;
|
void removeAddressSettings(String addressMatch) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -57,11 +57,15 @@ public final class AddressSettingsInfo {
|
||||||
|
|
||||||
private final boolean autoDeleteJmsQueues;
|
private final boolean autoDeleteJmsQueues;
|
||||||
|
|
||||||
|
private final boolean autoCreateJmsTopics;
|
||||||
|
|
||||||
|
private final boolean autoDeleteJmsTopics;
|
||||||
|
|
||||||
// Static --------------------------------------------------------
|
// Static --------------------------------------------------------
|
||||||
|
|
||||||
public static AddressSettingsInfo from(final String jsonString) throws Exception {
|
public static AddressSettingsInfo from(final String jsonString) throws Exception {
|
||||||
JSONObject object = new JSONObject(jsonString);
|
JSONObject object = new JSONObject(jsonString);
|
||||||
return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getLong("maxSizeBytes"), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getLong("redeliveryDelay"), object.getDouble("redeliveryMultiplier"), object.getLong("maxRedeliveryDelay"), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getLong("redistributionDelay"), object.getBoolean("sendToDLAOnNoRoute"), object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerCheckPeriod"), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoDeleteJmsQueues"));
|
return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getLong("maxSizeBytes"), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getLong("redeliveryDelay"), object.getDouble("redeliveryMultiplier"), object.getLong("maxRedeliveryDelay"), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getLong("redistributionDelay"), object.getBoolean("sendToDLAOnNoRoute"), object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerCheckPeriod"), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoDeleteJmsQueues"), object.getBoolean("autoCreateJmsTopics"), object.getBoolean("autoDeleteJmsTopics"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Constructors --------------------------------------------------
|
// Constructors --------------------------------------------------
|
||||||
|
@ -83,7 +87,9 @@ public final class AddressSettingsInfo {
|
||||||
long slowConsumerCheckPeriod,
|
long slowConsumerCheckPeriod,
|
||||||
String slowConsumerPolicy,
|
String slowConsumerPolicy,
|
||||||
boolean autoCreateJmsQueues,
|
boolean autoCreateJmsQueues,
|
||||||
boolean autoDeleteJmsQueues) {
|
boolean autoDeleteJmsQueues,
|
||||||
|
boolean autoCreateJmsTopics,
|
||||||
|
boolean autoDeleteJmsTopics) {
|
||||||
this.addressFullMessagePolicy = addressFullMessagePolicy;
|
this.addressFullMessagePolicy = addressFullMessagePolicy;
|
||||||
this.maxSizeBytes = maxSizeBytes;
|
this.maxSizeBytes = maxSizeBytes;
|
||||||
this.pageSizeBytes = pageSizeBytes;
|
this.pageSizeBytes = pageSizeBytes;
|
||||||
|
@ -102,6 +108,8 @@ public final class AddressSettingsInfo {
|
||||||
this.slowConsumerPolicy = slowConsumerPolicy;
|
this.slowConsumerPolicy = slowConsumerPolicy;
|
||||||
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||||
this.autoDeleteJmsQueues = autoDeleteJmsQueues;
|
this.autoDeleteJmsQueues = autoDeleteJmsQueues;
|
||||||
|
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||||
|
this.autoDeleteJmsTopics = autoDeleteJmsTopics;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
@ -181,5 +189,13 @@ public final class AddressSettingsInfo {
|
||||||
public boolean isAutoDeleteJmsQueues() {
|
public boolean isAutoDeleteJmsQueues() {
|
||||||
return autoDeleteJmsQueues;
|
return autoDeleteJmsQueues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAutoCreateJmsTopics() {
|
||||||
|
return autoCreateJmsTopics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAutoDeleteJmsTopics() {
|
||||||
|
return autoDeleteJmsTopics;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,12 +30,16 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
|
||||||
|
|
||||||
private final boolean autoCreateJmsQueues;
|
private final boolean autoCreateJmsQueues;
|
||||||
|
|
||||||
|
private final boolean autoCreateJmsTopics;
|
||||||
|
|
||||||
public AddressQueryImpl(final boolean exists,
|
public AddressQueryImpl(final boolean exists,
|
||||||
final List<SimpleString> queueNames,
|
final List<SimpleString> queueNames,
|
||||||
final boolean autoCreateJmsQueues) {
|
final boolean autoCreateJmsQueues,
|
||||||
|
final boolean autoCreateJmsTopics) {
|
||||||
this.exists = exists;
|
this.exists = exists;
|
||||||
this.queueNames = new ArrayList<>(queueNames);
|
this.queueNames = new ArrayList<>(queueNames);
|
||||||
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||||
|
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -52,4 +56,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
|
||||||
public boolean isAutoCreateJmsQueues() {
|
public boolean isAutoCreateJmsQueues() {
|
||||||
return autoCreateJmsQueues;
|
return autoCreateJmsQueues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAutoCreateJmsTopics() {
|
||||||
|
return autoCreateJmsTopics;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,18 +49,18 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||||
|
@ -283,9 +283,9 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
|
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
|
||||||
SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
|
SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3);
|
||||||
|
|
||||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues());
|
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -164,6 +164,8 @@ public final class ChannelImpl implements Channel {
|
||||||
return version >= 126;
|
return version >= 126;
|
||||||
case PacketImpl.SESS_BINDINGQUERY_RESP_V2:
|
case PacketImpl.SESS_BINDINGQUERY_RESP_V2:
|
||||||
return version >= 126;
|
return version >= 126;
|
||||||
|
case PacketImpl.SESS_BINDINGQUERY_RESP_V3:
|
||||||
|
return version >= 127;
|
||||||
default:
|
default:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2;
|
||||||
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V3;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
|
||||||
|
@ -110,6 +111,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAdd
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||||
|
@ -257,6 +259,10 @@ public abstract class PacketDecoder implements Serializable {
|
||||||
packet = new SessionBindingQueryResponseMessage_V2();
|
packet = new SessionBindingQueryResponseMessage_V2();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case SESS_BINDINGQUERY_RESP_V3: {
|
||||||
|
packet = new SessionBindingQueryResponseMessage_V3();
|
||||||
|
break;
|
||||||
|
}
|
||||||
case SESS_XA_START: {
|
case SESS_XA_START: {
|
||||||
packet = new SessionXAStartMessage();
|
packet = new SessionXAStartMessage();
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -245,6 +245,8 @@ public class PacketImpl implements Packet {
|
||||||
|
|
||||||
public static final byte REPLICATION_RESPONSE_V2 = -9;
|
public static final byte REPLICATION_RESPONSE_V2 = -9;
|
||||||
|
|
||||||
|
public static final byte SESS_BINDINGQUERY_RESP_V3 = -10;
|
||||||
|
|
||||||
// Static --------------------------------------------------------
|
// Static --------------------------------------------------------
|
||||||
|
|
||||||
public PacketImpl(final byte type) {
|
public PacketImpl(final byte type) {
|
||||||
|
|
|
@ -44,8 +44,8 @@ public class SessionBindingQueryResponseMessage extends PacketImpl {
|
||||||
super(SESS_BINDINGQUERY_RESP);
|
super(SESS_BINDINGQUERY_RESP);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SessionBindingQueryResponseMessage(byte v2) {
|
public SessionBindingQueryResponseMessage(byte v) {
|
||||||
super(v2);
|
super(v);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
|
||||||
public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage {
|
public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage {
|
||||||
|
|
||||||
private boolean autoCreateJmsQueues;
|
protected boolean autoCreateJmsQueues;
|
||||||
|
|
||||||
public SessionBindingQueryResponseMessage_V2(final boolean exists,
|
public SessionBindingQueryResponseMessage_V2(final boolean exists,
|
||||||
final List<SimpleString> queueNames,
|
final List<SimpleString> queueNames,
|
||||||
|
@ -41,6 +41,10 @@ public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryRe
|
||||||
super(SESS_BINDINGQUERY_RESP_V2);
|
super(SESS_BINDINGQUERY_RESP_V2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SessionBindingQueryResponseMessage_V2(byte v) {
|
||||||
|
super(v);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isAutoCreateJmsQueues() {
|
public boolean isAutoCreateJmsQueues() {
|
||||||
return autoCreateJmsQueues;
|
return autoCreateJmsQueues;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,95 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
|
||||||
|
public class SessionBindingQueryResponseMessage_V3 extends SessionBindingQueryResponseMessage_V2 {
|
||||||
|
|
||||||
|
private boolean autoCreateJmsTopics;
|
||||||
|
|
||||||
|
public SessionBindingQueryResponseMessage_V3(final boolean exists,
|
||||||
|
final List<SimpleString> queueNames,
|
||||||
|
final boolean autoCreateJmsQueues,
|
||||||
|
final boolean autoCreateJmsTopics) {
|
||||||
|
super(SESS_BINDINGQUERY_RESP_V3);
|
||||||
|
|
||||||
|
this.exists = exists;
|
||||||
|
|
||||||
|
this.queueNames = queueNames;
|
||||||
|
|
||||||
|
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||||
|
|
||||||
|
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SessionBindingQueryResponseMessage_V3() {
|
||||||
|
super(SESS_BINDINGQUERY_RESP_V3);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAutoCreateJmsTopics() {
|
||||||
|
return autoCreateJmsTopics;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||||
|
super.encodeRest(buffer);
|
||||||
|
buffer.writeBoolean(autoCreateJmsTopics);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||||
|
super.decodeRest(buffer);
|
||||||
|
autoCreateJmsTopics = buffer.readBoolean();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
final int prime = 31;
|
||||||
|
int result = super.hashCode();
|
||||||
|
result = prime * result + (autoCreateJmsTopics ? 1231 : 1237);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuffer buff = new StringBuffer(getParentString());
|
||||||
|
buff.append(", exists=" + exists);
|
||||||
|
buff.append(", queueNames=" + queueNames);
|
||||||
|
buff.append(", autoCreateJmsQueues=" + autoCreateJmsQueues);
|
||||||
|
buff.append(", autoCreateJmsTopics=" + autoCreateJmsTopics);
|
||||||
|
buff.append("]");
|
||||||
|
return buff.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj)
|
||||||
|
return true;
|
||||||
|
if (!super.equals(obj))
|
||||||
|
return false;
|
||||||
|
if (!(obj instanceof SessionBindingQueryResponseMessage_V3))
|
||||||
|
return false;
|
||||||
|
SessionBindingQueryResponseMessage_V3 other = (SessionBindingQueryResponseMessage_V3) obj;
|
||||||
|
if (autoCreateJmsTopics != other.autoCreateJmsTopics)
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
|
||||||
activemq.version.microVersion=${activemq.version.microVersion}
|
activemq.version.microVersion=${activemq.version.microVersion}
|
||||||
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
||||||
activemq.version.versionTag=${activemq.version.versionTag}
|
activemq.version.versionTag=${activemq.version.versionTag}
|
||||||
activemq.version.compatibleVersionList=121,122,123,124,125,126
|
activemq.version.compatibleVersionList=121,122,123,124,125,126,127
|
||||||
|
|
|
@ -405,7 +405,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
||||||
|
|
||||||
// if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side
|
// if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side
|
||||||
// as that's a more efficient path for such operation
|
// as that's a more efficient path for such operation
|
||||||
if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
|
if (!query.isExists() && ((address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !query.isAutoCreateJmsQueues()) || (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !query.isAutoCreateJmsTopics()))) {
|
||||||
throw new InvalidDestinationException("Destination " + address + " does not exist");
|
throw new InvalidDestinationException("Destination " + address + " does not exist");
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -299,7 +299,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
if (jbd != null) {
|
if (jbd != null) {
|
||||||
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
|
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
|
||||||
|
|
||||||
if (!response.isExists() && !response.isAutoCreateJmsQueues()) {
|
if (!response.isExists() && ((jbd.getAddress().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !response.isAutoCreateJmsQueues()) || (jbd.getAddress().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !response.isAutoCreateJmsTopics()))) {
|
||||||
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
|
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -659,7 +659,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
else {
|
else {
|
||||||
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
|
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
|
||||||
|
|
||||||
if (!response.isExists()) {
|
if (!response.isExists() && !response.isAutoCreateJmsTopics()) {
|
||||||
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
|
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1106,7 +1106,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
|
|
||||||
AddressQuery query = session.addressQuery(topic.getSimpleAddress());
|
AddressQuery query = session.addressQuery(topic.getSimpleAddress());
|
||||||
|
|
||||||
if (!query.isExists()) {
|
if (!query.isExists() && !query.isAutoCreateJmsTopics()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -76,6 +76,17 @@ public interface JMSServerManager extends ActiveMQComponent {
|
||||||
*/
|
*/
|
||||||
boolean createTopic(boolean storeConfig, String topicName, String... bindings) throws Exception;
|
boolean createTopic(boolean storeConfig, String topicName, String... bindings) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param storeConfig
|
||||||
|
* @param topicName
|
||||||
|
* @param autoCreated
|
||||||
|
* @param bindings
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
boolean createTopic(boolean storeConfig, String topicName, boolean autoCreated, String... bindings) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the topic from the Binding Registry or BindingRegistry.
|
* Remove the topic from the Binding Registry or BindingRegistry.
|
||||||
* Calling this method does <em>not</em> destroy the destination.
|
* Calling this method does <em>not</em> destroy the destination.
|
||||||
|
|
|
@ -21,6 +21,7 @@ import javax.transaction.xa.Xid;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -48,6 +49,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.ActivateCallback;
|
import org.apache.activemq.artemis.core.server.ActivateCallback;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
|
||||||
|
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.QueueCreator;
|
import org.apache.activemq.artemis.core.server.QueueCreator;
|
||||||
import org.apache.activemq.artemis.core.server.QueueDeleter;
|
import org.apache.activemq.artemis.core.server.QueueDeleter;
|
||||||
|
@ -371,11 +374,15 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
server.setJMSQueueCreator(new JMSQueueCreator());
|
server.setJMSQueueCreator(new JMSDestinationCreator());
|
||||||
|
|
||||||
server.setJMSQueueDeleter(new JMSQueueDeleter());
|
server.setJMSQueueDeleter(new JMSQueueDeleter());
|
||||||
|
|
||||||
server.registerActivateCallback(this);
|
server.registerActivateCallback(this);
|
||||||
|
|
||||||
|
server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback());
|
||||||
|
|
||||||
|
server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback());
|
||||||
/**
|
/**
|
||||||
* See this method's javadoc.
|
* See this method's javadoc.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -523,10 +530,19 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean createTopic(final boolean storeConfig,
|
public synchronized boolean createTopic(final boolean storeConfig,
|
||||||
final String topicName,
|
final String topicName,
|
||||||
final String... bindings) throws Exception {
|
final String... bindings) throws Exception {
|
||||||
|
return createTopic(storeConfig, topicName, false, bindings);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean createTopic(final boolean storeConfig,
|
||||||
|
final String topicName,
|
||||||
|
final boolean autoCreated,
|
||||||
|
final String... bindings) throws Exception {
|
||||||
if (active && topics.get(topicName) != null) {
|
if (active && topics.get(topicName) != null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -541,7 +557,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
public void runException() throws Exception {
|
public void runException() throws Exception {
|
||||||
checkBindings(bindings);
|
checkBindings(bindings);
|
||||||
|
|
||||||
if (internalCreateTopic(topicName)) {
|
if (internalCreateTopic(topicName, autoCreated)) {
|
||||||
ActiveMQDestination destination = topics.get(topicName);
|
ActiveMQDestination destination = topics.get(topicName);
|
||||||
|
|
||||||
if (destination == null) {
|
if (destination == null) {
|
||||||
|
@ -1082,6 +1098,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Performs the internal creation without activating any storage.
|
* Performs the internal creation without activating any storage.
|
||||||
* The storage load will call this method
|
* The storage load will call this method
|
||||||
|
@ -1091,6 +1109,10 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
private synchronized boolean internalCreateTopic(final String topicName) throws Exception {
|
private synchronized boolean internalCreateTopic(final String topicName) throws Exception {
|
||||||
|
return internalCreateTopic(topicName, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized boolean internalCreateTopic(final String topicName, final boolean autoCreated) throws Exception {
|
||||||
|
|
||||||
if (topics.get(topicName) != null) {
|
if (topics.get(topicName) != null) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -1101,7 +1123,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
// checks when routing messages to a topic that
|
// checks when routing messages to a topic that
|
||||||
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
|
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
|
||||||
// subscriptions - core has no notion of a topic
|
// subscriptions - core has no notion of a topic
|
||||||
server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false);
|
server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated);
|
||||||
|
|
||||||
topics.put(topicName, activeMQTopic);
|
topics.put(topicName, activeMQTopic);
|
||||||
|
|
||||||
|
@ -1619,13 +1641,19 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class JMSQueueCreator implements QueueCreator {
|
/**
|
||||||
|
* This class is responsible for auto-creating the JMS (and underlying core) resources when a client sends a message
|
||||||
|
* to a non-existent JMS queue or topic
|
||||||
|
*/
|
||||||
|
class JMSDestinationCreator implements QueueCreator {
|
||||||
@Override
|
@Override
|
||||||
public boolean create(SimpleString address) throws Exception {
|
public boolean create(SimpleString address) throws Exception {
|
||||||
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
|
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||||
if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) {
|
if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) {
|
||||||
JMSServerManagerImpl.this.internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true);
|
return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true);
|
||||||
return true;
|
}
|
||||||
|
else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) {
|
||||||
|
return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return false;
|
return false;
|
||||||
|
@ -1635,8 +1663,64 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
||||||
|
|
||||||
class JMSQueueDeleter implements QueueDeleter {
|
class JMSQueueDeleter implements QueueDeleter {
|
||||||
@Override
|
@Override
|
||||||
public boolean delete(SimpleString address) throws Exception {
|
public boolean delete(SimpleString queueName) throws Exception {
|
||||||
return JMSServerManagerImpl.this.destroyQueue(address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false);
|
Queue queue = server.locateQueue(queueName);
|
||||||
|
SimpleString address = queue.getAddress();
|
||||||
|
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||||
|
long consumerCount = queue.getConsumerCount();
|
||||||
|
long messageCount = queue.getMessageCount();
|
||||||
|
|
||||||
|
if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoDeleteJmsQueues() && queue.getMessageCount() == 0) {
|
||||||
|
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
|
||||||
|
ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + settings.isAutoDeleteJmsQueues());
|
||||||
|
}
|
||||||
|
|
||||||
|
return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When a core queue is created with a jms.topic prefix this class will create the associated JMS resources
|
||||||
|
* retroactively. This would happen if, for example, a client created a subscription a non-existent JMS topic and
|
||||||
|
* autoCreateJmsTopics = true.
|
||||||
|
*/
|
||||||
|
class JMSPostQueueCreationCallback implements PostQueueCreationCallback {
|
||||||
|
@Override
|
||||||
|
public void callback(SimpleString queueName) throws Exception {
|
||||||
|
Queue queue = server.locateQueue(queueName);
|
||||||
|
String address = queue.getAddress().toString();
|
||||||
|
|
||||||
|
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||||
|
/* When a topic is created a dummy subscription is created which never receives any messages; when the queue
|
||||||
|
* for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the
|
||||||
|
* queue name doesn't start with the topic prefix.
|
||||||
|
*/
|
||||||
|
if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
|
||||||
|
createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When a core queue representing a JMS topic subscription is deleted this class will check to see if that was the
|
||||||
|
* last subscription on the topic and if so and autoDeleteJmsTopics = true then it will delete the JMS resources
|
||||||
|
* for that topic.
|
||||||
|
*/
|
||||||
|
class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback {
|
||||||
|
@Override
|
||||||
|
public void callback(SimpleString address, SimpleString queueName) throws Exception {
|
||||||
|
Queue queue = server.locateQueue(address);
|
||||||
|
Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(address).getBindings();
|
||||||
|
|
||||||
|
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||||
|
|
||||||
|
if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) {
|
||||||
|
destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
|
||||||
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
|
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
|
||||||
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
|
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
|
||||||
|
|
||||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false);
|
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -801,11 +801,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
public void removeDestination(ActiveMQDestination dest) throws Exception {
|
public void removeDestination(ActiveMQDestination dest) throws Exception {
|
||||||
if (dest.isQueue()) {
|
if (dest.isQueue()) {
|
||||||
SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
|
server.destroyQueue(OpenWireUtil.toCoreAddress(dest));
|
||||||
server.destroyQueue(qName);
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
|
Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest));
|
||||||
Iterator<Binding> iterator = bindings.getBindings().iterator();
|
Iterator<Binding> iterator = bindings.getBindings().iterator();
|
||||||
|
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
|
|
|
@ -81,7 +81,12 @@ public class AMQConsumer {
|
||||||
SimpleString address;
|
SimpleString address;
|
||||||
|
|
||||||
if (openwireDestination.isTopic()) {
|
if (openwireDestination.isTopic()) {
|
||||||
|
if (openwireDestination.isTemporary()) {
|
||||||
|
address = new SimpleString("jms.temptopic." + physicalName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
address = new SimpleString("jms.topic." + physicalName);
|
address = new SimpleString("jms.topic." + physicalName);
|
||||||
|
}
|
||||||
|
|
||||||
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
|
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
|
||||||
|
|
||||||
|
@ -90,7 +95,7 @@ public class AMQConsumer {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
|
SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
|
||||||
session.getCoreServer().getJMSQueueCreator().create(queueName);
|
session.getCoreServer().getJMSDestinationCreator().create(queueName);
|
||||||
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
|
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
|
||||||
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
|
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
|
||||||
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
|
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class AMQSession implements SessionCallback {
|
||||||
for (ActiveMQDestination openWireDest : dests) {
|
for (ActiveMQDestination openWireDest : dests) {
|
||||||
if (openWireDest.isQueue()) {
|
if (openWireDest.isQueue()) {
|
||||||
SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
|
SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
|
||||||
getCoreServer().getJMSQueueCreator().create(queueName);
|
getCoreServer().getJMSDestinationCreator().create(queueName);
|
||||||
}
|
}
|
||||||
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
|
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,11 @@ import org.apache.activemq.command.TransactionId;
|
||||||
import org.apache.activemq.command.XATransactionId;
|
import org.apache.activemq.command.XATransactionId;
|
||||||
import org.apache.activemq.util.ByteSequence;
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX;
|
||||||
|
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX;
|
||||||
|
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX;
|
||||||
|
import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
|
||||||
|
|
||||||
public class OpenWireUtil {
|
public class OpenWireUtil {
|
||||||
|
|
||||||
public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
|
public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
|
||||||
|
@ -39,10 +44,20 @@ public class OpenWireUtil {
|
||||||
|
|
||||||
public static SimpleString toCoreAddress(ActiveMQDestination dest) {
|
public static SimpleString toCoreAddress(ActiveMQDestination dest) {
|
||||||
if (dest.isQueue()) {
|
if (dest.isQueue()) {
|
||||||
return new SimpleString("jms.queue." + dest.getPhysicalName());
|
if (dest.isTemporary()) {
|
||||||
|
return new SimpleString(JMS_TEMP_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return new SimpleString("jms.topic." + dest.getPhysicalName());
|
return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (dest.isTemporary()) {
|
||||||
|
return new SimpleString(JMS_TEMP_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +69,7 @@ public class OpenWireUtil {
|
||||||
*/
|
*/
|
||||||
public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
|
public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
|
||||||
String address = message.getAddress().toString();
|
String address = message.getAddress().toString();
|
||||||
String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", "");
|
String strippedAddress = address.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
|
||||||
if (actualDestination.isQueue()) {
|
if (actualDestination.isQueue()) {
|
||||||
return new ActiveMQQueue(strippedAddress);
|
return new ActiveMQQueue(strippedAddress);
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,13 +29,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
|
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
|
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.QueueCreator;
|
||||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
@ -227,30 +227,23 @@ public final class StompConnection implements RemotingConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void checkDestination(String destination) throws ActiveMQStompException {
|
public void checkDestination(String destination) throws ActiveMQStompException {
|
||||||
if (autoCreateQueueIfPossible(destination)) {
|
autoCreateDestinationIfPossible(destination);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!manager.destinationExists(destination)) {
|
if (!manager.destinationExists(destination)) {
|
||||||
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
|
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean autoCreateQueueIfPossible(String queue) throws ActiveMQStompException {
|
public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException {
|
||||||
boolean autoCreated = false;
|
|
||||||
|
|
||||||
if (queue.startsWith(ResourceNames.JMS_QUEUE) && manager.getServer().getAddressSettingsRepository().getMatch(queue).isAutoCreateJmsQueues() && manager.getServer().locateQueue(new SimpleString(queue)) == null) {
|
|
||||||
SimpleString queueName = new SimpleString(queue);
|
|
||||||
try {
|
try {
|
||||||
manager.getServer().createQueue(queueName, queueName, null, SimpleString.toSimpleString(this.getLogin()), true, false, true);
|
QueueCreator queueCreator = manager.getServer().getJMSDestinationCreator();
|
||||||
|
if (queueCreator != null) {
|
||||||
|
queueCreator.create(SimpleString.toSimpleString(queue));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
|
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
|
||||||
}
|
}
|
||||||
autoCreated = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return autoCreated;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -618,7 +611,7 @@ public final class StompConnection implements RemotingConnection {
|
||||||
String id,
|
String id,
|
||||||
String durableSubscriptionName,
|
String durableSubscriptionName,
|
||||||
boolean noLocal) throws ActiveMQStompException {
|
boolean noLocal) throws ActiveMQStompException {
|
||||||
autoCreateQueueIfPossible(destination);
|
autoCreateDestinationIfPossible(destination);
|
||||||
if (noLocal) {
|
if (noLocal) {
|
||||||
String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
|
String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
|
||||||
if (selector == null) {
|
if (selector == null) {
|
||||||
|
|
|
@ -163,6 +163,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
|
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
|
||||||
|
|
||||||
|
private static final String AUTO_CREATE_JMS_TOPICS = "auto-create-jms-topics";
|
||||||
|
|
||||||
|
private static final String AUTO_DELETE_JMS_TOPICS = "auto-delete-jms-topics";
|
||||||
|
|
||||||
private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size";
|
private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size";
|
||||||
|
|
||||||
private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
|
private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
|
||||||
|
@ -796,6 +800,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name)) {
|
else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name)) {
|
||||||
addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child));
|
addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child));
|
||||||
}
|
}
|
||||||
|
else if (AUTO_CREATE_JMS_TOPICS.equalsIgnoreCase(name)) {
|
||||||
|
addressSettings.setAutoCreateJmsTopics(XMLUtil.parseBoolean(child));
|
||||||
|
}
|
||||||
|
else if (AUTO_DELETE_JMS_TOPICS.equalsIgnoreCase(name)) {
|
||||||
|
addressSettings.setAutoDeleteJmsTopics(XMLUtil.parseBoolean(child));
|
||||||
|
}
|
||||||
else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
|
else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
|
||||||
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
|
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1519,6 +1519,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
settings.put("slowConsumerPolicy", policy);
|
settings.put("slowConsumerPolicy", policy);
|
||||||
settings.put("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues());
|
settings.put("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues());
|
||||||
settings.put("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues());
|
settings.put("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues());
|
||||||
|
settings.put("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics());
|
||||||
|
settings.put("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsTopics());
|
||||||
|
|
||||||
JSONObject jsonObject = new JSONObject(settings);
|
JSONObject jsonObject = new JSONObject(settings);
|
||||||
return jsonObject.toString();
|
return jsonObject.toString();
|
||||||
|
@ -1544,7 +1546,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
final long slowConsumerCheckPeriod,
|
final long slowConsumerCheckPeriod,
|
||||||
final String slowConsumerPolicy,
|
final String slowConsumerPolicy,
|
||||||
final boolean autoCreateJmsQueues,
|
final boolean autoCreateJmsQueues,
|
||||||
final boolean autoDeleteJmsQueues) throws Exception {
|
final boolean autoDeleteJmsQueues,
|
||||||
|
final boolean autoCreateJmsTopics,
|
||||||
|
final boolean autoDeleteJmsTopics) throws Exception {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
|
|
||||||
// JBPAPP-6334 requested this to be pageSizeBytes > maxSizeBytes
|
// JBPAPP-6334 requested this to be pageSizeBytes > maxSizeBytes
|
||||||
|
@ -1598,6 +1602,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
}
|
}
|
||||||
addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues);
|
addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues);
|
||||||
addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues);
|
addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues);
|
||||||
|
addressSettings.setAutoCreateJmsTopics(autoCreateJmsTopics);
|
||||||
|
addressSettings.setAutoDeleteJmsTopics(autoDeleteJmsTopics);
|
||||||
server.getAddressSettingsRepository().addMatch(address, addressSettings);
|
server.getAddressSettingsRepository().addMatch(address, addressSettings);
|
||||||
|
|
||||||
storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));
|
storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAdd
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
|
||||||
|
@ -263,7 +264,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
requiresResponse = true;
|
requiresResponse = true;
|
||||||
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
|
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
|
||||||
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
|
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
|
||||||
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
|
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
|
||||||
|
response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues(), result.isAutoCreateJmsTopics());
|
||||||
|
}
|
||||||
|
else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
|
||||||
response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues());
|
response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -126,6 +126,36 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
||||||
*/
|
*/
|
||||||
void callActivationFailureListeners(Exception e);
|
void callActivationFailureListeners(Exception e);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callback {@link org.apache.activemq.artemis.core.server.PostQueueCreationCallback}
|
||||||
|
*/
|
||||||
|
void registerPostQueueCreationCallback(PostQueueCreationCallback callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callback {@link org.apache.activemq.artemis.core.server.PostQueueCreationCallback}
|
||||||
|
*/
|
||||||
|
void unregisterPostQueueCreationCallback(PostQueueCreationCallback callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param queueName
|
||||||
|
*/
|
||||||
|
void callPostQueueCreationCallbacks(SimpleString queueName) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callback {@link org.apache.activemq.artemis.core.server.PostQueueDeletionCallback}
|
||||||
|
*/
|
||||||
|
void registerPostQueueDeletionCallback(PostQueueDeletionCallback callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callback {@link org.apache.activemq.artemis.core.server.PostQueueDeletionCallback}
|
||||||
|
*/
|
||||||
|
void unregisterPostQueueDeletionCallback(PostQueueDeletionCallback callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param queueName
|
||||||
|
*/
|
||||||
|
void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception;
|
||||||
|
|
||||||
void checkQueueCreationLimit(String username) throws Exception;
|
void checkQueueCreationLimit(String username) throws Exception;
|
||||||
|
|
||||||
ServerSession createSession(String name,
|
ServerSession createSession(String name,
|
||||||
|
@ -196,7 +226,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
||||||
/**
|
/**
|
||||||
* @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)
|
* @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)
|
||||||
*/
|
*/
|
||||||
QueueCreator getJMSQueueCreator();
|
QueueCreator getJMSDestinationCreator();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the queue deleter responsible for automatic JMS Queue deletions.
|
* This is the queue deleter responsible for automatic JMS Queue deletions.
|
||||||
|
|
|
@ -28,14 +28,19 @@ public class BindingQueryResult {
|
||||||
|
|
||||||
private boolean autoCreateJmsQueues;
|
private boolean autoCreateJmsQueues;
|
||||||
|
|
||||||
|
private boolean autoCreateJmsTopics;
|
||||||
|
|
||||||
public BindingQueryResult(final boolean exists,
|
public BindingQueryResult(final boolean exists,
|
||||||
final List<SimpleString> queueNames,
|
final List<SimpleString> queueNames,
|
||||||
final boolean autoCreateJmsQueues) {
|
final boolean autoCreateJmsQueues,
|
||||||
|
final boolean autoCreateJmsTopics) {
|
||||||
this.exists = exists;
|
this.exists = exists;
|
||||||
|
|
||||||
this.queueNames = queueNames;
|
this.queueNames = queueNames;
|
||||||
|
|
||||||
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||||
|
|
||||||
|
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isExists() {
|
public boolean isExists() {
|
||||||
|
@ -46,6 +51,10 @@ public class BindingQueryResult {
|
||||||
return autoCreateJmsQueues;
|
return autoCreateJmsQueues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAutoCreateJmsTopics() {
|
||||||
|
return autoCreateJmsTopics;
|
||||||
|
}
|
||||||
|
|
||||||
public List<SimpleString> getQueueNames() {
|
public List<SimpleString> getQueueNames() {
|
||||||
return queueNames;
|
return queueNames;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.server;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When a "core" queue is created this callback will be invoked
|
||||||
|
*/
|
||||||
|
public interface PostQueueCreationCallback {
|
||||||
|
|
||||||
|
void callback(SimpleString queueName) throws Exception;
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.server;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When a "core" queue is deleted this callback will be invoked
|
||||||
|
*/
|
||||||
|
public interface PostQueueDeletionCallback {
|
||||||
|
|
||||||
|
void callback(SimpleString address, SimpleString queueName) throws Exception;
|
||||||
|
}
|
|
@ -24,5 +24,5 @@ public interface QueueDeleter {
|
||||||
/**
|
/**
|
||||||
* @return True if a queue was deleted.
|
* @return True if a queue was deleted.
|
||||||
*/
|
*/
|
||||||
boolean delete(SimpleString address) throws Exception;
|
boolean delete(SimpleString queueName) throws Exception;
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,6 +106,8 @@ import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.MemoryManager;
|
import org.apache.activemq.artemis.core.server.MemoryManager;
|
||||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
|
||||||
|
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.QueueCreator;
|
import org.apache.activemq.artemis.core.server.QueueCreator;
|
||||||
import org.apache.activemq.artemis.core.server.QueueDeleter;
|
import org.apache.activemq.artemis.core.server.QueueDeleter;
|
||||||
|
@ -261,6 +263,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
private final Set<ActivationFailureListener> activationFailureListeners = new ConcurrentHashSet<>();
|
private final Set<ActivationFailureListener> activationFailureListeners = new ConcurrentHashSet<>();
|
||||||
|
|
||||||
|
private final Set<PostQueueCreationCallback> postQueueCreationCallbacks = new ConcurrentHashSet<>();
|
||||||
|
|
||||||
|
private final Set<PostQueueDeletionCallback> postQueueDeletionCallbacks = new ConcurrentHashSet<>();
|
||||||
|
|
||||||
private volatile GroupingHandler groupingHandler;
|
private volatile GroupingHandler groupingHandler;
|
||||||
|
|
||||||
private NodeManager nodeManager;
|
private NodeManager nodeManager;
|
||||||
|
@ -564,6 +570,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
|
boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
|
||||||
|
boolean autoCreateJmsTopics = address.toString().startsWith(ResourceNames.JMS_TOPIC) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsTopics();
|
||||||
|
|
||||||
List<SimpleString> names = new ArrayList<>();
|
List<SimpleString> names = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -571,7 +578,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
ManagementService managementService = getManagementService();
|
ManagementService managementService = getManagementService();
|
||||||
if (managementService != null) {
|
if (managementService != null) {
|
||||||
if (address.equals(managementService.getManagementAddress())) {
|
if (address.equals(managementService.getManagementAddress())) {
|
||||||
return new BindingQueryResult(true, names, autoCreateJmsQueues);
|
return new BindingQueryResult(true, names, autoCreateJmsQueues, autoCreateJmsTopics);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -583,7 +590,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
|
return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues, autoCreateJmsTopics);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -655,7 +662,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueueCreator getJMSQueueCreator() {
|
public QueueCreator getJMSDestinationCreator() {
|
||||||
return jmsQueueCreator;
|
return jmsQueueCreator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1496,6 +1503,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
|
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SimpleString address = binding.getAddress();
|
||||||
|
|
||||||
Queue queue = (Queue) binding.getBindable();
|
Queue queue = (Queue) binding.getBindable();
|
||||||
|
|
||||||
// This check is only valid if checkConsumerCount == true
|
// This check is only valid if checkConsumerCount == true
|
||||||
|
@ -1507,14 +1516,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
if (queue.isDurable()) {
|
if (queue.isDurable()) {
|
||||||
// make sure the user has privileges to delete this queue
|
// make sure the user has privileges to delete this queue
|
||||||
securityStore.check(binding.getAddress(), CheckType.DELETE_DURABLE_QUEUE, session);
|
securityStore.check(address, CheckType.DELETE_DURABLE_QUEUE, session);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
securityStore.check(binding.getAddress(), CheckType.DELETE_NON_DURABLE_QUEUE, session);
|
securityStore.check(address, CheckType.DELETE_NON_DURABLE_QUEUE, session);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.deleteQueue(removeConsumers);
|
queue.deleteQueue(removeConsumers);
|
||||||
|
|
||||||
|
callPostQueueDeletionCallbacks(address, queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1544,6 +1555,40 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerPostQueueCreationCallback(final PostQueueCreationCallback callback) {
|
||||||
|
postQueueCreationCallbacks.add(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unregisterPostQueueCreationCallback(final PostQueueCreationCallback callback) {
|
||||||
|
postQueueCreationCallbacks.remove(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void callPostQueueCreationCallbacks(final SimpleString queueName) throws Exception {
|
||||||
|
for (PostQueueCreationCallback callback : postQueueCreationCallbacks) {
|
||||||
|
callback.callback(queueName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerPostQueueDeletionCallback(final PostQueueDeletionCallback callback) {
|
||||||
|
postQueueDeletionCallbacks.add(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unregisterPostQueueDeletionCallback(final PostQueueDeletionCallback callback) {
|
||||||
|
postQueueDeletionCallbacks.remove(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void callPostQueueDeletionCallbacks(final SimpleString address, final SimpleString queueName) throws Exception {
|
||||||
|
for (PostQueueDeletionCallback callback : postQueueDeletionCallbacks) {
|
||||||
|
callback.callback(address, queueName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ExecutorFactory getExecutorFactory() {
|
public ExecutorFactory getExecutorFactory() {
|
||||||
return executorFactory;
|
return executorFactory;
|
||||||
|
@ -2091,7 +2136,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName));
|
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName));
|
||||||
}
|
}
|
||||||
else if (autoCreated) {
|
else if (autoCreated) {
|
||||||
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queueName));
|
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queueName));
|
||||||
}
|
}
|
||||||
|
|
||||||
binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId());
|
binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId());
|
||||||
|
@ -2127,6 +2172,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
managementService.registerAddress(address);
|
managementService.registerAddress(address);
|
||||||
managementService.registerQueue(queue, address, storageManager);
|
managementService.registerQueue(queue, address, storageManager);
|
||||||
|
|
||||||
|
callPostQueueCreationCallbacks(queueName);
|
||||||
|
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,41 +17,23 @@
|
||||||
package org.apache.activemq.artemis.core.server.impl;
|
package org.apache.activemq.artemis.core.server.impl;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager;
|
import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.QueueDeleter;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
||||||
import org.jboss.logging.Logger;
|
|
||||||
|
|
||||||
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
|
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(AutoCreatedQueueManagerImpl.class);
|
|
||||||
|
|
||||||
private final SimpleString queueName;
|
private final SimpleString queueName;
|
||||||
|
|
||||||
private final ActiveMQServer server;
|
private final QueueDeleter deleter;
|
||||||
|
|
||||||
private final Runnable runnable = new Runnable() {
|
private final Runnable runnable = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
Queue queue = server.locateQueue(queueName);
|
if (deleter != null) {
|
||||||
long consumerCount = queue.getConsumerCount();
|
deleter.delete(queueName);
|
||||||
long messageCount = queue.getMessageCount();
|
|
||||||
boolean isAutoDeleteJmsQueues = server.getAddressSettingsRepository().getMatch(queueName.toString()).isAutoDeleteJmsQueues();
|
|
||||||
|
|
||||||
if (server.locateQueue(queueName).getMessageCount() == 0 && isAutoDeleteJmsQueues) {
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + isAutoDeleteJmsQueues);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (server.getJMSQueueDeleter() != null) {
|
|
||||||
server.getJMSQueueDeleter().delete(queueName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("NOT deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + isAutoDeleteJmsQueues);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -62,9 +44,8 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
|
||||||
|
|
||||||
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
|
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
|
||||||
|
|
||||||
public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
|
public AutoCreatedQueueManagerImpl(QueueDeleter deleter, SimpleString queueName) {
|
||||||
this.server = server;
|
this.deleter = deleter;
|
||||||
|
|
||||||
this.queueName = queueName;
|
this.queueName = queueName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -146,7 +146,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
||||||
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(), queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), filter, subscription, queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated());
|
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(), queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), filter, subscription, queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated());
|
||||||
|
|
||||||
if (queueBindingInfo.isAutoCreated()) {
|
if (queueBindingInfo.isAutoCreated()) {
|
||||||
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer(), queueBindingInfo.getQueueName()));
|
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
|
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
|
||||||
|
|
|
@ -487,8 +487,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
|
|
||||||
Queue queue;
|
Queue queue;
|
||||||
|
|
||||||
// any non-temporary JMS queue created via this method should be marked as auto-created
|
// any non-temporary JMS destination created via this method should be marked as auto-created
|
||||||
if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) {
|
if (!temporary && ((address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) || address.toString().startsWith(ResourceNames.JMS_TOPIC)) ) {
|
||||||
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
|
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -1453,7 +1453,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void installJMSHooks() {
|
private void installJMSHooks() {
|
||||||
this.queueCreator = server.getJMSQueueCreator();
|
this.queueCreator = server.getJMSDestinationCreator();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses() {
|
private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses() {
|
||||||
|
|
|
@ -56,6 +56,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;
|
public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_AUTO_CREATE_TOPICS = true;
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_AUTO_DELETE_TOPICS = true;
|
||||||
|
|
||||||
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
|
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
|
||||||
|
|
||||||
public static final long DEFAULT_EXPIRY_DELAY = -1;
|
public static final long DEFAULT_EXPIRY_DELAY = -1;
|
||||||
|
@ -114,6 +118,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
private Boolean autoDeleteJmsQueues = null;
|
private Boolean autoDeleteJmsQueues = null;
|
||||||
|
|
||||||
|
private Boolean autoCreateJmsTopics = null;
|
||||||
|
|
||||||
|
private Boolean autoDeleteJmsTopics = null;
|
||||||
|
|
||||||
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
|
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
|
||||||
|
|
||||||
//from amq5
|
//from amq5
|
||||||
|
@ -142,6 +150,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
this.slowConsumerPolicy = other.slowConsumerPolicy;
|
this.slowConsumerPolicy = other.slowConsumerPolicy;
|
||||||
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
|
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
|
||||||
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
|
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
|
||||||
|
this.autoCreateJmsTopics = other.autoCreateJmsTopics;
|
||||||
|
this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
|
||||||
this.managementBrowsePageSize = other.managementBrowsePageSize;
|
this.managementBrowsePageSize = other.managementBrowsePageSize;
|
||||||
this.queuePrefetch = other.queuePrefetch;
|
this.queuePrefetch = other.queuePrefetch;
|
||||||
}
|
}
|
||||||
|
@ -167,6 +177,24 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAutoCreateJmsTopics() {
|
||||||
|
return autoCreateJmsTopics != null ? autoCreateJmsTopics : AddressSettings.DEFAULT_AUTO_CREATE_TOPICS;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddressSettings setAutoCreateJmsTopics(final boolean autoCreateJmsTopics) {
|
||||||
|
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAutoDeleteJmsTopics() {
|
||||||
|
return autoDeleteJmsTopics != null ? autoDeleteJmsTopics : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddressSettings setAutoDeleteJmsTopics(final boolean autoDeleteJmsTopics) {
|
||||||
|
this.autoDeleteJmsTopics = autoDeleteJmsTopics;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isLastValueQueue() {
|
public boolean isLastValueQueue() {
|
||||||
return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE;
|
return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE;
|
||||||
}
|
}
|
||||||
|
@ -416,6 +444,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
if (autoDeleteJmsQueues == null) {
|
if (autoDeleteJmsQueues == null) {
|
||||||
autoDeleteJmsQueues = merged.autoDeleteJmsQueues;
|
autoDeleteJmsQueues = merged.autoDeleteJmsQueues;
|
||||||
}
|
}
|
||||||
|
if (autoCreateJmsTopics == null) {
|
||||||
|
autoCreateJmsTopics = merged.autoCreateJmsTopics;
|
||||||
|
}
|
||||||
|
if (autoDeleteJmsTopics == null) {
|
||||||
|
autoDeleteJmsTopics = merged.autoDeleteJmsTopics;
|
||||||
|
}
|
||||||
if (managementBrowsePageSize == null) {
|
if (managementBrowsePageSize == null) {
|
||||||
managementBrowsePageSize = merged.managementBrowsePageSize;
|
managementBrowsePageSize = merged.managementBrowsePageSize;
|
||||||
}
|
}
|
||||||
|
@ -482,6 +516,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer);
|
autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer);
|
||||||
|
|
||||||
|
autoCreateJmsTopics = BufferHelper.readNullableBoolean(buffer);
|
||||||
|
|
||||||
|
autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer);
|
||||||
|
|
||||||
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
|
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,6 +547,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) +
|
BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) +
|
||||||
BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) +
|
BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) +
|
||||||
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
|
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
|
||||||
|
BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
|
||||||
|
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
|
||||||
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize);
|
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -556,6 +596,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues);
|
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues);
|
||||||
|
|
||||||
|
BufferHelper.writeNullableBoolean(buffer, autoCreateJmsTopics);
|
||||||
|
|
||||||
|
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics);
|
||||||
|
|
||||||
BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
|
BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -587,6 +631,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode());
|
result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode());
|
||||||
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
|
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
|
||||||
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
|
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
|
||||||
|
result = prime * result + ((autoCreateJmsTopics == null) ? 0 : autoCreateJmsTopics.hashCode());
|
||||||
|
result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
|
||||||
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
|
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
|
||||||
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
|
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
|
||||||
return result;
|
return result;
|
||||||
|
@ -730,8 +776,20 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
}
|
}
|
||||||
else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues))
|
else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues))
|
||||||
return false;
|
return false;
|
||||||
else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
|
|
||||||
|
if (autoCreateJmsTopics == null) {
|
||||||
|
if (other.autoCreateJmsTopics != null)
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
else if (!autoCreateJmsTopics.equals(other.autoCreateJmsTopics))
|
||||||
|
return false;
|
||||||
|
if (autoDeleteJmsTopics == null) {
|
||||||
|
if (other.autoDeleteJmsTopics != null)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (!autoDeleteJmsTopics.equals(other.autoDeleteJmsTopics))
|
||||||
|
return false;
|
||||||
|
|
||||||
if (managementBrowsePageSize == null) {
|
if (managementBrowsePageSize == null) {
|
||||||
if (other.managementBrowsePageSize != null)
|
if (other.managementBrowsePageSize != null)
|
||||||
return false;
|
return false;
|
||||||
|
@ -793,6 +851,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
autoCreateJmsQueues +
|
autoCreateJmsQueues +
|
||||||
", autoDeleteJmsQueues=" +
|
", autoDeleteJmsQueues=" +
|
||||||
autoDeleteJmsQueues +
|
autoDeleteJmsQueues +
|
||||||
|
", autoCreateJmsTopics=" +
|
||||||
|
autoCreateJmsTopics +
|
||||||
|
", autoDeleteJmsTopics=" +
|
||||||
|
autoDeleteJmsTopics +
|
||||||
", managementBrowsePageSize=" +
|
", managementBrowsePageSize=" +
|
||||||
managementBrowsePageSize +
|
managementBrowsePageSize +
|
||||||
"]";
|
"]";
|
||||||
|
|
|
@ -2308,6 +2308,23 @@
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="auto-create-jms-topics" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
whether or not to automatically create JMS topics when a producer sends or a consumer subscribes to
|
||||||
|
a topic
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="auto-delete-jms-topics" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
whether or not to delete auto-created JMS topics when the last subscription is closed
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1"
|
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1"
|
||||||
minOccurs="0">
|
minOccurs="0">
|
||||||
<xsd:annotation>
|
<xsd:annotation>
|
||||||
|
|
|
@ -288,6 +288,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy());
|
assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy());
|
||||||
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues());
|
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues());
|
||||||
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues());
|
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues());
|
||||||
|
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsTopics());
|
||||||
|
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsTopics());
|
||||||
|
|
||||||
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
|
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
|
||||||
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
|
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
|
||||||
|
@ -301,6 +303,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy());
|
assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy());
|
||||||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
|
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
|
||||||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
|
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
|
||||||
|
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsTopics());
|
||||||
|
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsTopics());
|
||||||
|
|
||||||
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||||
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||||
|
|
|
@ -41,6 +41,8 @@ public class AddressSettingsTest extends ActiveMQTestBase {
|
||||||
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy());
|
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy());
|
||||||
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_QUEUES, addressSettings.isAutoCreateJmsQueues());
|
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_QUEUES, addressSettings.isAutoCreateJmsQueues());
|
||||||
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteJmsQueues());
|
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteJmsQueues());
|
||||||
|
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_TOPICS, addressSettings.isAutoCreateJmsTopics());
|
||||||
|
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_TOPICS, addressSettings.isAutoDeleteJmsTopics());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -248,6 +248,8 @@
|
||||||
<slow-consumer-policy>NOTIFY</slow-consumer-policy>
|
<slow-consumer-policy>NOTIFY</slow-consumer-policy>
|
||||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||||
<auto-delete-jms-queues>true</auto-delete-jms-queues>
|
<auto-delete-jms-queues>true</auto-delete-jms-queues>
|
||||||
|
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||||
|
<auto-delete-jms-topics>true</auto-delete-jms-topics>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
<address-setting match="a2">
|
<address-setting match="a2">
|
||||||
<dead-letter-address>a2.1</dead-letter-address>
|
<dead-letter-address>a2.1</dead-letter-address>
|
||||||
|
@ -262,6 +264,8 @@
|
||||||
<slow-consumer-policy>KILL</slow-consumer-policy>
|
<slow-consumer-policy>KILL</slow-consumer-policy>
|
||||||
<auto-create-jms-queues>false</auto-create-jms-queues>
|
<auto-create-jms-queues>false</auto-create-jms-queues>
|
||||||
<auto-delete-jms-queues>false</auto-delete-jms-queues>
|
<auto-delete-jms-queues>false</auto-delete-jms-queues>
|
||||||
|
<auto-create-jms-topics>false</auto-create-jms-topics>
|
||||||
|
<auto-delete-jms-topics>false</auto-delete-jms-topics>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
||||||
<resource-limit-settings>
|
<resource-limit-settings>
|
||||||
|
|
|
@ -90,6 +90,8 @@ entry that would be found in the `broker.xml` file.
|
||||||
<slow-consumer-check-period>5</slow-consumer-check-period>
|
<slow-consumer-check-period>5</slow-consumer-check-period>
|
||||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||||
<auto-delete-jms-queues>true</auto-delete-jms-queues>
|
<auto-delete-jms-queues>true</auto-delete-jms-queues>
|
||||||
|
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||||
|
<auto-delete-jms-topics>true</auto-delete-jms-topics>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
||||||
|
|
||||||
|
@ -177,7 +179,18 @@ create a JMS queue when a JMS message is sent to a queue whose name fits
|
||||||
the address `match` (remember, a JMS queue is just a core queue which has
|
the address `match` (remember, a JMS queue is just a core queue which has
|
||||||
the same address and queue name) or a JMS consumer tries to connect to a
|
the same address and queue name) or a JMS consumer tries to connect to a
|
||||||
queue whose name fits the address `match`. Queues which are auto-created
|
queue whose name fits the address `match`. Queues which are auto-created
|
||||||
are durable, non-temporary, and non-transient.
|
are durable, non-temporary, and non-transient. Default is `true`.
|
||||||
|
|
||||||
`auto-delete-jms-queues`. Whether or not to the broker should automatically
|
`auto-delete-jms-queues`. Whether or not the broker should automatically
|
||||||
delete auto-created JMS queues when they have both 0 consumers and 0 messages.
|
delete auto-created JMS queues when they have both 0 consumers and 0 messages.
|
||||||
|
Default is `true`.
|
||||||
|
|
||||||
|
`auto-create-jms-topics`. Whether or not the broker should automatically
|
||||||
|
create a JMS topic when a JMS message is sent to a topic whose name fits
|
||||||
|
the address `match` (remember, a JMS topic is just a core address which has
|
||||||
|
one or more core queues mapped to it) or a JMS consumer tries to subscribe
|
||||||
|
to a topic whose name fits the address `match`. Default is `true`.
|
||||||
|
|
||||||
|
`auto-delete-jms-topics`. Whether or not the broker should automatically
|
||||||
|
delete auto-created JMS topics once the last subscription on the topic has
|
||||||
|
been closed. Default is `true`.
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -89,7 +89,7 @@
|
||||||
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
||||||
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
||||||
<activemq.version.microVersion>0</activemq.version.microVersion>
|
<activemq.version.microVersion>0</activemq.version.microVersion>
|
||||||
<activemq.version.incrementingVersion>126,125,124,123,122</activemq.version.incrementingVersion>
|
<activemq.version.incrementingVersion>127,126,125,124,123,122</activemq.version.incrementingVersion>
|
||||||
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
||||||
<ActiveMQ-Version>
|
<ActiveMQ-Version>
|
||||||
${project.version}(${activemq.version.incrementingVersion})
|
${project.version}(${activemq.version.incrementingVersion})
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
package org.apache.activemq.artemis.tests.integration.client;
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.InvalidDestinationException;
|
|
||||||
import javax.jms.JMSSecurityException;
|
import javax.jms.JMSSecurityException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
@ -26,18 +25,21 @@ import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class AutoCreateJmsQueueTest extends JMSTestBase {
|
public class AutoCreateJmsDestinationTest extends JMSTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreateOnSendToQueue() throws Exception {
|
public void testAutoCreateOnSendToQueue() throws Exception {
|
||||||
|
@ -134,15 +136,12 @@ public class AutoCreateJmsQueueTest extends JMSTestBase {
|
||||||
|
|
||||||
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
|
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
|
||||||
|
|
||||||
try {
|
|
||||||
MessageProducer producer = session.createProducer(topic);
|
MessageProducer producer = session.createProducer(topic);
|
||||||
Assert.fail("Creating a producer here should throw an exception");
|
producer.send(session.createTextMessage("msg"));
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
Assert.assertTrue(e instanceof InvalidDestinationException);
|
|
||||||
}
|
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
|
|
||||||
|
assertNotNull(server.getManagementService().getResource("jms.topic.test"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -166,22 +165,77 @@ public class AutoCreateJmsQueueTest extends JMSTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreateOnConsumeFromTopic() throws Exception {
|
public void testAutoCreateOnSubscribeToTopic() throws Exception {
|
||||||
Connection connection = null;
|
Connection connection = cf.createConnection();
|
||||||
connection = cf.createConnection();
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
final String topicName = "test-" + UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
javax.jms.Topic topic = ActiveMQJMSClient.createTopic(topicName);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(topic);
|
||||||
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
producer.send(session.createTextMessage("msg"));
|
||||||
|
connection.start();
|
||||||
|
assertNotNull(consumer.receive(500));
|
||||||
|
|
||||||
|
assertNotNull(server.getManagementService().getResource("jms.topic." + topicName));
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
assertNull(server.getManagementService().getResource("jms.topic." + topicName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoCreateOnDurableSubscribeToTopic() throws Exception {
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
connection.setClientID("myClientID");
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
|
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
|
||||||
|
|
||||||
try {
|
MessageConsumer consumer = session.createDurableConsumer(topic, "myDurableSub");
|
||||||
MessageConsumer messageConsumer = session.createConsumer(topic);
|
MessageProducer producer = session.createProducer(topic);
|
||||||
Assert.fail("Creating a consumer here should throw an exception");
|
producer.send(session.createTextMessage("msg"));
|
||||||
}
|
connection.start();
|
||||||
catch (Exception e) {
|
assertNotNull(consumer.receive(500));
|
||||||
Assert.assertTrue(e instanceof InvalidDestinationException);
|
|
||||||
}
|
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
|
|
||||||
|
assertNotNull(server.getManagementService().getResource("jms.topic.test"));
|
||||||
|
|
||||||
|
assertNotNull(server.locateQueue(SimpleString.toSimpleString("myClientID.myDurableSub")));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTemporaryTopic() throws Exception {
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
// javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
|
||||||
|
|
||||||
|
ActiveMQTemporaryTopic topic = (ActiveMQTemporaryTopic) session.createTemporaryTopic();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(topic);
|
||||||
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
producer.send(session.createTextMessage("msg"));
|
||||||
|
connection.start();
|
||||||
|
assertNotNull(consumer.receive(500));
|
||||||
|
|
||||||
|
SimpleString topicAddress = topic.getSimpleAddress();
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
assertNotNull(server.locateQueue(topicAddress));
|
||||||
|
|
||||||
|
IntegrationTestLogger.LOGGER.info("Topic name: " + topicAddress);
|
||||||
|
|
||||||
|
topic.delete();
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
// assertNotNull(server.getManagementService().getResource("jms.topic.test"));
|
||||||
|
|
||||||
|
assertNull(server.locateQueue(topicAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
|
@ -31,10 +31,10 @@ import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class AutoDeleteJmsQueueTest extends JMSTestBase {
|
public class AutoDeleteJmsDestinationTest extends JMSTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoDelete() throws Exception {
|
public void testAutoDeleteQueue() throws Exception {
|
||||||
Connection connection = cf.createConnection();
|
Connection connection = cf.createConnection();
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
@ -124,4 +124,76 @@ public class AutoDeleteJmsQueueTest extends JMSTestBase {
|
||||||
// ensure the queue was not removed
|
// ensure the queue was not removed
|
||||||
Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("jms.queue.test")));
|
Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("jms.queue.test")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoDeleteTopic() throws Exception {
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
|
||||||
|
|
||||||
|
MessageConsumer messageConsumer = session.createConsumer(topic);
|
||||||
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
|
||||||
|
final int numMessages = 100;
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
TextMessage mess = session.createTextMessage("msg" + i);
|
||||||
|
producer.send(mess);
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
Message m = messageConsumer.receive(5000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
// ensure the topic was removed
|
||||||
|
Assert.assertNull(server.locateQueue(new SimpleString("jms.topic.test")));
|
||||||
|
|
||||||
|
// make sure the JMX control was removed for the JMS topic
|
||||||
|
assertNull(server.getManagementService().getResource("jms.topic.test"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoDeleteTopicDurableSubscriber() throws Exception {
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
connection.setClientID("myClientID");
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
|
||||||
|
|
||||||
|
MessageConsumer messageConsumer = session.createDurableConsumer(topic, "mySub");
|
||||||
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
|
||||||
|
final int numMessages = 100;
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
TextMessage mess = session.createTextMessage("msg" + i);
|
||||||
|
producer.send(mess);
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
Message m = messageConsumer.receive(5000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
messageConsumer.close();
|
||||||
|
session.unsubscribe("mySub");
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
// ensure the topic was removed
|
||||||
|
Assert.assertNull(server.locateQueue(new SimpleString("jms.topic.test")));
|
||||||
|
|
||||||
|
// make sure the JMX control was removed for the JMS topic
|
||||||
|
assertNull(server.getManagementService().getResource("jms.topic.test"));
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -607,7 +607,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
jmsServer.createTopic(true, "tt", "/topic/TT");
|
jmsServer.createTopic(true, "tt", "/topic/TT");
|
||||||
|
|
||||||
server.getActiveMQServerControl().addAddressSettings("jms.topic.TT", "DLQ", "DLQ", -1, false, 5, 1024 * 1024, 1024 * 10, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true);
|
server.getActiveMQServerControl().addAddressSettings("jms.topic.TT", "DLQ", "DLQ", -1, false, 5, 1024 * 1024, 1024 * 10, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
|
||||||
|
|
||||||
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
|
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
|
||||||
|
|
||||||
|
@ -663,7 +663,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
||||||
jmsServer.setRegistry(new JndiBindingRegistry(context));
|
jmsServer.setRegistry(new JndiBindingRegistry(context));
|
||||||
jmsServer.start();
|
jmsServer.start();
|
||||||
|
|
||||||
server.getActiveMQServerControl().addAddressSettings("jms.queue.Q1", "DLQ", "DLQ", -1, false, 5, 100 * 1024, 10 * 1024, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true);
|
server.getActiveMQServerControl().addAddressSettings("jms.queue.Q1", "DLQ", "DLQ", -1, false, 5, 100 * 1024, 10 * 1024, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
|
||||||
|
|
||||||
jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");
|
jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Random;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -54,6 +55,7 @@ public class NonExistentQueueTest extends JMSTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void sendToNonExistentDestination() throws Exception {
|
public void sendToNonExistentDestination() throws Exception {
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false));
|
||||||
Destination destination = ActiveMQJMSClient.createTopic("DoesNotExist");
|
Destination destination = ActiveMQJMSClient.createTopic("DoesNotExist");
|
||||||
TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName());
|
TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName());
|
||||||
ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
|
ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
|
||||||
|
|
|
@ -468,12 +468,14 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString();
|
String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString();
|
||||||
boolean autoCreateJmsQueues = false;
|
boolean autoCreateJmsQueues = false;
|
||||||
boolean autoDeleteJmsQueues = false;
|
boolean autoDeleteJmsQueues = false;
|
||||||
|
boolean autoCreateJmsTopics = false;
|
||||||
|
boolean autoDeleteJmsTopics = false;
|
||||||
|
|
||||||
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues);
|
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics);
|
||||||
|
|
||||||
boolean ex = false;
|
boolean ex = false;
|
||||||
try {
|
try {
|
||||||
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, 100, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues);
|
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, 100, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics);
|
||||||
}
|
}
|
||||||
catch (Exception expected) {
|
catch (Exception expected) {
|
||||||
ex = true;
|
ex = true;
|
||||||
|
@ -504,8 +506,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
|
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
|
||||||
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
|
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
|
||||||
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
|
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
|
||||||
|
assertEquals(autoCreateJmsTopics, info.isAutoCreateJmsTopics());
|
||||||
|
assertEquals(autoDeleteJmsTopics, info.isAutoDeleteJmsTopics());
|
||||||
|
|
||||||
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -1, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues);
|
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -1, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics);
|
||||||
|
|
||||||
jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
|
jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
|
||||||
info = AddressSettingsInfo.from(jsonString);
|
info = AddressSettingsInfo.from(jsonString);
|
||||||
|
@ -528,10 +532,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
|
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
|
||||||
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
|
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
|
||||||
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
|
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
|
||||||
|
assertEquals(autoCreateJmsTopics, info.isAutoCreateJmsTopics());
|
||||||
|
assertEquals(autoDeleteJmsTopics, info.isAutoDeleteJmsTopics());
|
||||||
|
|
||||||
ex = false;
|
ex = false;
|
||||||
try {
|
try {
|
||||||
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -2, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues);
|
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -2, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
ex = true;
|
ex = true;
|
||||||
|
|
|
@ -587,8 +587,10 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
||||||
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
|
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
|
||||||
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
|
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
|
||||||
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
|
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
|
||||||
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception {
|
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
|
||||||
proxy.invokeOperation("addAddressSettings", addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues);
|
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
|
||||||
|
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception {
|
||||||
|
proxy.invokeOperation("addAddressSettings", addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
|
import org.apache.activemq.artemis.jms.server.JMSServerManager;
|
||||||
|
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -52,6 +54,8 @@ public class ManagementWithStompTest extends ManagementTestBase {
|
||||||
|
|
||||||
protected ActiveMQServer server;
|
protected ActiveMQServer server;
|
||||||
|
|
||||||
|
protected JMSServerManager jmsServer;
|
||||||
|
|
||||||
protected ClientSession session;
|
protected ClientSession session;
|
||||||
|
|
||||||
private Socket stompSocket;
|
private Socket stompSocket;
|
||||||
|
@ -169,7 +173,9 @@ public class ManagementWithStompTest extends ManagementTestBase {
|
||||||
|
|
||||||
server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, false, "brianm", "wombats"));
|
server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, false, "brianm", "wombats"));
|
||||||
|
|
||||||
server.start();
|
jmsServer = new JMSServerManagerImpl(server);
|
||||||
|
|
||||||
|
jmsServer.start();
|
||||||
|
|
||||||
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
|
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
|
||||||
ClientSessionFactory sf = createSessionFactory(locator);
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
|
|
@ -57,11 +57,16 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa
|
||||||
clientConnection = createConnection();
|
clientConnection = createConnection();
|
||||||
clientConnection.setClientID("ClientConnection:" + name.getMethodName());
|
clientConnection.setClientID("ClientConnection:" + name.getMethodName());
|
||||||
|
|
||||||
|
System.out.println("Creating session.");
|
||||||
Session session = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
clientConnection.start();
|
clientConnection.start();
|
||||||
|
|
||||||
Destination replyDestination = createTemporaryDestination(session);
|
Destination replyDestination = createTemporaryDestination(session);
|
||||||
|
System.out.println("Created temporary topic " + replyDestination);
|
||||||
|
|
||||||
|
System.out.println("Creating consumer on: " + replyDestination);
|
||||||
|
MessageConsumer replyConsumer = session.createConsumer(replyDestination);
|
||||||
|
|
||||||
// lets test the destination
|
// lets test the destination
|
||||||
clientSideClientID = clientConnection.getClientID();
|
clientSideClientID = clientConnection.getClientID();
|
||||||
|
@ -74,12 +79,15 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa
|
||||||
System.out.println("Both the clientID and destination clientID match properly: " + clientSideClientID);
|
System.out.println("Both the clientID and destination clientID match properly: " + clientSideClientID);
|
||||||
|
|
||||||
/* build queues */
|
/* build queues */
|
||||||
MessageProducer requestProducer = session.createProducer(requestDestination);
|
|
||||||
MessageConsumer replyConsumer = session.createConsumer(replyDestination);
|
|
||||||
|
|
||||||
/* build requestmessage */
|
/* build requestmessage */
|
||||||
TextMessage requestMessage = session.createTextMessage("Olivier");
|
TextMessage requestMessage = session.createTextMessage("Olivier");
|
||||||
requestMessage.setJMSReplyTo(replyDestination);
|
requestMessage.setJMSReplyTo(replyDestination);
|
||||||
|
|
||||||
|
System.out.println("Creating producer on " + requestDestination);
|
||||||
|
MessageProducer requestProducer = session.createProducer(requestDestination);
|
||||||
|
|
||||||
|
System.out.println("Sending message to " + requestDestination);
|
||||||
requestProducer.send(requestMessage);
|
requestProducer.send(requestMessage);
|
||||||
|
|
||||||
System.out.println("Sent request.");
|
System.out.println("Sent request.");
|
||||||
|
@ -116,7 +124,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa
|
||||||
try {
|
try {
|
||||||
TextMessage requestMessage = (TextMessage) message;
|
TextMessage requestMessage = (TextMessage) message;
|
||||||
|
|
||||||
System.out.println("Received request.");
|
System.out.println("Received request from " + requestDestination);
|
||||||
System.out.println(requestMessage.toString());
|
System.out.println(requestMessage.toString());
|
||||||
|
|
||||||
Destination replyDestination = requestMessage.getJMSReplyTo();
|
Destination replyDestination = requestMessage.getJMSReplyTo();
|
||||||
|
@ -140,7 +148,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa
|
||||||
replyProducer.send(replyDestination, replyMessage);
|
replyProducer.send(replyDestination, replyMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("Sent reply.");
|
System.out.println("Sent reply to " + replyDestination);
|
||||||
System.out.println(replyMessage.toString());
|
System.out.println(replyMessage.toString());
|
||||||
}
|
}
|
||||||
catch (JMSException e) {
|
catch (JMSException e) {
|
||||||
|
@ -180,6 +188,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa
|
||||||
requestDestination = createDestination(serverSession);
|
requestDestination = createDestination(serverSession);
|
||||||
|
|
||||||
/* build queues */
|
/* build queues */
|
||||||
|
System.out.println("Creating consumer on: " + requestDestination);
|
||||||
final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination);
|
final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination);
|
||||||
if (useAsyncConsume) {
|
if (useAsyncConsume) {
|
||||||
requestConsumer.setMessageListener(this);
|
requestConsumer.setMessageListener(this);
|
||||||
|
@ -232,6 +241,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa
|
||||||
((TemporaryTopic) dest).delete();
|
((TemporaryTopic) dest).delete();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
System.out.println("Deleting: " + dest);
|
||||||
((TemporaryQueue) dest).delete();
|
((TemporaryQueue) dest).delete();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,6 +233,43 @@ public class StompTest extends StompTestBase {
|
||||||
assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistentQueue)));
|
assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistentQueue)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendMessageToNonExistentTopic() throws Exception {
|
||||||
|
String nonExistentTopic = RandomUtil.randomString();
|
||||||
|
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
frame = receiveFrame(10000);
|
||||||
|
Assert.assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
// first send a message to ensure that sending to a non-existent topic won't throw an error
|
||||||
|
frame = "SEND\n" + "destination:" + getTopicPrefix() + nonExistentTopic + "\n\n" + "Hello World" + Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
receiveFrame(1000);
|
||||||
|
|
||||||
|
// create a subscription on the topic and send/receive another message
|
||||||
|
MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createTopic(nonExistentTopic));
|
||||||
|
sendFrame(frame);
|
||||||
|
receiveFrame(1000);
|
||||||
|
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
Assert.assertEquals("Hello World", message.getText());
|
||||||
|
// Assert default priority 4 is used when priority header is not set
|
||||||
|
Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
|
||||||
|
|
||||||
|
// Make sure that the timestamp is valid - should
|
||||||
|
// be very close to the current time.
|
||||||
|
long tnow = System.currentTimeMillis();
|
||||||
|
long tmsg = message.getJMSTimestamp();
|
||||||
|
Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
|
||||||
|
|
||||||
|
assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_TOPIC + nonExistentTopic)));
|
||||||
|
|
||||||
|
// closing the consumer here should trigger auto-deletion of the topic
|
||||||
|
consumer.close();
|
||||||
|
assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_TOPIC + nonExistentTopic)));
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
|
* Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
|
||||||
* This means next frame read might have a \n a the beginning.
|
* This means next frame read might have a \n a the beginning.
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage;
|
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage;
|
||||||
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage;
|
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage;
|
||||||
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
|
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
|
||||||
|
@ -348,6 +349,7 @@ public class MessageProducerTest extends JMSTestCase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateProducerOnInexistentDestination() throws Exception {
|
public void testCreateProducerOnInexistentDestination() throws Exception {
|
||||||
|
getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false));
|
||||||
Connection pconn = createConnection();
|
Connection pconn = createConnection();
|
||||||
try {
|
try {
|
||||||
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
|
@ -172,6 +172,7 @@ public class SessionTest extends ActiveMQServerTestCase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateNonExistentTopic() throws Exception {
|
public void testCreateNonExistentTopic() throws Exception {
|
||||||
|
getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false));
|
||||||
Connection conn = getConnectionFactory().createConnection();
|
Connection conn = getConnectionFactory().createConnection();
|
||||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
try {
|
try {
|
||||||
|
@ -201,6 +202,7 @@ public class SessionTest extends ActiveMQServerTestCase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateTopicWhileQueueWithSameNameExists() throws Exception {
|
public void testCreateTopicWhileQueueWithSameNameExists() throws Exception {
|
||||||
|
getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false));
|
||||||
Connection conn = getConnectionFactory().createConnection();
|
Connection conn = getConnectionFactory().createConnection();
|
||||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue