ARTEMIS-789 Fixed a number of failing tests
This commit is contained in:
parent
fe52ca6d75
commit
fa67d40b9d
|
@ -621,7 +621,7 @@ public class ArtemisTest {
|
|||
|
||||
// This is usually set when run from the command line via artemis.profile
|
||||
Run.setEmbedded(true);
|
||||
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--queues", queues, "--topics", topics, "--no-autotune", "--require-login", "--ping", "127.0.0.1");
|
||||
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--queues", queues, "--addresses", topics, "--no-autotune", "--require-login", "--ping", "127.0.0.1");
|
||||
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
|
||||
|
||||
FileConfiguration fc = new FileConfiguration();
|
||||
|
|
|
@ -436,7 +436,7 @@ public interface ActiveMQServerControl {
|
|||
|
||||
@Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
|
||||
void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
@Parameter(name = "routingTypes", desc = "Comma separated list of Routing Typles (anycast/multicast)") String routingTypes) throws Exception;
|
||||
@Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception;
|
||||
|
||||
@Operation(desc = "add the provided routing type to an address", impact = MBeanOperationInfo.ACTION)
|
||||
void addRoutingType(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.apache.qpid.proton.amqp.transport.AmqpError;
|
|||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.codec.WritableBuffer;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.apache.qpid.proton.engine.EndpointState;
|
||||
import org.apache.qpid.proton.engine.Receiver;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -108,8 +109,9 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
return transportConnection.isWritable(callback);
|
||||
public boolean isWritable(ReadyListener callback, Object protocolContext) {
|
||||
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) protocolContext;
|
||||
return transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED;
|
||||
}
|
||||
|
||||
public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState;
|
|||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.apache.qpid.proton.engine.EndpointState;
|
||||
import org.apache.qpid.proton.engine.Sender;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
|
@ -580,6 +581,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
int size = nettyBuffer.writerIndex();
|
||||
|
||||
synchronized (connection.getLock()) {
|
||||
if (sender.getLocalState() == EndpointState.CLOSED) {
|
||||
return 0;
|
||||
}
|
||||
final Delivery delivery;
|
||||
delivery = sender.delivery(tag, 0, tag.length);
|
||||
delivery.setMessageFormat((int) messageFormat);
|
||||
|
|
|
@ -37,7 +37,7 @@ public class MQTTSessionCallback implements SessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
public boolean isWritable(ReadyListener callback, Object protocolContext) {
|
||||
return connection.isWritable(callback);
|
||||
}
|
||||
|
||||
|
|
|
@ -209,7 +209,7 @@ public class AMQSession implements SessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
public boolean isWritable(ReadyListener callback, Object protocolContext) {
|
||||
return connection.isWritable(callback);
|
||||
}
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ public class StompSession implements SessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
public boolean isWritable(ReadyListener callback, Object protocolContext) {
|
||||
return connection.isWritable(callback);
|
||||
}
|
||||
|
||||
|
|
|
@ -473,9 +473,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
synchronized (addressLock) {
|
||||
if (RoutingType.MULTICAST.equals(routingType)) {
|
||||
final Bindings bindings = addressManager.getBindingsForRoutingAddress(addressName);
|
||||
final boolean existsQueueBindings = bindings.getBindings().stream().anyMatch(QueueBinding.class::isInstance);
|
||||
if (existsQueueBindings) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidMulticastRoutingTypeDelete();
|
||||
if (bindings != null) {
|
||||
final boolean existsQueueBindings = bindings.getBindings().stream().anyMatch(QueueBinding.class::isInstance);
|
||||
if (existsQueueBindings) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidMulticastRoutingTypeDelete();
|
||||
}
|
||||
}
|
||||
}
|
||||
final AddressInfo updateAddressInfo = addressManager.updateAddressInfoIfPresent(addressName, (name, addressInfo) -> {
|
||||
|
|
|
@ -55,7 +55,7 @@ public final class CoreSessionCallback implements SessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
public boolean isWritable(ReadyListener callback, Object protocolContext) {
|
||||
return connection.isWritable(callback);
|
||||
}
|
||||
|
||||
|
|
|
@ -2519,6 +2519,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true));
|
||||
addressAlreadyExists = false;
|
||||
}
|
||||
} else if (info == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
||||
}
|
||||
|
||||
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
|
||||
|
|
|
@ -69,10 +69,6 @@ public class QueueFactoryImpl implements QueueFactory {
|
|||
|
||||
@Override
|
||||
public Queue createQueueWith(final QueueConfig config) {
|
||||
|
||||
// Add default address info if one doesn't exist
|
||||
postOffice.addAddressInfo(new AddressInfo(config.address()));
|
||||
|
||||
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
|
||||
final Queue queue;
|
||||
if (addressSettings.isLastValueQueue()) {
|
||||
|
|
|
@ -326,7 +326,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
// should go back into the
|
||||
// queue for delivery later.
|
||||
// TCP-flow control has to be done first than everything else otherwise we may lose notifications
|
||||
if (!callback.isWritable(this) || !started || transferring) {
|
||||
if (!callback.isWritable(this, protocolContext) || !started || transferring) {
|
||||
return HandleStatus.BUSY;
|
||||
}
|
||||
|
||||
|
|
|
@ -242,7 +242,7 @@ public class ManagementServiceImpl implements ManagementService {
|
|||
queueControl.setMessageCounter(counter);
|
||||
messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
|
||||
}
|
||||
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, queue.getName(),queue.getRoutingType());
|
||||
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, queue.getName(), queue.getRoutingType());
|
||||
registerInJMX(objectName, queueControl);
|
||||
registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ public interface SessionCallback {
|
|||
|
||||
void disconnect(ServerConsumer consumerId, String queueName);
|
||||
|
||||
boolean isWritable(ReadyListener callback);
|
||||
boolean isWritable(ReadyListener callback, Object protocolContext);
|
||||
|
||||
/**
|
||||
* Some protocols (Openwire) needs a special message with the browser is finished.
|
||||
|
|
|
@ -881,7 +881,63 @@
|
|||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="addresses" type="addressesType" maxOccurs="1" minOccurs="0" />
|
||||
<xsd:element name="addresses" type="addressesType" maxOccurs="1" minOccurs="0"/>
|
||||
|
||||
<xsd:element name="network-check-list" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
A comma separated list of IPs to be used to validate if the broker should be kept up
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="network-check-URL-list" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
A comma separated list of URLs to be used to validate if the broker should be kept up
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="network-check-period" type="xsd:long" default="10000" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
A frequency in milliseconds to how often we should check if the network is still up
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="network-check-timeout" type="xsd:long" default="1000" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
A timeout used in milliseconds to be used on the ping.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="network-check-NIC" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The network interface card name to be used to validate the address.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="network-check-ping-command" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The ping command used to ping IPV4 addresses.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="network-check-ping6-command" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The ping command used to ping IPV6 addresses.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
</xsd:complexType>
|
||||
|
||||
|
|
|
@ -501,7 +501,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
public boolean isWritable(ReadyListener callback, Object protocolContext) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import javax.jms.Session;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
|
||||
|
@ -40,6 +41,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
|||
|
||||
@Before
|
||||
public void createQueue() throws Exception {
|
||||
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST));
|
||||
server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.jms.cluster;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||
|
||||
public class ReplicatedJMSFailoverTest extends JMSFailoverTest {
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
protected void startServers() throws Exception {
|
||||
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
|
||||
|
||||
backupConf = createBasicConfig().setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir(0, true)).setJournalMinFiles(2).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setHAPolicyConfiguration(new ReplicaPolicyConfiguration());
|
||||
|
||||
backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConf, true));
|
||||
|
||||
backupJMSServer = new JMSServerManagerImpl(backupServer);
|
||||
|
||||
backupJMSServer.setRegistry(new JndiBindingRegistry(ctx2));
|
||||
|
||||
backupJMSServer.start();
|
||||
|
||||
liveConf = createBasicConfig().setJournalType(getDefaultJournalType()).addConnectorConfiguration("toBackup", new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams)).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)).setBindingsDirectory(getBindingsDir(0, false)).setJournalMinFiles(2).setJournalDirectory(getJournalDir(0, false)).setPagingDirectory(getPageDir(0, false)).setLargeMessagesDirectory(getLargeMessagesDir(0, false)).setHAPolicyConfiguration(new ReplicatedPolicyConfiguration());
|
||||
|
||||
liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConf, true));
|
||||
|
||||
liveJMSServer = new JMSServerManagerImpl(liveServer);
|
||||
|
||||
liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1));
|
||||
|
||||
liveJMSServer.start();
|
||||
}
|
||||
|
||||
// Private -------------------------------------------------------
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
|
||||
}
|
|
@ -591,9 +591,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
@Parameter(name = "routingType", desc = "The delivery modes enabled for this address'") String routingTypes) throws Exception {
|
||||
|
||||
public void createAddress(String name, String routingTypes) throws Exception {
|
||||
proxy.invokeOperation("createAddress", name, routingTypes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -65,15 +65,15 @@ public class BasicOpenWireTest extends OpenWireTestBase {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
SimpleString coreQueue = new SimpleString(queueName);
|
||||
this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
|
||||
this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false, -1, false, true);
|
||||
testQueues.put(queueName, coreQueue);
|
||||
|
||||
SimpleString coreQueue2 = new SimpleString(queueName2);
|
||||
this.server.createQueue(coreQueue2, RoutingType.ANYCAST, coreQueue2, null, false, false);
|
||||
this.server.createQueue(coreQueue2, RoutingType.ANYCAST, coreQueue2, null, false, false, -1, false, true);
|
||||
testQueues.put(queueName2, coreQueue2);
|
||||
|
||||
SimpleString durableQueue = new SimpleString(durableQueueName);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
|
||||
testQueues.put(durableQueueName, durableQueue);
|
||||
|
||||
if (!enableSecurity) {
|
||||
|
@ -139,7 +139,7 @@ public class BasicOpenWireTest extends OpenWireTestBase {
|
|||
SimpleString coreQ = testQueues.get(qname);
|
||||
if (coreQ == null) {
|
||||
coreQ = new SimpleString(qname);
|
||||
this.server.createQueue(coreQ, RoutingType.ANYCAST, coreQ, null, false, false);
|
||||
this.server.createQueue(coreQ, RoutingType.ANYCAST, coreQ, null, false, false, -1, false, true);
|
||||
testQueues.put(qname, coreQ);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ public class OpenWireTestBase extends ActiveMQTestBase {
|
|||
|
||||
server.getConfiguration().putSecurityRoles("#", roles);
|
||||
}
|
||||
addServer(server);
|
||||
jmsServer = new JMSServerManagerImpl(server);
|
||||
namingContext = new InVMNamingContext();
|
||||
jmsServer.setRegistry(new JndiBindingRegistry(namingContext));
|
||||
|
|
|
@ -574,7 +574,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
Connection exConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
|
@ -616,7 +616,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
Connection exConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
|
@ -653,7 +653,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
Connection openConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
|
||||
|
||||
ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
|
||||
|
||||
|
@ -693,7 +693,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
Connection conn1 = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("exampleQueue");
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
|
||||
|
||||
Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import javax.jms.TextMessage;
|
|||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -42,7 +43,7 @@ public class VerySimpleOenwireTest extends OpenWireTestBase {
|
|||
Connection exConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
|
@ -79,7 +80,7 @@ public class VerySimpleOenwireTest extends OpenWireTestBase {
|
|||
Connection openConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
|
||||
|
||||
ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
|
||||
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||
|
@ -78,7 +80,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
|
|||
ClientSession session = sf.createSession();
|
||||
|
||||
try {
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
|
||||
|
||||
PageSubscriptionCounter counter = locateCounter(queue);
|
||||
|
||||
|
@ -107,7 +110,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
|
|||
ClientSession session = sf.createSession();
|
||||
|
||||
try {
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
|
||||
|
||||
PageSubscriptionCounter counter = locateCounter(queue);
|
||||
|
||||
|
@ -162,7 +166,9 @@ public class PagingCounterTest extends ActiveMQTestBase {
|
|||
ClientSession session = sf.createSession();
|
||||
|
||||
try {
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
|
||||
|
||||
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
|
||||
|
||||
PageSubscriptionCounter counter = locateCounter(queue);
|
||||
|
||||
|
@ -215,7 +221,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testRestartCounter() throws Exception {
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
|
||||
Queue queue = server.createQueue(new SimpleString("A1"), RoutingType.ANYCAST, new SimpleString("A1"), null, true, false);
|
||||
|
||||
PageSubscriptionCounter counter = locateCounter(queue);
|
||||
|
||||
|
|
|
@ -46,6 +46,8 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
|||
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -93,7 +95,8 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
|
@ -182,9 +185,10 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
|
||||
Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
|
||||
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
|
@ -311,9 +315,10 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
|
||||
Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
|
||||
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
|
@ -404,7 +409,8 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
|
@ -488,7 +494,8 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
|
@ -57,7 +59,8 @@ public class PagingReceiveTest extends ActiveMQTestBase {
|
|||
super.setUp();
|
||||
server = internalCreateServer();
|
||||
|
||||
Queue queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
queue.getPageSubscription().getPagingStore().startPaging();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Test;
|
||||
|
@ -69,7 +71,8 @@ public class PagingSyncTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
|
|
|
@ -72,7 +72,9 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
|
@ -356,8 +358,8 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
|
||||
Queue queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
|
||||
Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
|
||||
queue.getPageSubscription().getPagingStore().startPaging();
|
||||
|
||||
|
@ -3600,7 +3602,8 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
server.start();
|
||||
|
||||
try {
|
||||
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false);
|
||||
|
||||
final CountDownLatch pageUp = new CountDownLatch(0);
|
||||
final CountDownLatch pageDone = new CountDownLatch(1);
|
||||
|
@ -3639,7 +3642,7 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||
|
||||
server.start();
|
||||
|
||||
// server.createAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
|
||||
|
||||
final CountDownLatch pageUp = new CountDownLatch(0);
|
||||
|
|
|
@ -46,7 +46,9 @@ import org.apache.activemq.artemis.core.security.Role;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
|
@ -227,8 +229,9 @@ public class SecurityTest extends ActiveMQTestBase {
|
|||
roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false));
|
||||
server.getConfiguration().putSecurityRoles("#", roles);
|
||||
server.start();
|
||||
server.createQueue(ADDRESS, DURABLE_QUEUE, null, true, false);
|
||||
server.createQueue(ADDRESS, NON_DURABLE_QUEUE, null, false, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false);
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false);
|
||||
|
||||
ClientSessionFactory cf = createSessionFactory(locator);
|
||||
ClientSession session = addClientSession(cf.createSession("first", "secret", false, true, true, false, 0));
|
||||
|
@ -315,8 +318,9 @@ public class SecurityTest extends ActiveMQTestBase {
|
|||
bRoles.add(new Role(QUEUE_B.toString(), false, true, false, false, false, false, false, false, false, false));
|
||||
server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_B).toString(), bRoles);
|
||||
server.start();
|
||||
server.createQueue(ADDRESS, QUEUE_A, null, true, false);
|
||||
server.createQueue(ADDRESS, QUEUE_B, null, true, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_A, null, true, false);
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, QUEUE_B, null, true, false);
|
||||
|
||||
ClientSessionFactory cf = createSessionFactory(locator);
|
||||
ClientSession aSession = addClientSession(cf.createSession("a", "a", false, true, true, false, 0));
|
||||
|
@ -389,8 +393,9 @@ public class SecurityTest extends ActiveMQTestBase {
|
|||
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(tc));
|
||||
ClientSessionFactory cf = createSessionFactory(locator);
|
||||
|
||||
server.createQueue(ADDRESS, DURABLE_QUEUE, null, true, false);
|
||||
server.createQueue(ADDRESS, NON_DURABLE_QUEUE, null, false, false);
|
||||
server.createAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, DURABLE_QUEUE, null, true, false);
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, NON_DURABLE_QUEUE, null, false, false);
|
||||
|
||||
ClientSession session = addClientSession(cf.createSession());
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
|
@ -562,7 +563,7 @@ public class FakeQueue implements Queue {
|
|||
|
||||
@Override
|
||||
public RoutingType getRoutingType() {
|
||||
return null;
|
||||
return ActiveMQDefaultConfiguration.getDefaultRoutingType();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue