diff --git a/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java b/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java deleted file mode 100644 index 655bb40189..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.advisory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { - - private static final transient Logger LOG = LoggerFactory.getLogger(MasterSlaveTempQueueMemoryTest.class); - - BrokerService slave; - - /* - * add a slave broker - * @see org.apache.activemq.EmbeddedBrokerTestSupport#createBroker() - */ - @Override - protected BrokerService createBroker() throws Exception { - // bindAddress is used by super.createBroker - bindAddress = "tcp://localhost:0"; - BrokerService master = super.createBroker(); - master.setBrokerName("master"); - configureBroker(master); - slave = super.createBroker(); - slave.setBrokerName("slave"); - slave.setMasterConnectorURI(master.getTransportConnectors().get(0).getPublishableConnectString()); - - configureBroker(slave); - bindAddress = master.getTransportConnectors().get(0).getPublishableConnectString(); - return master; - } - - private void configureBroker(BrokerService broker) { - broker.setUseJmx(false); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setOptimizedDispatch(true); - policyMap.setDefaultEntry(defaultEntry); - // optimized dispatch does not effect the determinism of inflight between - // master and slave in this test - //broker.setDestinationPolicy(policyMap); - - } - - @Override - protected void startBroker() throws Exception { - - // because master will wait for slave to connect it needs - // to be in a separate thread - Thread starterThread = new Thread() { - public void run() { - try { - broker.setWaitForSlave(true); - broker.start(); - } catch (Exception e) { - fail("failed to start broker, reason:" + e); - e.printStackTrace(); - } - } - }; - starterThread.start(); - - slave.start(); - starterThread.join(60*1000); - assertTrue("slave is indeed a slave", slave.isSlave()); - } - - @Override - protected void tearDown() throws Exception { - slave.stop(); - super.tearDown(); - - } - - @Override - public void testLoadRequestReply() throws Exception { - super.testLoadRequestReply(); - - Thread.sleep(2000); - - // some checks on the slave - AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor( - AdvisoryBroker.class); - - assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size()); - - RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor( - RegionBroker.class); - - //serverDestination + - assertEquals(6, rb.getDestinationMap().size()); - - RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor( - RegionBroker.class); - - LOG.info("enqueues " + rb.getDestinationStatistics().getEnqueues().getCount()); - assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount()); - - LOG.info("dequeues " + rb.getDestinationStatistics().getDequeues().getCount()); - assertEquals("dequeues match", - rb.getDestinationStatistics().getDequeues().getCount(), - masterRb.getDestinationStatistics().getDequeues().getCount()); - - LOG.info("inflight, slave " + rb.getDestinationStatistics().getInflight().getCount() - + ", master " + masterRb.getDestinationStatistics().getInflight().getCount()); - - // not totally deterministic for this test - maybe due to async send - //assertEquals("inflight match", rb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount()); - - // slave does not actually dispatch any messages, so no request/reply(2) pair per iteration(COUNT) - // slave estimate must be >= actual master value - // master does not always reach expected total, should be assertEquals.., why? - assertTrue("dispatched to slave is as good as master, master=" - + masterRb.getDestinationStatistics().getDispatched().getCount(), - rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >= - masterRb.getDestinationStatistics().getDispatched().getCount()); - } - - public void testMoreThanPageSizeUnacked() throws Exception { - - final int messageCount = Queue.MAX_PAGE_SIZE + 10; - final CountDownLatch latch = new CountDownLatch(1); - - serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - ActiveMQSession s = (ActiveMQSession) serverSession; - s.setSessionAsyncDispatch(true); - - MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination); - serverConsumer.setMessageListener(new MessageListener() { - - public void onMessage(Message msg) { - try { - latch.await(30L, TimeUnit.SECONDS); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - - MessageProducer producer = clientSession.createProducer(serverDestination); - for (int i =0; i< messageCount; i++) { - Message msg = clientSession.createMessage(); - producer.send(msg); - } - Thread.sleep(5000); - - RegionBroker slaveRb = (RegionBroker) slave.getBroker().getAdaptor( - RegionBroker.class); - RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor( - RegionBroker.class); - - assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount()); - assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount()); - - latch.countDown(); - Thread.sleep(5000); - assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount()); - assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount()); - } - - public void testLoadRequestReplyWithNoTempQueueDelete() throws Exception { - deleteTempQueue = false; - messagesToSend = 10; - testLoadRequestReply(); - } - - public void testLoadRequestReplyWithTransactions() throws Exception { - serverTransactional = clientTransactional = true; - messagesToSend = 100; - reInitialiseSessions(); - testLoadRequestReply(); - } - - public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception { - serverTransactional = true; - numConsumers = numProducers = 10; - messagesToSend = 100; - reInitialiseSessions(); - testLoadRequestReply(); - } - - protected void reInitialiseSessions() throws Exception { - // reinitialize so they can respect the transactional flags - serverSession.close(); - clientSession.close(); - serverSession = serverConnection.createSession(serverTransactional, - serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); - clientSession = clientConnection.createSession(clientTransactional, - clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); - } -} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveProducerFlowControlTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveProducerFlowControlTest.java deleted file mode 100644 index 488cabbbca..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveProducerFlowControlTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.ft; - -import org.apache.activemq.ProducerFlowControlTest; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MasterSlaveProducerFlowControlTest extends ProducerFlowControlTest { - static final Logger LOG = LoggerFactory.getLogger(MasterSlaveProducerFlowControlTest.class); - BrokerService slave; - protected BrokerService createBroker() throws Exception { - BrokerService service = super.createBroker(); - service.start(); - - slave = new BrokerService(); - slave.setBrokerName("Slave"); - slave.setPersistent(false); - slave.setUseJmx(false); - - // Setup a destination policy where it takes lots of message at a time. - // so that slave does not block first as there is no producer flow control - // on the master connector - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policy = new PolicyEntry(); - // don't apply the same memory limit as the master in this case - //policy.setMemoryLimit(10); - policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); - policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); - policy.setProducerFlowControl(true); - policyMap.setDefaultEntry(policy); - - slave.setDestinationPolicy(policyMap); - slave.setMasterConnectorURI(connector.getConnectUri().toString()); - slave.start(); - return service; - } - - protected void tearDown() throws Exception { - super.tearDown(); - if (slave != null) { - slave.stop(); - } - } -} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java deleted file mode 100644 index 9f3990559a..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.ft; - -import java.net.URI; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MasterSlaveSlaveDieTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(MasterSlaveSlaveDieTest.class); - - private final AtomicBoolean pluginStopped = new AtomicBoolean(false); - class Plugin extends BrokerPluginSupport { - - @Override - public void start() throws Exception { - LOG.info("plugin start"); - super.start(); - } - - @Override - public void stop() throws Exception { - LOG.info("plugin stop"); - pluginStopped.set(true); - super.stop(); - } - - } - public void testSlaveDieMasterStays() throws Exception { - final BrokerService master = new BrokerService(); - master.setBrokerName("master"); - master.setPersistent(false); - // The wireformat negotiation timeout (defaults to same as - // MaxInactivityDurationInitalDelay) needs to be a bit longer - // on slow running machines - set it to 90 seconds. - master.addConnector("tcp://localhost:0?wireFormat.maxInactivityDurationInitalDelay=90000"); - master.setWaitForSlave(true); - master.setPlugins(new BrokerPlugin[] { new Plugin() }); - - final BrokerService slave = new BrokerService(); - slave.setBrokerName("slave"); - slave.setPersistent(false); - URI masterUri = master.getTransportConnectors().get(0).getConnectUri(); - //SocketProxy masterProxy = new SocketProxy(masterUri); - slave.setMasterConnectorURI(masterUri.toString()); - - slave.setUseJmx(false); - slave.getManagementContext().setCreateConnector(false); - - Executors.newSingleThreadExecutor().execute(new Runnable() { - public void run() { - try { - master.start(); - } catch (Exception e) { - LOG.warn("Exception starting master: " + e); - e.printStackTrace(); - } - } - }); - slave.start(); - slave.waitUntilStarted(); - - master.waitUntilStarted(); - - LOG.info("killing slave.."); - slave.stop(); - slave.waitUntilStopped(); - - LOG.info("checking master still alive"); - assertTrue("master is still alive", master.isStarted()); - assertFalse("plugin was not yet stopped", pluginStopped.get()); - master.stop(); - master.waitUntilStopped(); - } -} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java deleted file mode 100644 index d16b32d0cd..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.ft; - -import java.io.File; -import java.net.URI; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MasterSlaveSlaveShutdownTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(MasterSlaveSlaveShutdownTest.class); - - BrokerService master; - BrokerService slave; - - private void createMasterBroker() throws Exception { - final BrokerService master = new BrokerService(); - master.setBrokerName("master"); - master.setPersistent(false); - master.addConnector("tcp://localhost:0"); - - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - kaha.deleteAllMessages(); - master.setPersistenceAdapter(kaha); - - this.master = master; - } - - private void createSlaveBroker() throws Exception { - - final BrokerService slave = new BrokerService(); - slave.setBrokerName("slave"); - slave.setPersistent(false); - URI masterUri = master.getTransportConnectors().get(0).getConnectUri(); - slave.setMasterConnectorURI(masterUri.toString()); - slave.setUseJmx(false); - slave.getManagementContext().setCreateConnector(false); - - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - slave.setPersistenceAdapter(kaha); - - this.slave = slave; - } - - public void tearDown() { - try { - this.master.stop(); - } catch (Exception e) { - } - this.master.waitUntilStopped(); - this.master = null; - this.slave = null; - } - - public void testSlaveShutsdownWhenWaitingForLock() throws Exception { - - createMasterBroker(); - createSlaveBroker(); - - Executors.newSingleThreadExecutor().execute(new Runnable() { - public void run() { - try { - master.start(); - } catch (Exception e) { - LOG.warn("Exception starting master: " + e); - e.printStackTrace(); - } - } - }); - master.waitUntilStarted(); - - Thread.sleep(2000); - - Executors.newSingleThreadExecutor().execute(new Runnable() { - public void run() { - try { - slave.start(); - } catch (Exception e) { - LOG.warn("Exception starting master: " + e); - e.printStackTrace(); - } - } - }); - slave.waitUntilStarted(); - Thread.sleep(TimeUnit.SECONDS.toMillis(15)); - - LOG.info("killing slave.."); - Executors.newSingleThreadExecutor().execute(new Runnable() { - public void run() { - try { - slave.stop(); - } catch (Exception e) { - LOG.warn("Exception starting master: " + e); - e.printStackTrace(); - } - } - }); - - Thread.sleep(TimeUnit.SECONDS.toMillis(15)); - assertFalse(slave.isStarted()); - slave.waitUntilStopped(); - } -} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java deleted file mode 100644 index 2a8bfc2456..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.ft; - -import java.io.File; -import java.net.URISyntaxException; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.JmsTopicTransactionTest; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.test.JmsResourceProvider; - -/** - * Test failover for Topics - */ -public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest { - protected BrokerService slave; - protected int inflightMessageCount; - protected int failureCount = 50; - protected String uriString = "failover://(tcp://localhost:62001?soWriteTimeout=15000,tcp://localhost:62002?soWriteTimeout=15000)?randomize=false"; - private boolean stopMaster = false; - - @Override - protected void setUp() throws Exception { - failureCount = super.batchCount / 2; - // this will create the main (or master broker) - broker = createBroker(); - File dir = new File ("target" + File.separator + "slave"); - KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); - adapter.setDirectory(dir); - broker.start(); - slave = new BrokerService(); - slave.setBrokerName("slave"); - slave.setPersistenceAdapter(adapter); - slave.setDeleteAllMessagesOnStartup(true); - slave.setMasterConnectorURI("tcp://localhost:62001"); - slave.addConnector("tcp://localhost:62002"); - slave.start(); - // wait for thing to connect - Thread.sleep(1000); - resourceProvider = getJmsResourceProvider(); - topic = resourceProvider.isTopic(); - // We will be using transacted sessions. - resourceProvider.setTransacted(true); - connectionFactory = resourceProvider.createConnectionFactory(); - reconnect(); - } - - @Override - protected void tearDown() throws Exception { - slave.stop(); - slave = null; - super.tearDown(); - } - - @Override - protected BrokerService createBroker() throws Exception, URISyntaxException { - File dir = new File ("target" + File.separator + "master"); - KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); - adapter.setDirectory(dir); - BrokerService broker = new BrokerService(); - broker.setBrokerName("master"); - broker.setPersistenceAdapter(adapter); - broker.setDeleteAllMessagesOnStartup(true); - broker.addConnector("tcp://localhost:62001"); - return broker; - } - - @Override - protected JmsResourceProvider getJmsResourceProvider() { - JmsResourceProvider p = super.getJmsResourceProvider(); - p.setServerUri(uriString); - return p; - } - - @Override - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(uriString); - } - - public void testSendReceiveTransactedBatchesWithMasterStop() throws Exception { - try { - stopMaster = true; - testSendReceiveTransactedBatches(); - } finally { - stopMaster = false; - } - } - - @Override - protected void messageSent() throws Exception { - if (stopMaster) { - if (++inflightMessageCount >= failureCount) { - inflightMessageCount = 0; - Thread.sleep(1000); - broker.stop(); - } - } - } -} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java deleted file mode 100644 index e8ffcb715c..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java +++ /dev/null @@ -1,534 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - - -import java.lang.Thread.UncaughtExceptionHandler; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2102Test extends CombinationTestSupport implements UncaughtExceptionHandler { - - final static int MESSAGE_COUNT = 12120; - final static int NUM_CONSUMERS = 10; - final static int CONSUME_ALL = -1; - - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2102Test.class); - - private final static Map exceptions = new ConcurrentHashMap(); - - private class Consumer implements Runnable, ExceptionListener { - private ActiveMQConnectionFactory connectionFactory; - private String name; - private String queueName; - private boolean running; - private org.omg.CORBA.IntHolder startup; - private Thread thread; - private final int numToProcessPerIteration; - - Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) { - this.connectionFactory = connectionFactory; - this.queueName = queueName; - this.startup = startup; - name = "Consumer-" + queueName + "-" + id; - numToProcessPerIteration = numToProcess; - thread = new Thread(this, name); - } - - private String getClientId() { - try { - return InetAddress.getLocalHost().getHostName() + ":" + name; - } catch (UnknownHostException e) { - return "localhost:" + name; - } - } - - synchronized boolean isRunning() { - return running; - } - - void join() { - try { - thread.join(30000); - } catch (InterruptedException e) { - error("Interrupted waiting for " + name + " to stop", e); - } - } - - public void onException(JMSException e) { - exceptions.put(Thread.currentThread(), e); - error("JMS exception: ", e); - } - - private void processMessage(Session session, MessageProducer producer, Message message) throws Exception { - if (message instanceof TextMessage) { - TextMessage textMessage = (TextMessage) message; - - Destination replyQueue = textMessage.getJMSReplyTo(); - if (replyQueue != null) { - TextMessage reply = session.createTextMessage("reply-" + textMessage.getText()); - - reply.setJMSCorrelationID(textMessage.getJMSCorrelationID()); - - producer.send(replyQueue, reply); - debug("replied via " + replyQueue + " for message => " + textMessage.getText() + ", " + textMessage.getJMSMessageID()); - } else { - debug("no reply to message => " + textMessage.getText() + ", " + textMessage.getJMSMessageID()); - } - } else { - error("Consumer cannot process " + message.getClass().getSimpleName()); - } - } - - private void processMessages() throws JMSException { - ActiveMQConnection connection = null; - - try { - connection = (ActiveMQConnection) connectionFactory.createConnection(); - - RedeliveryPolicy policy = connection.getRedeliveryPolicy(); - - policy.setMaximumRedeliveries(6); - policy.setInitialRedeliveryDelay(1000); - policy.setUseCollisionAvoidance(false); - policy.setCollisionAvoidancePercent((short) 15); - policy.setUseExponentialBackOff(false); - policy.setBackOffMultiplier((short) 5); - - connection.setClientID(getClientId()); - connection.setExceptionListener(this); - connection.start(); - - processMessages(connection); - } finally { - connection.close(); - connection = null; - } - } - - private void processMessages(Connection connection) throws JMSException { - Session session = null; - try { - session = connection.createSession(true, Session.SESSION_TRANSACTED); - if (numToProcessPerIteration > 0) { - while(isRunning()) { - processMessages(session); - } - } else { - processMessages(session); - } - } finally { - if (session != null) { - session.close(); - } - } - } - private void processMessages(Session session) throws JMSException { - MessageConsumer consumer = null; - - try { - consumer = session.createConsumer(session.createQueue(queueName), null); - processMessages(session, consumer); - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - - private void processMessages(Session session, MessageConsumer consumer) throws JMSException { - MessageProducer producer = null; - - try { - producer = session.createProducer(null); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - processMessages(session, consumer, producer); - } finally { - if (producer != null) { - producer.close(); - } - } - } - - private void processMessages(Session session, MessageConsumer consumer, MessageProducer producer) throws JMSException { - debug("waiting for messages..."); - if (startup != null) { - synchronized (startup) { - startup.value--; - startup.notify(); - } - startup = null; - } - int numToProcess = numToProcessPerIteration; - do { - Message message = consumer.receive(5000); - - if (message != null) { - try { - processMessage(session, producer, message); - session.commit(); - numToProcess--; - } catch (Throwable t) { - error("message=" + message + " failure", t); - session.rollback(); - } - } else { - info("got null message on: " + numToProcess); - } - } while ((numToProcessPerIteration == CONSUME_ALL || numToProcess > 0) && isRunning()); - } - - public void run() { - setRunning(true); - - while (isRunning()) { - try { - processMessages(); - } catch (Throwable t) { - error("Unexpected consumer problem: ", t); - } - } - } - synchronized void setRunning(boolean running) { - this.running = running; - } - - void start() { - thread.start(); - } - } - - private class Producer implements ExceptionListener { - private ActiveMQConnectionFactory connectionFactory; - private String queueName; - - Producer(ActiveMQConnectionFactory connectionFactory, String queueName) { - this.connectionFactory = connectionFactory; - this.queueName = queueName; - } - - void execute(String[] args) { - try { - sendMessages(); - } catch (Exception e) { - error("Producer failed", e); - } - } - - private void sendMessages() throws JMSException { - ActiveMQConnection connection = null; - - try { - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setExceptionListener(this); - connection.start(); - - sendMessages(connection); - } finally { - if (connection != null) { - try { - connection.close(); - } catch (JMSException e) { - error("Problem closing connection", e); - } - } - } - } - - private void sendMessages(ActiveMQConnection connection) throws JMSException { - Session session = null; - - try { - session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - sendMessages(session); - } catch (JMSException e) { - e.printStackTrace(); - exceptions.put(Thread.currentThread(), e); - if (session != null) { - session.rollback(); - } - } finally { - if (session != null) { - session.close(); - } - } - } - - private void sendMessages(Session session) throws JMSException { - TemporaryQueue replyQueue = null; - - try { - replyQueue = session.createTemporaryQueue(); - - sendMessages(session, replyQueue); - } finally { - if (replyQueue != null) { - replyQueue.delete(); - } - } - } - - private void sendMessages(Session session, Destination replyQueue) throws JMSException { - MessageConsumer consumer = null; - - try { - consumer = session.createConsumer(replyQueue); - sendMessages(session, replyQueue, consumer); - } finally { - consumer.close(); - session.commit(); - } - } - - private void sendMessages(Session session, Destination replyQueue, int messageCount) throws JMSException { - MessageProducer producer = null; - - try { - producer = session.createProducer(session.createQueue(queueName)); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - producer.setTimeToLive(0); - producer.setPriority(Message.DEFAULT_PRIORITY); - - for (int i = 0; i < messageCount; i++) { - TextMessage message = session.createTextMessage("message#" + i); - message.setJMSReplyTo(replyQueue); - producer.send(message); - } - } finally { - if (producer != null) { - producer.close(); - } - } - } - - private void sendMessages(final Session session, Destination replyQueue, MessageConsumer consumer) throws JMSException { - final org.omg.CORBA.IntHolder messageCount = new org.omg.CORBA.IntHolder(MESSAGE_COUNT); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message reply) { - if (reply instanceof TextMessage) { - TextMessage textReply = (TextMessage) reply; - synchronized (messageCount) { - try { - debug("receive reply#" + messageCount.value + " " + textReply.getText() + ", " + textReply.getJMSMessageID()); - } catch (JMSException e) { - error("Problem processing reply", e); - } - messageCount.value--; - if (messageCount.value % 200 == 0) { - // ack a bunch of replys - info("acking via session commit: messageCount=" + messageCount.value); - try { - session.commit(); - } catch (JMSException e) { - error("Failed to commit with count: " + messageCount.value, e); - } - } - messageCount.notifyAll(); - } - } else { - error("Producer cannot process " + reply.getClass().getSimpleName()); - } - }}); - - sendMessages(session, replyQueue, messageCount.value); - - session.commit(); - - synchronized (messageCount) { - while (messageCount.value > 0) { - - - try { - messageCount.wait(); - } catch (InterruptedException e) { - error("Interrupted waiting for replies", e); - } - } - } - // outstanding replys - session.commit(); - debug("All replies received..."); - } - - public void onException(JMSException exception) { - LOG.error(exception.toString()); - exceptions.put(Thread.currentThread(), exception); - } - } - - private static void debug(String message) { - LOG.info(message); - } - - private static void info(String message) { - LOG.info(message); - } - - private static void error(String message) { - LOG.error(message); - } - - private static void error(String message, Throwable t) { - t.printStackTrace(); - String msg = message + ": " + (t.getMessage() != null ? t.getMessage() : t.toString()); - LOG.error(msg, t); - exceptions.put(Thread.currentThread(), t); - fail(msg); - } - - private ArrayList createConsumers(ActiveMQConnectionFactory connectionFactory, String queueName, - int max, int numToProcessPerConsumer) { - ArrayList consumers = new ArrayList(max); - org.omg.CORBA.IntHolder startup = new org.omg.CORBA.IntHolder(max); - - for (int id = 0; id < max; id++) { - consumers.add(new Consumer(connectionFactory, queueName, startup, id, numToProcessPerConsumer)); - } - for (Consumer consumer : consumers) { - consumer.start(); - } - synchronized (startup) { - while (startup.value > 0) { - try { - startup.wait(); - } catch (InterruptedException e) { - error("Interrupted waiting for consumers to start", e); - } - } - } - return consumers; - } - - final BrokerService master = new BrokerService(); - BrokerService slave = new BrokerService(); - String masterUrl; - - public void setUp() throws Exception { - setMaxTestTime(12 * 60 * 1000); - setAutoFail(true); - super.setUp(); - master.setUseShutdownHook(false); - master.setBrokerName("Master"); - master.addConnector("tcp://localhost:0"); - master.deleteAllMessages(); - master.setWaitForSlave(true); - - Thread t = new Thread() { - public void run() { - try { - master.start(); - } catch (Exception e) { - e.printStackTrace(); - exceptions.put(Thread.currentThread(), e); - } - } - }; - t.start(); - masterUrl = master.getTransportConnectors().get(0).getConnectUri().toString(); - - debug("masterUrl: " + masterUrl); - slave.setUseShutdownHook(false); - slave.setBrokerName("Slave"); - slave.deleteAllMessages(); - slave.addConnector("tcp://localhost:0"); - slave.setMasterConnectorURI(masterUrl); - slave.start(); - slave.waitUntilStarted(); - assertTrue("master started", master.waitUntilStarted()); - } - - public void tearDown() throws Exception { - master.stop(); - slave.stop(); - exceptions.clear(); - } - - public void testMasterSlaveBug() throws Exception { - - Thread.setDefaultUncaughtExceptionHandler(this); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + - masterUrl + ")?randomize=false"); - String queueName = "MasterSlaveBug"; - ArrayList consumers = createConsumers(connectionFactory, queueName, NUM_CONSUMERS, CONSUME_ALL); - - Producer producer = new Producer(connectionFactory, queueName); - producer.execute(new String[]{}); - - for (Consumer consumer : consumers) { - consumer.setRunning(false); - } - - for (Consumer consumer : consumers) { - consumer.join(); - } - assertTrue(exceptions.isEmpty()); - } - - - public void testMasterSlaveBugWithStopStartConsumers() throws Exception { - - Thread.setDefaultUncaughtExceptionHandler(this); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( - "failover:(" + masterUrl + ")?randomize=false"); - String queueName = "MasterSlaveBug"; - ArrayList consumers = createConsumers(connectionFactory, - queueName, NUM_CONSUMERS, 10); - - Producer producer = new Producer(connectionFactory, queueName); - producer.execute(new String[] {}); - - for (Consumer consumer : consumers) { - consumer.setRunning(false); - } - - for (Consumer consumer : consumers) { - consumer.join(); - } - assertTrue(exceptions.isEmpty()); - } - - public void uncaughtException(Thread t, Throwable e) { - error("" + t + e); - exceptions.put(t,e); - } -} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java deleted file mode 100644 index 12c8006b7f..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - - -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2183Test extends AutoFailTestSupport implements UncaughtExceptionHandler, MessageListener { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2183Test.class); - private static final int maxSent = 2000; - private final Map exceptions = new ConcurrentHashMap(); - - BrokerService master = new BrokerService(); - BrokerService slave = new BrokerService(); - URI masterUrl, slaveUrl; - - public void onException(JMSException e) { - exceptions.put(Thread.currentThread(), e); - } - - public void setUp() throws Exception { - setAutoFail(true); - super.setUp(); - master = new BrokerService(); - slave = new BrokerService(); - - master.setBrokerName("Master"); - master.addConnector("tcp://localhost:0"); - master.deleteAllMessages(); - master.setWaitForSlave(true); - - Thread t = new Thread() { - public void run() { - try { - master.start(); - } catch (Exception e) { - e.printStackTrace(); - exceptions.put(Thread.currentThread(), e); - } - } - }; - t.start(); - Thread.sleep(2000); - masterUrl = master.getTransportConnectors().get(0).getConnectUri(); - } - - private void startSlave() throws IOException, Exception, URISyntaxException { - slave.setBrokerName("Slave"); - slave.deleteAllMessages(); - slave.addConnector("tcp://localhost:0"); - slave.setMasterConnectorURI(masterUrl.toString()); - slave.start(); - slaveUrl = slave.getTransportConnectors().get(0).getConnectUri(); - } - - public void tearDown() throws Exception { - master.stop(); - slave.stop(); - exceptions.clear(); - } - - class MessageCounter implements MessageListener { - int count = 0; - public void onMessage(Message message) { - count++; - } - - int getCount() { - return count; - } - } - - public void testMasterSlaveBugWithStopStartConsumers() throws Exception { - - Thread.setDefaultUncaughtExceptionHandler(this); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( - "failover:(" + masterUrl + ")?randomize=false"); - - final Connection connection = connectionFactory.createConnection(); - final CountDownLatch startCommenced = new CountDownLatch(1); - final CountDownLatch startDone = new CountDownLatch(1); - - // start will be blocked pending slave connection but should resume after slave started - Executors.newSingleThreadExecutor().execute(new Runnable() { - public void run() { - startCommenced.countDown(); - try { - connection.start(); - startDone.countDown(); - } catch (Exception e) { - exceptions.put(Thread.currentThread(), e); - } - }}); - - - assertTrue("connection.start has commenced", startCommenced.await(10, TimeUnit.SECONDS)); - startSlave(); - assertTrue("connection.start done", startDone.await(70, TimeUnit.SECONDS)); - - final MessageCounter counterA = new MessageCounter(); - connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(counterA); - - final MessageCounter counterB = new MessageCounter(); - connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.B.VirtualTopic.T")).setMessageListener(counterB); - - Thread.sleep(2000); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.T")); - for (int i=0; i