From 9b00a0947e2cf8e603b89070afebf22123867f4f Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 17 Jan 2014 14:34:11 -0500 Subject: [PATCH] Add missing license header --- .../org/apache/activemq/bugs/AMQ4952Test.java | 179 ++++++++++-------- 1 file changed, 98 insertions(+), 81 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java index 4e88e82a9b..f9df753216 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java @@ -1,12 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.activemq.bugs; +import java.net.URI; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.sql.DataSource; + import junit.framework.TestCase; + import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.*; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; @@ -15,49 +62,41 @@ import org.apache.activemq.util.Wait; import org.apache.derby.jdbc.EmbeddedDataSource; import org.junit.After; import org.junit.Before; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.*; -import javax.jms.Connection; -import javax.sql.DataSource; -import java.net.URI; -import java.sql.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.*; - /** - * Test creates a broker network with two brokers - - * producerBroker (with a message producer attached) and consumerBroker (with consumer attached) + * Test creates a broker network with two brokers - producerBroker (with a + * message producer attached) and consumerBroker (with consumer attached) *

- * Simulates network duplicate message by stopping and restarting the consumerBroker after message (with message ID ending in - * 120) is persisted to consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the network connection. - * When the network connection is reestablished the producerBroker resends - * message (with messageID ending in 120). + * Simulates network duplicate message by stopping and restarting the + * consumerBroker after message (with message ID ending in 120) is persisted to + * consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the + * network connection. When the network connection is reestablished the + * producerBroker resends message (with messageID ending in 120). *

* Expectation: *

- * With the following policy entries set, would expect the duplicate message to be read from the store - * and dispatched to the consumer - where the duplicate could be detected by consumer. + * With the following policy entries set, would expect the duplicate message to + * be read from the store and dispatched to the consumer - where the duplicate + * could be detected by consumer. *

- * PolicyEntry policy = new PolicyEntry(); - * policy.setQueue(">"); - * policy.setEnableAudit(false); - * policy.setUseCache(false); + * PolicyEntry policy = new PolicyEntry(); policy.setQueue(">"); + * policy.setEnableAudit(false); policy.setUseCache(false); * policy.setExpireMessagesPeriod(0); *

*

- * Note 1: Network needs to use replaywhenNoConsumers so enabling the networkAudit to avoid this scenario is not feasible. + * Note 1: Network needs to use replaywhenNoConsumers so enabling the + * networkAudit to avoid this scenario is not feasible. *

- * NOTE 2: Added a custom plugin to the consumerBroker so that the consumerBroker shutdown will occur after a message has been - * persisted to consumerBroker store but before an ACK is sent back to ProducerBroker. This is just a hack to ensure producerBroker will resend - * the message after shutdown. + * NOTE 2: Added a custom plugin to the consumerBroker so that the + * consumerBroker shutdown will occur after a message has been persisted to + * consumerBroker store but before an ACK is sent back to ProducerBroker. This + * is just a hack to ensure producerBroker will resend the message after + * shutdown. */ @RunWith(value = Parameterized.class) @@ -81,9 +120,9 @@ public class AMQ4952Test extends TestCase { @Parameterized.Parameter(0) public boolean enableCursorAudit; - @Parameterized.Parameters(name="enableAudit={0}") + @Parameterized.Parameters(name = "enableAudit={0}") public static Iterable getTestParameters() { - return Arrays.asList(new Object[][]{{Boolean.TRUE},{Boolean.FALSE}}); + return Arrays.asList(new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }); } @Test @@ -106,7 +145,6 @@ public class AMQ4952Test extends TestCase { Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME); - while (true) { TextMessage textMsg = (TextMessage) messageConsumer.receive(5000); @@ -117,14 +155,14 @@ public class AMQ4952Test extends TestCase { receivedMessageCount++; LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID()); - // on first delivery ensure the message is pending an ack when it is resent from the producer broker + // on first delivery ensure the message is pending an + // ack when it is resent from the producer broker if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) { LOG.info("Waiting for restart..."); consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS); } textMsg.acknowledge(); - } } finally { consumerConnection.close(); @@ -133,21 +171,20 @@ public class AMQ4952Test extends TestCase { }; Runnable consumerBrokerResetTask = new Runnable() { + @Override public void run() { try { // wait for signal stopConsumerBroker.await(); - LOG.info("********* STOPPING CONSUMER BROKER"); consumerBroker.stop(); consumerBroker.waitUntilStopped(); - LOG.info("***** STARTING CONSUMER BROKER"); - // do not delete messages on startup + // do not delete messages on startup consumerBroker = createConsumerBroker(false); LOG.info("***** CONSUMER BROKER STARTED!!"); @@ -162,28 +199,24 @@ public class AMQ4952Test extends TestCase { })); consumerRestartedAndMessageForwarded.countDown(); - } catch (Exception e) { LOG.error("Exception when stopping/starting the consumerBroker ", e); } - } }; - ExecutorService executor = Executors.newFixedThreadPool(2); - //start consumerBroker start/stop task + // start consumerBroker start/stop task executor.execute(consumerBrokerResetTask); - //start consuming messages + // start consuming messages Future numberOfConsumedMessage = executor.submit(consumeMessageTask); - produceMessages(); - //Wait for consumer to finish + // Wait for consumer to finish int totalMessagesConsumed = numberOfConsumedMessage.get(); StringBuffer contents = new StringBuffer(); @@ -193,7 +226,6 @@ public class AMQ4952Test extends TestCase { assertEquals("number of messages received", 2, totalMessagesConsumed); assertEquals("messages left in store", true, messageInStore); assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ")); - } private void produceMessages() throws JMSException { @@ -253,20 +285,16 @@ public class AMQ4952Test extends TestCase { consumerBroker = createConsumerBroker(true); } - /** - * Producer broker - * listens on localhost:2003 - * networks to consumerBroker - localhost:2006 + * Producer broker listens on localhost:2003 networks to consumerBroker - + * localhost:2006 * * @return * @throws Exception */ - protected BrokerService createProducerBroker() throws Exception { - - String networkToPorts[] = new String[]{"2006"}; + String networkToPorts[] = new String[] { "2006" }; HashMap networkProps = new HashMap(); networkProps.put("networkTTL", "10"); @@ -287,8 +315,7 @@ public class AMQ4952Test extends TestCase { transportConnectors.add(transportConnector); broker.setTransportConnectors(transportConnectors); - - //network to consumerBroker + // network to consumerBroker if (networkToPorts != null && networkToPorts.length > 0) { StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false"); @@ -296,10 +323,10 @@ public class AMQ4952Test extends TestCase { if (networkProps != null) { IntrospectionSupport.setProperties(nc, networkProps); } - nc.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQQueue[]{QUEUE_NAME})); + nc.setStaticallyIncludedDestinations(Arrays. asList(new ActiveMQQueue[] { QUEUE_NAME })); } - //Persistence adapter + // Persistence adapter JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); EmbeddedDataSource remoteDataSource = new EmbeddedDataSource(); @@ -308,7 +335,7 @@ public class AMQ4952Test extends TestCase { jdbc.setDataSource(remoteDataSource); broker.setPersistenceAdapter(jdbc); - //set Policy entries + // set Policy entries PolicyEntry policy = new PolicyEntry(); policy.setQueue(">"); @@ -317,8 +344,7 @@ public class AMQ4952Test extends TestCase { policy.setExpireMessagesPeriod(0); // set replay with no consumers - ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = - new ConditionalNetworkBridgeFilterFactory(); + ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); @@ -332,16 +358,14 @@ public class AMQ4952Test extends TestCase { return broker; } - /** - * consumerBroker - * - listens on localhost:2006 + * consumerBroker - listens on localhost:2006 * - * @param deleteMessages - drop messages when broker instance is created + * @param deleteMessages + * - drop messages when broker instance is created * @return * @throws Exception */ - protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception { String scheme = "tcp"; @@ -358,7 +382,7 @@ public class AMQ4952Test extends TestCase { transportConnectors.add(transportConnector); broker.setTransportConnectors(transportConnectors); - //policy entries + // policy entries PolicyEntry policy = new PolicyEntry(); @@ -367,8 +391,7 @@ public class AMQ4952Test extends TestCase { policy.setExpireMessagesPeriod(0); // set replay with no consumers - ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = - new ConditionalNetworkBridgeFilterFactory(); + ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); @@ -377,7 +400,6 @@ public class AMQ4952Test extends TestCase { pMap.setDefaultEntry(policy); broker.setDestinationPolicy(pMap); - // Persistence adapter JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter(); EmbeddedDataSource localDataSource = new EmbeddedDataSource(); @@ -388,7 +410,7 @@ public class AMQ4952Test extends TestCase { if (deleteMessages) { // no plugin on restart - broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()}); + broker.setPlugins(new BrokerPlugin[] { new MyTestPlugin() }); } this.localDataSource = localDataSource; @@ -399,7 +421,6 @@ public class AMQ4952Test extends TestCase { return broker; } - /** * Query JDBC Store to see if messages are left * @@ -407,9 +428,7 @@ public class AMQ4952Test extends TestCase { * @return * @throws SQLException */ - - private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) - throws SQLException { + private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException { boolean tableHasData = false; String query = "select * from ACTIVEMQ_MSGS"; @@ -419,8 +438,6 @@ public class AMQ4952Test extends TestCase { ResultSet set = null; - - try { StringBuffer headers = new StringBuffer(); set = s.executeQuery(); @@ -434,7 +451,6 @@ public class AMQ4952Test extends TestCase { } LOG.error(headers.toString()); - while (set.next()) { tableHasData = true; @@ -462,16 +478,16 @@ public class AMQ4952Test extends TestCase { return tableHasData; } - /** - * plugin used to ensure consumerbroker is restared before the network message from producerBroker is acked + * plugin used to ensure consumerbroker is restared before the network + * message from producerBroker is acked */ class MyTestPlugin implements BrokerPlugin { + @Override public Broker installPlugin(Broker broker) throws Exception { return new MyTestBroker(broker); } - } class MyTestBroker extends BrokerFilter { @@ -480,10 +496,11 @@ public class AMQ4952Test extends TestCase { super(next); } + @Override public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception { super.send(producerExchange, messageSend); - LOG.error("Stopping broker on send: " +messageSend.getMessageId().getProducerSequenceId()); + LOG.error("Stopping broker on send: " + messageSend.getMessageId().getProducerSequenceId()); stopConsumerBroker.countDown(); producerExchange.getConnectionContext().setDontSendReponse(true); }