ARTEMIS-1644 legacy clients can't access resources with old prefixes
This commit is contained in:
parent
3d79a08963
commit
9c8bf2f2ca
|
@ -77,6 +77,10 @@ ${global-max-section}
|
||||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||||
|
|
||||||
|
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
|
||||||
|
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
|
||||||
|
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
|
||||||
|
|
||||||
<!-- Acceptor for every supported protocol -->
|
<!-- Acceptor for every supported protocol -->
|
||||||
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||||
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
|
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
|
|
||||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||||
<acceptor name="hornetq">tcp://${host}:${hq.port}?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
|
<acceptor name="hornetq">tcp://${host}:${hq.port}?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
|
||||||
|
|
|
@ -28,36 +28,16 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
public abstract class QueueAbstractPacket extends PacketImpl {
|
public abstract class QueueAbstractPacket extends PacketImpl {
|
||||||
|
|
||||||
protected SimpleString queueName;
|
protected SimpleString queueName;
|
||||||
protected SimpleString oldVersionQueueName;
|
|
||||||
|
|
||||||
protected SimpleString address;
|
protected SimpleString address;
|
||||||
protected SimpleString oldVersionAddresseName;
|
|
||||||
|
|
||||||
public SimpleString getQueueName(int clientVersion) {
|
public SimpleString getQueueName() {
|
||||||
|
|
||||||
if (clientVersion < ADDRESSING_CHANGE_VERSION) {
|
|
||||||
if (oldVersionQueueName == null) {
|
|
||||||
oldVersionQueueName = convertName(queueName);
|
|
||||||
}
|
|
||||||
|
|
||||||
return oldVersionQueueName;
|
|
||||||
} else {
|
|
||||||
return queueName;
|
return queueName;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public SimpleString getAddress(int clientVersion) {
|
public SimpleString getAddress() {
|
||||||
|
|
||||||
if (clientVersion < ADDRESSING_CHANGE_VERSION) {
|
|
||||||
if (oldVersionAddresseName == null) {
|
|
||||||
oldVersionAddresseName = convertName(address);
|
|
||||||
}
|
|
||||||
|
|
||||||
return oldVersionAddresseName;
|
|
||||||
} else {
|
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* It converts the given {@code queueNames} using the JMS prefix found on {@link #address} when {@code clientVersion < }{@link #ADDRESSING_CHANGE_VERSION}.
|
* It converts the given {@code queueNames} using the JMS prefix found on {@link #address} when {@code clientVersion < }{@link #ADDRESSING_CHANGE_VERSION}.
|
||||||
|
@ -69,31 +49,30 @@ public abstract class QueueAbstractPacket extends PacketImpl {
|
||||||
*/
|
*/
|
||||||
public final List<SimpleString> convertQueueNames(int clientVersion, List<SimpleString> queueNames) {
|
public final List<SimpleString> convertQueueNames(int clientVersion, List<SimpleString> queueNames) {
|
||||||
if (clientVersion < ADDRESSING_CHANGE_VERSION) {
|
if (clientVersion < ADDRESSING_CHANGE_VERSION) {
|
||||||
return applyAddressPrefixTo(queueNames);
|
|
||||||
} else {
|
|
||||||
return queueNames;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<SimpleString> applyAddressPrefixTo(List<SimpleString> queueNames) {
|
|
||||||
final int names = queueNames.size();
|
final int names = queueNames.size();
|
||||||
if (names == 0) {
|
if (names == 0) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
} else {
|
} else {
|
||||||
final SimpleString address = this.address;
|
final SimpleString prefix = jmsPrefixOf(this.address);
|
||||||
final SimpleString prefix = jmsPrefixOf(address);
|
|
||||||
if (prefix != null) {
|
if (prefix != null) {
|
||||||
final List<SimpleString> prefixedQueueNames = new ArrayList<>(names);
|
final List<SimpleString> prefixedQueueNames = new ArrayList<>(names);
|
||||||
for (int i = 0; i < names; i++) {
|
for (int i = 0; i < names; i++) {
|
||||||
final SimpleString oldQueueNames = queueNames.get(i);
|
final SimpleString oldQueueName = queueNames.get(i);
|
||||||
final SimpleString prefixedQueueName = prefix.concat(oldQueueNames);
|
if (oldQueueName.startsWith(prefix)) {
|
||||||
|
prefixedQueueNames.add(oldQueueName);
|
||||||
|
} else {
|
||||||
|
final SimpleString prefixedQueueName = prefix.concat(oldQueueName);
|
||||||
prefixedQueueNames.add(prefixedQueueName);
|
prefixedQueueNames.add(prefixedQueueName);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return prefixedQueueNames;
|
return prefixedQueueNames;
|
||||||
} else {
|
} else {
|
||||||
return queueNames;
|
return queueNames;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return queueNames;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SimpleString jmsPrefixOf(SimpleString address) {
|
private static SimpleString jmsPrefixOf(SimpleString address) {
|
||||||
|
@ -111,10 +90,12 @@ public abstract class QueueAbstractPacket extends PacketImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SimpleString getOldPrefixedAddress(SimpleString address, RoutingType routingType) {
|
public static SimpleString getOldPrefixedAddress(SimpleString address, RoutingType routingType) {
|
||||||
switch (routingType) {
|
if (routingType == RoutingType.MULTICAST && !address.startsWith(OLD_TOPIC_PREFIX)) {
|
||||||
case MULTICAST: return OLD_TOPIC_PREFIX.concat(address);
|
return OLD_TOPIC_PREFIX.concat(address);
|
||||||
case ANYCAST: return OLD_QUEUE_PREFIX.concat(address);
|
} else if (routingType == RoutingType.ANYCAST && !address.startsWith(OLD_QUEUE_PREFIX)) {
|
||||||
default: return address;
|
return OLD_QUEUE_PREFIX.concat(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return address;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.activemq.artemis.api.core;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QueueAbstractPacket;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class QueueAbstractTest {
|
|
||||||
|
|
||||||
class MyTest extends QueueAbstractPacket {
|
|
||||||
|
|
||||||
MyTest(String name) {
|
|
||||||
super((byte)0);
|
|
||||||
this.queueName = SimpleString.toSimpleString(name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testOldTopic() {
|
|
||||||
MyTest test = new MyTest("jms.topic.mytopic");
|
|
||||||
|
|
||||||
Assert.assertEquals("mytopic", test.getQueueName(127).toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testOldQueue() {
|
|
||||||
MyTest test = new MyTest("jms.queue.myQueue");
|
|
||||||
|
|
||||||
Assert.assertEquals("myQueue", test.getQueueName(127).toString());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -63,4 +63,9 @@ class HornetQProtocolManager extends CoreProtocolManager {
|
||||||
String frameStart = new String(array, StandardCharsets.US_ASCII);
|
String frameStart = new String(array, StandardCharsets.US_ASCII);
|
||||||
return frameStart.startsWith("HORNETQ");
|
return frameStart.startsWith("HORNETQ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "HornetQProtocolManager(server=" + super.server + ")";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||||
|
import org.apache.activemq.artemis.utils.uri.BeanSupport;
|
||||||
import org.osgi.service.component.annotations.Component;
|
import org.osgi.service.component.annotations.Component;
|
||||||
|
|
||||||
@Component(service = ProtocolManagerFactory.class)
|
@Component(service = ProtocolManagerFactory.class)
|
||||||
|
@ -40,7 +41,7 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
|
||||||
public ProtocolManager createProtocolManager(final ActiveMQServer server,
|
public ProtocolManager createProtocolManager(final ActiveMQServer server,
|
||||||
final Map<String, Object> parameters,
|
final Map<String, Object> parameters,
|
||||||
final List<BaseInterceptor> incomingInterceptors,
|
final List<BaseInterceptor> incomingInterceptors,
|
||||||
List<BaseInterceptor> outgoingInterceptors) {
|
List<BaseInterceptor> outgoingInterceptors) throws Exception {
|
||||||
|
|
||||||
List<Interceptor> hqIncoming = filterInterceptors(incomingInterceptors);
|
List<Interceptor> hqIncoming = filterInterceptors(incomingInterceptors);
|
||||||
List<Interceptor> hqOutgoing = filterInterceptors(outgoingInterceptors);
|
List<Interceptor> hqOutgoing = filterInterceptors(outgoingInterceptors);
|
||||||
|
@ -48,7 +49,7 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
|
||||||
hqIncoming.add(new HQPropertiesConversionInterceptor(true));
|
hqIncoming.add(new HQPropertiesConversionInterceptor(true));
|
||||||
hqOutgoing.add(new HQPropertiesConversionInterceptor(false));
|
hqOutgoing.add(new HQPropertiesConversionInterceptor(false));
|
||||||
|
|
||||||
return new HornetQProtocolManager(this, server, hqIncoming, hqOutgoing);
|
return BeanSupport.setData(new HornetQProtocolManager(this, server, hqIncoming, hqOutgoing), parameters);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -314,11 +314,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
case SESS_CREATECONSUMER: {
|
case SESS_CREATECONSUMER: {
|
||||||
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
|
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
|
||||||
requiresResponse = request.isRequiresResponse();
|
requiresResponse = request.isRequiresResponse();
|
||||||
session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getChannelVersion()), request.getFilterString(), request.isBrowseOnly());
|
session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
// We send back queue information on the queue as a response- this allows the queue to
|
// We send back queue information on the queue as a response- this allows the queue to
|
||||||
// be automatically recreated on failover
|
// be automatically recreated on failover
|
||||||
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
|
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
|
||||||
|
|
||||||
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
|
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
|
||||||
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
|
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
|
||||||
|
@ -387,7 +387,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
case SESS_QUEUEQUERY: {
|
case SESS_QUEUEQUERY: {
|
||||||
requiresResponse = true;
|
requiresResponse = true;
|
||||||
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
||||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
|
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
|
||||||
|
|
||||||
if (remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
if (remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||||
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
|
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
|
||||||
|
@ -406,7 +406,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
requiresResponse = true;
|
requiresResponse = true;
|
||||||
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
|
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
|
||||||
final int clientVersion = remotingConnection.getChannelVersion();
|
final int clientVersion = remotingConnection.getChannelVersion();
|
||||||
BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion));
|
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
|
||||||
|
|
||||||
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
|
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
|
||||||
* names otherwise the older client won't realize the queue exists and will try to create it and receive
|
* names otherwise the older client won't realize the queue exists and will try to create it and receive
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.protocol.core.impl;
|
package org.apache.activemq.artemis.core.protocol.core.impl;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
|
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
|
||||||
|
@ -24,6 +23,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||||
|
@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
|
||||||
import org.apache.activemq.artemis.core.server.ServerProducer;
|
import org.apache.activemq.artemis.core.server.ServerProducer;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
|
||||||
|
@ -163,12 +162,6 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
|
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
|
||||||
|
|
||||||
if (connection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
|
||||||
routingTypeMap = new HashMap<>();
|
|
||||||
routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST);
|
|
||||||
routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
|
|
||||||
}
|
|
||||||
|
|
||||||
CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
|
CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
|
||||||
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap);
|
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap);
|
||||||
ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
|
ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
|
||||||
|
|
||||||
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
|
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
|
||||||
|
|
||||||
private final ActiveMQServer server;
|
protected final ActiveMQServer server;
|
||||||
|
|
||||||
private final List<Interceptor> incomingInterceptors;
|
private final List<Interceptor> incomingInterceptors;
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ configuration = new ConfigurationImpl();
|
||||||
configuration.setJournalType(JournalType.NIO);
|
configuration.setJournalType(JournalType.NIO);
|
||||||
System.out.println("folder:: " + folder);
|
System.out.println("folder:: " + folder);
|
||||||
configuration.setBrokerInstance(new File(folder + "/" + id));
|
configuration.setBrokerInstance(new File(folder + "/" + id));
|
||||||
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
|
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616?anycastPrefix=jms.queue.&multicastPrefix=jms.topic.");
|
||||||
configuration.setSecurityEnabled(false);
|
configuration.setSecurityEnabled(false);
|
||||||
configuration.setPersistenceEnabled(false);
|
configuration.setPersistenceEnabled(false);
|
||||||
configuration.addAddressesSetting("myQueue", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeBytes(1024 * 1024 * 1024).setPageSizeBytes(1024));
|
configuration.addAddressesSetting("myQueue", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeBytes(1024 * 1024 * 1024).setPageSizeBytes(1024));
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
package servers
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// starts an artemis server
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
|
||||||
|
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
|
||||||
|
|
||||||
|
|
||||||
|
String folder = arg[0];
|
||||||
|
String id = arg[1];
|
||||||
|
String type = arg[2];
|
||||||
|
String producer = arg[3];
|
||||||
|
String consumer = arg[4];
|
||||||
|
|
||||||
|
println("type = " + type);
|
||||||
|
|
||||||
|
configuration = new ConfigurationImpl();
|
||||||
|
configuration.setJournalType(JournalType.NIO);
|
||||||
|
System.out.println("folder:: " + folder);
|
||||||
|
configuration.setBrokerInstance(new File(folder + "/" + id));
|
||||||
|
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
|
||||||
|
configuration.setSecurityEnabled(false);
|
||||||
|
configuration.setPersistenceEnabled(persistent);
|
||||||
|
try {
|
||||||
|
if (!type.startsWith("ARTEMIS-1")) {
|
||||||
|
configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true));
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
// need to ignore this for 1.4
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
jmsConfiguration = new JMSConfigurationImpl();
|
||||||
|
|
||||||
|
server = new EmbeddedJMS();
|
||||||
|
server.setConfiguration(configuration);
|
||||||
|
server.setJmsConfiguration(jmsConfiguration);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
server.getJMSServerManager().createTopic(true, "topic");
|
||||||
|
server.getJMSServerManager().createQueue(true, "queue", null, true);
|
|
@ -0,0 +1,60 @@
|
||||||
|
package servers
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// starts an artemis server
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
|
||||||
|
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
|
||||||
|
|
||||||
|
|
||||||
|
String folder = arg[0];
|
||||||
|
String id = arg[1];
|
||||||
|
String type = arg[2];
|
||||||
|
String producer = arg[3];
|
||||||
|
String consumer = arg[4];
|
||||||
|
|
||||||
|
println("type = " + type);
|
||||||
|
|
||||||
|
configuration = new ConfigurationImpl();
|
||||||
|
configuration.setJournalType(JournalType.NIO);
|
||||||
|
System.out.println("folder:: " + folder);
|
||||||
|
configuration.setBrokerInstance(new File(folder + "/" + id));
|
||||||
|
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
|
||||||
|
configuration.setSecurityEnabled(false);
|
||||||
|
configuration.setPersistenceEnabled(persistent);
|
||||||
|
try {
|
||||||
|
if (!type.startsWith("ARTEMIS-1")) {
|
||||||
|
configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true));
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
// need to ignore this for 1.4
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
jmsConfiguration = new JMSConfigurationImpl();
|
||||||
|
|
||||||
|
server = new EmbeddedJMS();
|
||||||
|
server.setConfiguration(configuration);
|
||||||
|
server.setJmsConfiguration(jmsConfiguration);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
server.getJMSServerManager().createTopic(true, "topic");
|
||||||
|
server.getJMSServerManager().createQueue(true, "jms.queue.queue", null, true);
|
|
@ -0,0 +1,105 @@
|
||||||
|
package meshTest
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||||
|
|
||||||
|
import javax.jms.*
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// starts an artemis server
|
||||||
|
String serverType = arg[0];
|
||||||
|
String clientType = arg[1];
|
||||||
|
String operation = arg[2];
|
||||||
|
|
||||||
|
|
||||||
|
String queueName
|
||||||
|
if (clientType.equals(GroovyRun.SNAPSHOT) || clientType.equals(GroovyRun.TWO_FOUR)) {
|
||||||
|
queueName = "jms.queue.queue";
|
||||||
|
} else {
|
||||||
|
queueName = "queue";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
String textBody = "a rapadura e doce mas nao e mole nao";
|
||||||
|
|
||||||
|
println("serverType " + serverType);
|
||||||
|
|
||||||
|
if (clientType.startsWith("ARTEMIS")) {
|
||||||
|
// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
|
||||||
|
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
|
||||||
|
} else {
|
||||||
|
// Can't depend directly on hornetq, otherwise it wouldn't compile in artemis
|
||||||
|
GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
|
||||||
|
if (operation.equals("sendAckMessages")) {
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(10);
|
||||||
|
|
||||||
|
CompletionListener completionListener = new CompletionListener() {
|
||||||
|
@Override
|
||||||
|
void onCompletion(Message message) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void onException(Message message, Exception exception) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
producer.send(session.createTextMessage(textBody + i), completionListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
System.out.println("Sending messages");
|
||||||
|
connection.close();
|
||||||
|
System.out.println("Message sent");
|
||||||
|
} else if (operation.equals("receiveMessages")) {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
System.out.println("Receiving messages");
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
TextMessage message = consumer.receive(1000);
|
||||||
|
GroovyRun.assertNotNull(message);
|
||||||
|
GroovyRun.assertEquals(textBody + i, message.getText());
|
||||||
|
}
|
||||||
|
|
||||||
|
GroovyRun.assertNull(consumer.receiveNoWait());
|
||||||
|
connection.close();
|
||||||
|
System.out.println("Message received");
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("Invalid operation " + operation);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ configuration = new ConfigurationImpl();
|
||||||
configuration.setJournalType(JournalType.NIO);
|
configuration.setJournalType(JournalType.NIO);
|
||||||
System.out.println("folder:: " + folder);
|
System.out.println("folder:: " + folder);
|
||||||
configuration.setBrokerInstance(new File(folder + "/" + id));
|
configuration.setBrokerInstance(new File(folder + "/" + id));
|
||||||
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
|
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616?anycastPrefix=jms.queue.&multicastPrefix=jms.topic.");
|
||||||
configuration.setSecurityEnabled(false);
|
configuration.setSecurityEnabled(false);
|
||||||
configuration.setPersistenceEnabled(persistent);
|
configuration.setPersistenceEnabled(persistent);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -46,6 +46,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT
|
||||||
*/
|
*/
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class ExportImportTest extends VersionedBaseTest {
|
public class ExportImportTest extends VersionedBaseTest {
|
||||||
|
private String serverScriptToUse;
|
||||||
|
|
||||||
// this will ensure that all tests in this class are run twice,
|
// this will ensure that all tests in this class are run twice,
|
||||||
// once with "true" passed to the class' constructor and once with "false"
|
// once with "true" passed to the class' constructor and once with "false"
|
||||||
|
@ -107,6 +108,9 @@ public class ExportImportTest extends VersionedBaseTest {
|
||||||
public void internalSendReceive(boolean legacyPrefixes) throws Throwable {
|
public void internalSendReceive(boolean legacyPrefixes) throws Throwable {
|
||||||
setVariable(senderClassloader, "legacy", false);
|
setVariable(senderClassloader, "legacy", false);
|
||||||
setVariable(senderClassloader, "persistent", true);
|
setVariable(senderClassloader, "persistent", true);
|
||||||
|
if (legacyPrefixes) {
|
||||||
|
serverScriptToUse = "exportimport/artemisServer.groovy";
|
||||||
|
}
|
||||||
startServer(serverFolder.getRoot(), senderClassloader, "sender");
|
startServer(serverFolder.getRoot(), senderClassloader, "sender");
|
||||||
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
|
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
|
||||||
stopServer(senderClassloader);
|
stopServer(senderClassloader);
|
||||||
|
@ -133,5 +137,9 @@ public class ExportImportTest extends VersionedBaseTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getServerScriptToUse() {
|
||||||
|
return serverScriptToUse;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.compatibility;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_247;
|
||||||
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
|
||||||
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||||
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class PrefixSendAckTest extends ServerBaseTest {
|
||||||
|
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||||
|
public static Collection getParameters() {
|
||||||
|
List<Object[]> combinations = new ArrayList<>();
|
||||||
|
|
||||||
|
combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FIVE, HORNETQ_247, TWO_FOUR, SNAPSHOT}, new Object[]{ONE_FIVE, HORNETQ_247, TWO_FOUR, SNAPSHOT}));
|
||||||
|
return combinations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PrefixSendAckTest(String server, String sender, String receiver) throws Exception {
|
||||||
|
super(server, sender, receiver);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendReceive() throws Throwable {
|
||||||
|
evaluate(senderClassloader, "prefixSendAckTest/sendAckMessages.groovy", server, sender, "sendAckMessages");
|
||||||
|
evaluate(receiverClassloader, "prefixSendAckTest/sendAckMessages.groovy", server, receiver, "receiveMessages");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getServerScriptToUse() {
|
||||||
|
return "prefixSendAckTest/artemisServer.groovy";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -194,7 +194,9 @@ public abstract class VersionedBaseTest {
|
||||||
System.out.println("Folder::" + folder);
|
System.out.println("Folder::" + folder);
|
||||||
|
|
||||||
String scriptToUse;
|
String scriptToUse;
|
||||||
if (server.startsWith("ARTEMIS")) {
|
if (getServerScriptToUse() != null && getServerScriptToUse().length() != 0) {
|
||||||
|
scriptToUse = getServerScriptToUse();
|
||||||
|
} else if (server.startsWith("ARTEMIS")) {
|
||||||
scriptToUse = "servers/artemisServer.groovy";
|
scriptToUse = "servers/artemisServer.groovy";
|
||||||
} else {
|
} else {
|
||||||
scriptToUse = "servers/hornetqServer.groovy";
|
scriptToUse = "servers/hornetqServer.groovy";
|
||||||
|
@ -206,4 +208,8 @@ public abstract class VersionedBaseTest {
|
||||||
public void stopServer(ClassLoader loader) throws Throwable {
|
public void stopServer(ClassLoader loader) throws Throwable {
|
||||||
execute(loader, "server.stop()");
|
execute(loader, "server.stop()");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getServerScriptToUse() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue