Add missing license header

This commit is contained in:
Timothy Bish 2014-01-17 14:34:11 -05:00
parent e7703f70e0
commit 9b00a0947e
1 changed files with 98 additions and 81 deletions

View File

@ -1,12 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.*;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.*;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
@ -15,49 +62,41 @@ import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.jms.Connection;
import javax.sql.DataSource;
import java.net.URI;
import java.sql.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
/**
* Test creates a broker network with two brokers -
* producerBroker (with a message producer attached) and consumerBroker (with consumer attached)
* Test creates a broker network with two brokers - producerBroker (with a
* message producer attached) and consumerBroker (with consumer attached)
* <p/>
* Simulates network duplicate message by stopping and restarting the consumerBroker after message (with message ID ending in
* 120) is persisted to consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the network connection.
* When the network connection is reestablished the producerBroker resends
* message (with messageID ending in 120).
* Simulates network duplicate message by stopping and restarting the
* consumerBroker after message (with message ID ending in 120) is persisted to
* consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the
* network connection. When the network connection is reestablished the
* producerBroker resends message (with messageID ending in 120).
* <p/>
* Expectation:
* <p/>
* With the following policy entries set, would expect the duplicate message to be read from the store
* and dispatched to the consumer - where the duplicate could be detected by consumer.
* With the following policy entries set, would expect the duplicate message to
* be read from the store and dispatched to the consumer - where the duplicate
* could be detected by consumer.
* <p/>
* PolicyEntry policy = new PolicyEntry();
* policy.setQueue(">");
* policy.setEnableAudit(false);
* policy.setUseCache(false);
* PolicyEntry policy = new PolicyEntry(); policy.setQueue(">");
* policy.setEnableAudit(false); policy.setUseCache(false);
* policy.setExpireMessagesPeriod(0);
* <p/>
* <p/>
* Note 1: Network needs to use replaywhenNoConsumers so enabling the networkAudit to avoid this scenario is not feasible.
* Note 1: Network needs to use replaywhenNoConsumers so enabling the
* networkAudit to avoid this scenario is not feasible.
* <p/>
* NOTE 2: Added a custom plugin to the consumerBroker so that the consumerBroker shutdown will occur after a message has been
* persisted to consumerBroker store but before an ACK is sent back to ProducerBroker. This is just a hack to ensure producerBroker will resend
* the message after shutdown.
* NOTE 2: Added a custom plugin to the consumerBroker so that the
* consumerBroker shutdown will occur after a message has been persisted to
* consumerBroker store but before an ACK is sent back to ProducerBroker. This
* is just a hack to ensure producerBroker will resend the message after
* shutdown.
*/
@RunWith(value = Parameterized.class)
@ -81,9 +120,9 @@ public class AMQ4952Test extends TestCase {
@Parameterized.Parameter(0)
public boolean enableCursorAudit;
@Parameterized.Parameters(name="enableAudit={0}")
@Parameterized.Parameters(name = "enableAudit={0}")
public static Iterable<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{{Boolean.TRUE},{Boolean.FALSE}});
return Arrays.asList(new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } });
}
@Test
@ -106,7 +145,6 @@ public class AMQ4952Test extends TestCase {
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME);
while (true) {
TextMessage textMsg = (TextMessage) messageConsumer.receive(5000);
@ -117,14 +155,14 @@ public class AMQ4952Test extends TestCase {
receivedMessageCount++;
LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID());
// on first delivery ensure the message is pending an ack when it is resent from the producer broker
// on first delivery ensure the message is pending an
// ack when it is resent from the producer broker
if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) {
LOG.info("Waiting for restart...");
consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS);
}
textMsg.acknowledge();
}
} finally {
consumerConnection.close();
@ -133,21 +171,20 @@ public class AMQ4952Test extends TestCase {
};
Runnable consumerBrokerResetTask = new Runnable() {
@Override
public void run() {
try {
// wait for signal
stopConsumerBroker.await();
LOG.info("********* STOPPING CONSUMER BROKER");
consumerBroker.stop();
consumerBroker.waitUntilStopped();
LOG.info("***** STARTING CONSUMER BROKER");
// do not delete messages on startup
// do not delete messages on startup
consumerBroker = createConsumerBroker(false);
LOG.info("***** CONSUMER BROKER STARTED!!");
@ -162,28 +199,24 @@ public class AMQ4952Test extends TestCase {
}));
consumerRestartedAndMessageForwarded.countDown();
} catch (Exception e) {
LOG.error("Exception when stopping/starting the consumerBroker ", e);
}
}
};
ExecutorService executor = Executors.newFixedThreadPool(2);
//start consumerBroker start/stop task
// start consumerBroker start/stop task
executor.execute(consumerBrokerResetTask);
//start consuming messages
// start consuming messages
Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask);
produceMessages();
//Wait for consumer to finish
// Wait for consumer to finish
int totalMessagesConsumed = numberOfConsumedMessage.get();
StringBuffer contents = new StringBuffer();
@ -193,7 +226,6 @@ public class AMQ4952Test extends TestCase {
assertEquals("number of messages received", 2, totalMessagesConsumed);
assertEquals("messages left in store", true, messageInStore);
assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ"));
}
private void produceMessages() throws JMSException {
@ -253,20 +285,16 @@ public class AMQ4952Test extends TestCase {
consumerBroker = createConsumerBroker(true);
}
/**
* Producer broker
* listens on localhost:2003
* networks to consumerBroker - localhost:2006
* Producer broker listens on localhost:2003 networks to consumerBroker -
* localhost:2006
*
* @return
* @throws Exception
*/
protected BrokerService createProducerBroker() throws Exception {
String networkToPorts[] = new String[]{"2006"};
String networkToPorts[] = new String[] { "2006" };
HashMap<String, String> networkProps = new HashMap<String, String>();
networkProps.put("networkTTL", "10");
@ -287,8 +315,7 @@ public class AMQ4952Test extends TestCase {
transportConnectors.add(transportConnector);
broker.setTransportConnectors(transportConnectors);
//network to consumerBroker
// network to consumerBroker
if (networkToPorts != null && networkToPorts.length > 0) {
StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false");
@ -296,10 +323,10 @@ public class AMQ4952Test extends TestCase {
if (networkProps != null) {
IntrospectionSupport.setProperties(nc, networkProps);
}
nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQQueue[]{QUEUE_NAME}));
nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination> asList(new ActiveMQQueue[] { QUEUE_NAME }));
}
//Persistence adapter
// Persistence adapter
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource remoteDataSource = new EmbeddedDataSource();
@ -308,7 +335,7 @@ public class AMQ4952Test extends TestCase {
jdbc.setDataSource(remoteDataSource);
broker.setPersistenceAdapter(jdbc);
//set Policy entries
// set Policy entries
PolicyEntry policy = new PolicyEntry();
policy.setQueue(">");
@ -317,8 +344,7 @@ public class AMQ4952Test extends TestCase {
policy.setExpireMessagesPeriod(0);
// set replay with no consumers
ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
new ConditionalNetworkBridgeFilterFactory();
ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
@ -332,16 +358,14 @@ public class AMQ4952Test extends TestCase {
return broker;
}
/**
* consumerBroker
* - listens on localhost:2006
* consumerBroker - listens on localhost:2006
*
* @param deleteMessages - drop messages when broker instance is created
* @param deleteMessages
* - drop messages when broker instance is created
* @return
* @throws Exception
*/
protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception {
String scheme = "tcp";
@ -358,7 +382,7 @@ public class AMQ4952Test extends TestCase {
transportConnectors.add(transportConnector);
broker.setTransportConnectors(transportConnectors);
//policy entries
// policy entries
PolicyEntry policy = new PolicyEntry();
@ -367,8 +391,7 @@ public class AMQ4952Test extends TestCase {
policy.setExpireMessagesPeriod(0);
// set replay with no consumers
ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
new ConditionalNetworkBridgeFilterFactory();
ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
@ -377,7 +400,6 @@ public class AMQ4952Test extends TestCase {
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
// Persistence adapter
JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter();
EmbeddedDataSource localDataSource = new EmbeddedDataSource();
@ -388,7 +410,7 @@ public class AMQ4952Test extends TestCase {
if (deleteMessages) {
// no plugin on restart
broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()});
broker.setPlugins(new BrokerPlugin[] { new MyTestPlugin() });
}
this.localDataSource = localDataSource;
@ -399,7 +421,6 @@ public class AMQ4952Test extends TestCase {
return broker;
}
/**
* Query JDBC Store to see if messages are left
*
@ -407,9 +428,7 @@ public class AMQ4952Test extends TestCase {
* @return
* @throws SQLException
*/
private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer)
throws SQLException {
private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException {
boolean tableHasData = false;
String query = "select * from ACTIVEMQ_MSGS";
@ -419,8 +438,6 @@ public class AMQ4952Test extends TestCase {
ResultSet set = null;
try {
StringBuffer headers = new StringBuffer();
set = s.executeQuery();
@ -434,7 +451,6 @@ public class AMQ4952Test extends TestCase {
}
LOG.error(headers.toString());
while (set.next()) {
tableHasData = true;
@ -462,16 +478,16 @@ public class AMQ4952Test extends TestCase {
return tableHasData;
}
/**
* plugin used to ensure consumerbroker is restared before the network message from producerBroker is acked
* plugin used to ensure consumerbroker is restared before the network
* message from producerBroker is acked
*/
class MyTestPlugin implements BrokerPlugin {
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new MyTestBroker(broker);
}
}
class MyTestBroker extends BrokerFilter {
@ -480,10 +496,11 @@ public class AMQ4952Test extends TestCase {
super(next);
}
@Override
public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
super.send(producerExchange, messageSend);
LOG.error("Stopping broker on send: " +messageSend.getMessageId().getProducerSequenceId());
LOG.error("Stopping broker on send: " + messageSend.getMessageId().getProducerSequenceId());
stopConsumerBroker.countDown();
producerExchange.getConnectionContext().setDontSendReponse(true);
}