mirror of https://github.com/apache/activemq.git
resolve https://issues.apache.org/activemq/browse/AMQ-2580 - patch was good for AMQ store, fix needed for kahaDB and JDBC now done, test case was great, thanks.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@967134 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
623da6ffab
commit
f206a1bd11
|
@ -46,10 +46,11 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener {
|
|||
|
||||
public boolean recoverMessage(Message message) throws Exception {
|
||||
if (listener.hasSpace()) {
|
||||
listener.recoverMessage(message);
|
||||
lastRecovered = message.getMessageId();
|
||||
count++;
|
||||
return true;
|
||||
if (listener.recoverMessage(message)) {
|
||||
lastRecovered = message.getMessageId();
|
||||
count++;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -108,10 +108,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
if (listener.hasSpace()) {
|
||||
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
|
||||
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
||||
listener.recoverMessage(msg);
|
||||
finalLast.set(sequenceId);
|
||||
finalPriority.set(msg.getPriority());
|
||||
return true;
|
||||
if (listener.recoverMessage(msg)) {
|
||||
finalLast.set(sequenceId);
|
||||
finalPriority.set(msg.getPriority());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -451,7 +451,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
} else {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
|
||||
}
|
||||
s.setMaxRows(maxReturned);
|
||||
// no set max rows as selectors may need to scan more than maxReturned messages to get what they need
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
s.setString(2, clientId);
|
||||
s.setString(3, subscriptionName);
|
||||
|
@ -466,16 +466,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
while (rs.next() && count < maxReturned) {
|
||||
if (listener.recoverMessageReference(rs.getString(1))) {
|
||||
count++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
while (rs.next() && count < maxReturned) {
|
||||
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
|
||||
count++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -305,8 +305,6 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
if (recoverReference(listener, msg)) {
|
||||
count++;
|
||||
container.setBatchEntry(msg.getMessageId(), entry);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
container.reset();
|
||||
|
|
|
@ -798,8 +798,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
|
||||
.hasNext();) {
|
||||
entry = iterator.next();
|
||||
listener.recoverMessage(loadMessage(entry.getValue().location));
|
||||
counter++;
|
||||
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
|
||||
counter++;
|
||||
}
|
||||
if (counter >= maxReturned || listener.hasSpace() == false) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -47,14 +47,6 @@ public class JmsTopicSelectorTest extends TestSupport {
|
|||
protected boolean durable;
|
||||
protected int deliveryMode = DeliveryMode.PERSISTENT;
|
||||
|
||||
public JmsTopicSelectorTest() {
|
||||
super();
|
||||
}
|
||||
|
||||
public JmsTopicSelectorTest(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package org.apache.activemq;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Enumeration;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
@ -34,6 +36,10 @@ import org.apache.activemq.command.ActiveMQDestination;
|
|||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -42,18 +48,11 @@ import org.apache.commons.logging.LogFactory;
|
|||
*
|
||||
* @version $Revision: 1.5 $
|
||||
*/
|
||||
public class TestSupport extends TestCase {
|
||||
public abstract class TestSupport extends CombinationTestSupport {
|
||||
|
||||
protected ActiveMQConnectionFactory connectionFactory;
|
||||
protected boolean topic = true;
|
||||
|
||||
public TestSupport() {
|
||||
super();
|
||||
}
|
||||
|
||||
public TestSupport(String name) {
|
||||
super(name);
|
||||
}
|
||||
public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB;
|
||||
|
||||
protected ActiveMQMessage createMessage() {
|
||||
return new ActiveMQMessage();
|
||||
|
@ -173,4 +172,28 @@ public class TestSupport extends TestCase {
|
|||
regionBroker.getQueueRegion().getDestinationMap() :
|
||||
regionBroker.getTopicRegion().getDestinationMap();
|
||||
}
|
||||
|
||||
public static enum PersistenceAdapterChoice {KahaDB, AMQ, JDBC };
|
||||
|
||||
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
|
||||
return setPersistenceAdapter(broker, defaultPersistenceAdapter);
|
||||
}
|
||||
|
||||
public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException {
|
||||
PersistenceAdapter adapter = null;
|
||||
switch (choice) {
|
||||
case AMQ:
|
||||
adapter = new AMQPersistenceAdapter();
|
||||
break;
|
||||
case JDBC:
|
||||
adapter = new JDBCPersistenceAdapter();
|
||||
break;
|
||||
case KahaDB:
|
||||
adapter = new KahaDBPersistenceAdapter();
|
||||
break;
|
||||
}
|
||||
broker.setPersistenceAdapter(adapter);
|
||||
return adapter;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import junit.framework.Test;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.TopicConnection;
|
||||
import javax.jms.TopicSession;
|
||||
|
||||
public class AMQ2580Test extends TestSupport {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AMQ2580Test.class);
|
||||
|
||||
private static final String TOPIC_NAME = "topicName";
|
||||
private static final String CLIENT_ID = "client_id";
|
||||
private static final String textOfSelectedMsg = "good_message";
|
||||
|
||||
protected TopicConnection connection;
|
||||
|
||||
private Topic topic;
|
||||
private Session session;
|
||||
private MessageProducer producer;
|
||||
private ConnectionFactory connectionFactory;
|
||||
private TopicConnection topicConnection;
|
||||
private BrokerService service;
|
||||
|
||||
public static Test suite() {
|
||||
return suite(AMQ2580Test.class);
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
initDurableBroker();
|
||||
initConnectionFactory();
|
||||
initTopic();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
shutdownClient();
|
||||
service.stop();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
private void initConnection() throws JMSException {
|
||||
if (connection == null) {
|
||||
LOG.info("Initializing connection");
|
||||
|
||||
connection = (TopicConnection) connectionFactory.createConnection();
|
||||
connection.start();
|
||||
}
|
||||
}
|
||||
|
||||
public void initCombosForTestTopicIsDurableSmokeTest() throws Exception {
|
||||
addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
|
||||
}
|
||||
|
||||
public void testTopicIsDurableSmokeTest() throws Exception {
|
||||
|
||||
initClient();
|
||||
MessageConsumer consumer = createMessageConsumer();
|
||||
LOG.info("Consuming message");
|
||||
assertNull(consumer.receive(1));
|
||||
shutdownClient();
|
||||
consumer.close();
|
||||
|
||||
sendMessages();
|
||||
shutdownClient();
|
||||
|
||||
initClient();
|
||||
consumer = createMessageConsumer();
|
||||
|
||||
LOG.info("Consuming message");
|
||||
TextMessage answer1 = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull("we got our message", answer1);
|
||||
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
private MessageConsumer createMessageConsumer() throws JMSException {
|
||||
LOG.info("creating durable subscriber");
|
||||
return session.createDurableSubscriber(topic,
|
||||
TOPIC_NAME,
|
||||
"name='value'",
|
||||
false);
|
||||
}
|
||||
|
||||
private void initClient() throws JMSException {
|
||||
LOG.info("Initializing client");
|
||||
|
||||
initConnection();
|
||||
initSession();
|
||||
}
|
||||
|
||||
private void shutdownClient()
|
||||
throws JMSException {
|
||||
LOG.info("Closing session and connection");
|
||||
session.close();
|
||||
connection.close();
|
||||
session = null;
|
||||
connection = null;
|
||||
}
|
||||
|
||||
private void sendMessages()
|
||||
throws JMSException {
|
||||
initConnection();
|
||||
|
||||
initSession();
|
||||
|
||||
LOG.info("Creating producer");
|
||||
producer = session.createProducer(topic);
|
||||
|
||||
sendMessageThatFailsSelection();
|
||||
|
||||
sendMessage(textOfSelectedMsg, "value");
|
||||
}
|
||||
|
||||
private void initSession() throws JMSException {
|
||||
LOG.info("Initializing session");
|
||||
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
private void sendMessageThatFailsSelection() throws JMSException {
|
||||
for (int i = 0; i < 5; i++) {
|
||||
String textOfNotSelectedMsg = "Msg_" + i;
|
||||
sendMessage(textOfNotSelectedMsg, "not_value");
|
||||
LOG.info("#");
|
||||
}
|
||||
}
|
||||
|
||||
private void sendMessage(
|
||||
String msgText,
|
||||
String propertyValue) throws JMSException {
|
||||
LOG.info("Creating message: " + msgText);
|
||||
TextMessage messageToSelect = session.createTextMessage(msgText);
|
||||
messageToSelect.setStringProperty("name", propertyValue);
|
||||
LOG.info("Sending message");
|
||||
producer.send(messageToSelect);
|
||||
}
|
||||
|
||||
protected void initConnectionFactory() throws Exception {
|
||||
ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
|
||||
connectionFactory = activeMqConnectionFactory;
|
||||
}
|
||||
|
||||
|
||||
private ActiveMQConnectionFactory createActiveMqConnectionFactory() throws Exception {
|
||||
ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(
|
||||
"failover:" + service.getTransportConnectors().get(0).getConnectUri().toString());
|
||||
activeMqConnectionFactory.setWatchTopicAdvisories(false);
|
||||
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
|
||||
prefetchPolicy.setDurableTopicPrefetch(2);
|
||||
prefetchPolicy.setOptimizeDurableTopicPrefetch(2);
|
||||
activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy);
|
||||
activeMqConnectionFactory.setClientID(CLIENT_ID);
|
||||
return activeMqConnectionFactory;
|
||||
}
|
||||
|
||||
private void initDurableBroker() throws Exception {
|
||||
service = new BrokerService();
|
||||
setDefaultPersistenceAdapter(service);
|
||||
service.setDeleteAllMessagesOnStartup(true);
|
||||
service.setAdvisorySupport(false);
|
||||
service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"});
|
||||
service.setPersistent(true);
|
||||
service.setUseJmx(false);
|
||||
service.start();
|
||||
|
||||
}
|
||||
|
||||
private void initTopic() throws JMSException {
|
||||
topicConnection = (TopicConnection) connectionFactory.createConnection();
|
||||
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
topic = topicSession.createTopic(TOPIC_NAME);
|
||||
}
|
||||
}
|
|
@ -16,20 +16,24 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.failover;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import junit.framework.Test;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.AutoFailTestSupport;
|
||||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
import org.apache.activemq.broker.BrokerPluginSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ConsumerBrokerExchange;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.util.SocketProxy;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -42,108 +46,90 @@ import javax.jms.ServerSessionPool;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.TransactionRolledBackException;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.AutoFailTestSupport;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
import org.apache.activemq.broker.BrokerPluginSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ConsumerBrokerExchange;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.util.SocketProxy;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import java.net.URI;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
// see https://issues.apache.org/activemq/browse/AMQ-2473
|
||||
|
||||
// https://issues.apache.org/activemq/browse/AMQ-2590
|
||||
public class FailoverTransactionTest {
|
||||
public class FailoverTransactionTest extends TestSupport {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
|
||||
private static final String QUEUE_NAME = "FailoverWithTx";
|
||||
private String url = "tcp://localhost:61616";
|
||||
BrokerService broker;
|
||||
private static final String QUEUE_NAME = "FailoverWithTx";
|
||||
private String url = "tcp://localhost:61616";
|
||||
BrokerService broker;
|
||||
|
||||
public void startCleanBroker() throws Exception {
|
||||
startBroker(true);
|
||||
}
|
||||
public static Test suite() {
|
||||
return suite(FailoverTransactionTest.class);
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
public void startCleanBroker() throws Exception {
|
||||
startBroker(true);
|
||||
}
|
||||
|
||||
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
|
||||
broker = createBroker(deleteAllMessagesOnStartup);
|
||||
public void tearDown() throws Exception {
|
||||
stopBroker();
|
||||
}
|
||||
|
||||
public void stopBroker() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
|
||||
broker = createBroker(deleteAllMessagesOnStartup);
|
||||
broker.start();
|
||||
}
|
||||
}
|
||||
|
||||
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setUseJmx(false);
|
||||
broker.setAdvisorySupport(false);
|
||||
broker.addConnector(url);
|
||||
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
|
||||
return broker;
|
||||
}
|
||||
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setUseJmx(false);
|
||||
broker.setAdvisorySupport(false);
|
||||
broker.addConnector(url);
|
||||
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
|
||||
return broker;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
|
||||
startCleanBroker();
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(QUEUE_NAME);
|
||||
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
|
||||
startCleanBroker();
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(QUEUE_NAME);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
produceMessage(session, destination);
|
||||
produceMessage(session, destination);
|
||||
|
||||
// restart to force failover and connection state recovery before the commit
|
||||
broker.stop();
|
||||
startBroker(false);
|
||||
// restart to force failover and connection state recovery before the commit
|
||||
broker.stop();
|
||||
startBroker(false);
|
||||
|
||||
session.commit();
|
||||
assertNotNull("we got the message", consumer.receive(20000));
|
||||
session.commit();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverCommitReplyLostAMQ() throws Exception {
|
||||
doTestFailoverCommitReplyLost(0);
|
||||
session.commit();
|
||||
assertNotNull("we got the message", consumer.receive(20000));
|
||||
session.commit();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverCommitReplyLostJdbc() throws Exception {
|
||||
doTestFailoverCommitReplyLost(1);
|
||||
public void initCombosForTestFailoverCommitReplyLost() {
|
||||
addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverCommitReplyLostKahaDB() throws Exception {
|
||||
doTestFailoverCommitReplyLost(2);
|
||||
}
|
||||
|
||||
public void doTestFailoverCommitReplyLost(final int adapter) throws Exception {
|
||||
public void testFailoverCommitReplyLost() throws Exception {
|
||||
|
||||
broker = createBroker(true);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
|
||||
broker.setPlugins(new BrokerPlugin[] {
|
||||
broker.setPlugins(new BrokerPlugin[]{
|
||||
new BrokerPluginSupport() {
|
||||
@Override
|
||||
public void commitTransaction(ConnectionContext context,
|
||||
TransactionId xid, boolean onePhase) throws Exception {
|
||||
TransactionId xid, boolean onePhase) throws Exception {
|
||||
super.commitTransaction(context, xid, onePhase);
|
||||
// so commit will hang as if reply is lost
|
||||
context.setDontSendReponse(true);
|
||||
|
@ -157,7 +143,7 @@ public class FailoverTransactionTest {
|
|||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
broker.start();
|
||||
|
@ -190,7 +176,7 @@ public class FailoverTransactionTest {
|
|||
// will be stopped by the plugin
|
||||
broker.waitUntilStopped();
|
||||
broker = createBroker(false);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
|
||||
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
|
||||
|
@ -210,7 +196,7 @@ public class FailoverTransactionTest {
|
|||
|
||||
LOG.info("Checking for remaining/hung messages..");
|
||||
broker = createBroker(false);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
|
||||
// after restart, ensure no dangling messages
|
||||
|
@ -229,31 +215,24 @@ public class FailoverTransactionTest {
|
|||
}
|
||||
|
||||
|
||||
//@Test not implemented
|
||||
public void testFailoverSendReplyLostAMQ() throws Exception {
|
||||
doTestFailoverSendReplyLost(0);
|
||||
public void initCombosForTestFailoverSendReplyLost() {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB,
|
||||
PersistenceAdapterChoice.JDBC
|
||||
// not implemented for AMQ store
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverSendReplyLostJdbc() throws Exception {
|
||||
doTestFailoverSendReplyLost(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverSendReplyLostKahaDB() throws Exception {
|
||||
doTestFailoverSendReplyLost(2);
|
||||
}
|
||||
|
||||
public void doTestFailoverSendReplyLost(final int adapter) throws Exception {
|
||||
public void testFailoverSendReplyLost() throws Exception {
|
||||
|
||||
broker = createBroker(true);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
|
||||
broker.setPlugins(new BrokerPlugin[] {
|
||||
broker.setPlugins(new BrokerPlugin[]{
|
||||
new BrokerPluginSupport() {
|
||||
@Override
|
||||
public void send(ProducerBrokerExchange producerExchange,
|
||||
org.apache.activemq.command.Message messageSend)
|
||||
org.apache.activemq.command.Message messageSend)
|
||||
throws Exception {
|
||||
// so send will hang as if reply is lost
|
||||
super.send(producerExchange, messageSend);
|
||||
|
@ -300,7 +279,7 @@ public class FailoverTransactionTest {
|
|||
// will be stopped by the plugin
|
||||
broker.waitUntilStopped();
|
||||
broker = createBroker(false);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
LOG.info("restarting....");
|
||||
broker.start();
|
||||
|
||||
|
@ -315,8 +294,8 @@ public class FailoverTransactionTest {
|
|||
connection.close();
|
||||
|
||||
// verify stats
|
||||
assertEquals("no newly queued messages", 0, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
|
||||
assertEquals("1 dequeue", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
|
||||
assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
|
||||
assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
|
||||
|
||||
// ensure no dangling messages with fresh broker etc
|
||||
broker.stop();
|
||||
|
@ -324,7 +303,7 @@ public class FailoverTransactionTest {
|
|||
|
||||
LOG.info("Checking for remaining/hung messages with second restart..");
|
||||
broker = createBroker(false);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
|
||||
// after restart, ensure no dangling messages
|
||||
|
@ -342,35 +321,37 @@ public class FailoverTransactionTest {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
// not implemented.. @Test
|
||||
public void testFailoverConnectionSendReplyLostAMQ() throws Exception {
|
||||
doTestFailoverConnectionSendReplyLost(0);
|
||||
|
||||
public void initCombosForTestFailoverConnectionSendReplyLost() {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB,
|
||||
PersistenceAdapterChoice.JDBC
|
||||
// last producer message id store feature not implemented for AMQ store
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverConnectionSendReplyLostJdbc() throws Exception {
|
||||
doTestFailoverConnectionSendReplyLost(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverConnectionSendReplyLostKahaDB() throws Exception {
|
||||
doTestFailoverConnectionSendReplyLost(2);
|
||||
}
|
||||
|
||||
public void doTestFailoverConnectionSendReplyLost(final int adapter) throws Exception {
|
||||
public void testFailoverConnectionSendReplyLost() throws Exception {
|
||||
|
||||
broker = createBroker(true);
|
||||
setPersistenceAdapter(adapter);
|
||||
PersistenceAdapter store = setDefaultPersistenceAdapter(broker);
|
||||
if (store instanceof KahaDBPersistenceAdapter) {
|
||||
// duplicate checker not updated on canceled tasks, even it
|
||||
// it was, recovery of the audit would fail as the message is
|
||||
// not recorded in the store and the audit may not be up to date.
|
||||
// So if duplicate messages are a absolute no no after restarts,
|
||||
// ConcurrentStoreAndDispatchQueues must be disabled
|
||||
((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false);
|
||||
}
|
||||
|
||||
final SocketProxy proxy = new SocketProxy();
|
||||
|
||||
broker.setPlugins(new BrokerPlugin[] {
|
||||
broker.setPlugins(new BrokerPlugin[]{
|
||||
new BrokerPluginSupport() {
|
||||
private boolean firstSend = true;
|
||||
|
||||
@Override
|
||||
public void send(ProducerBrokerExchange producerExchange,
|
||||
org.apache.activemq.command.Message messageSend)
|
||||
org.apache.activemq.command.Message messageSend)
|
||||
throws Exception {
|
||||
// so send will hang as if reply is lost
|
||||
super.send(producerExchange, messageSend);
|
||||
|
@ -435,7 +416,7 @@ public class FailoverTransactionTest {
|
|||
connection.close();
|
||||
|
||||
// verify stats, connection dup suppression means dups don't get to broker
|
||||
assertEquals("one queued message", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
|
||||
assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
|
||||
|
||||
// ensure no dangling messages with fresh broker etc
|
||||
broker.stop();
|
||||
|
@ -443,7 +424,7 @@ public class FailoverTransactionTest {
|
|||
|
||||
LOG.info("Checking for remaining/hung messages with restart..");
|
||||
broker = createBroker(false);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
|
||||
// after restart, ensure no dangling messages
|
||||
|
@ -461,87 +442,60 @@ public class FailoverTransactionTest {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
|
||||
startCleanBroker();
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(QUEUE_NAME);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
produceMessage(session, destination);
|
||||
|
||||
private void setPersistenceAdapter(int adapter) throws IOException {
|
||||
switch (adapter) {
|
||||
case 0:
|
||||
broker.setPersistenceAdapter(new AMQPersistenceAdapter());
|
||||
break;
|
||||
case 1:
|
||||
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
|
||||
break;
|
||||
case 2:
|
||||
KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
|
||||
// duplicate checker not updated on canceled tasks, even it
|
||||
// it was, reovery of the audit would fail as the message is
|
||||
// not recorded in the store and the audit may not be up to date.
|
||||
// So if duplicate are a nono (w.r.t stats), this must be disabled
|
||||
store.setConcurrentStoreAndDispatchQueues(false);
|
||||
store.setMaxFailoverProducersToTrack(10);
|
||||
store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
|
||||
broker.setPersistenceAdapter(store);
|
||||
break;
|
||||
}
|
||||
// restart to force failover and connection state recovery before the commit
|
||||
broker.stop();
|
||||
startBroker(false);
|
||||
|
||||
session.commit();
|
||||
|
||||
// without tracking producers, message will not be replayed on recovery
|
||||
assertNull("we got the message", consumer.receive(5000));
|
||||
session.commit();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
|
||||
startCleanBroker();
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(QUEUE_NAME);
|
||||
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
|
||||
startCleanBroker();
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(QUEUE_NAME);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
produceMessage(session, destination);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
MessageProducer producer;
|
||||
TextMessage message;
|
||||
final int count = 10;
|
||||
for (int i = 0; i < count; i++) {
|
||||
producer = session.createProducer(destination);
|
||||
message = session.createTextMessage("Test message: " + count);
|
||||
producer.send(message);
|
||||
producer.close();
|
||||
}
|
||||
|
||||
// restart to force failover and connection state recovery before the commit
|
||||
broker.stop();
|
||||
startBroker(false);
|
||||
// restart to force failover and connection state recovery before the commit
|
||||
broker.stop();
|
||||
startBroker(false);
|
||||
|
||||
session.commit();
|
||||
session.commit();
|
||||
for (int i = 0; i < count; i++) {
|
||||
assertNotNull("we got all the message: " + count, consumer.receive(20000));
|
||||
}
|
||||
session.commit();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
// without tracking producers, message will not be replayed on recovery
|
||||
assertNull("we got the message", consumer.receive(5000));
|
||||
session.commit();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
|
||||
startCleanBroker();
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(QUEUE_NAME);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
MessageProducer producer;
|
||||
TextMessage message;
|
||||
final int count = 10;
|
||||
for (int i=0; i<count; i++) {
|
||||
producer = session.createProducer(destination);
|
||||
message = session.createTextMessage("Test message: " + count);
|
||||
producer.send(message);
|
||||
producer.close();
|
||||
}
|
||||
|
||||
// restart to force failover and connection state recovery before the commit
|
||||
broker.stop();
|
||||
startBroker(false);
|
||||
|
||||
session.commit();
|
||||
for (int i=0; i<count; i++) {
|
||||
assertNotNull("we got all the message: " + count, consumer.receive(20000));
|
||||
}
|
||||
session.commit();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
// https://issues.apache.org/activemq/browse/AMQ-2772
|
||||
public void testFailoverWithConnectionConsumer() throws Exception {
|
||||
startCleanBroker();
|
||||
|
@ -560,6 +514,7 @@ public class FailoverTransactionTest {
|
|||
public Session getSession() throws JMSException {
|
||||
return poolSession;
|
||||
}
|
||||
|
||||
public void start() throws JMSException {
|
||||
connectionConsumerGotOne.countDown();
|
||||
poolSession.run();
|
||||
|
@ -572,7 +527,7 @@ public class FailoverTransactionTest {
|
|||
MessageProducer producer;
|
||||
TextMessage message;
|
||||
final int count = 10;
|
||||
for (int i=0; i<count; i++) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
producer = session.createProducer(destination);
|
||||
message = session.createTextMessage("Test message: " + count);
|
||||
producer.send(message);
|
||||
|
@ -584,7 +539,7 @@ public class FailoverTransactionTest {
|
|||
startBroker(false);
|
||||
|
||||
session.commit();
|
||||
for (int i=0; i<count-1; i++) {
|
||||
for (int i = 0; i < count - 1; i++) {
|
||||
assertNotNull("we got all the message: " + count, consumer.receive(20000));
|
||||
}
|
||||
session.commit();
|
||||
|
@ -593,10 +548,9 @@ public class FailoverTransactionTest {
|
|||
assertTrue("connectionconsumer got a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverConsumerAckLost() throws Exception {
|
||||
// as failure depends on hash order of state tracker recovery, do a few times
|
||||
for (int i=0; i<3; i++) {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
try {
|
||||
doTestFailoverConsumerAckLost(i);
|
||||
} finally {
|
||||
|
@ -606,11 +560,10 @@ public class FailoverTransactionTest {
|
|||
}
|
||||
|
||||
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
|
||||
final int adapter = 0;
|
||||
broker = createBroker(true);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
|
||||
broker.setPlugins(new BrokerPlugin[] {
|
||||
broker.setPlugins(new BrokerPlugin[]{
|
||||
new BrokerPluginSupport() {
|
||||
|
||||
// broker is killed on delivered ack as prefetch is 1
|
||||
|
@ -622,7 +575,7 @@ public class FailoverTransactionTest {
|
|||
consumerExchange.getConnectionContext().setDontSendReponse(true);
|
||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||
public void run() {
|
||||
LOG.info("Stopping broker on ack: " + ack);
|
||||
LOG.info("Stopping broker on ack: " + ack);
|
||||
try {
|
||||
broker.stop();
|
||||
} catch (Exception e) {
|
||||
|
@ -712,7 +665,7 @@ public class FailoverTransactionTest {
|
|||
// will be stopped by the plugin
|
||||
broker.waitUntilStopped();
|
||||
broker = createBroker(false);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
|
||||
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
|
||||
|
@ -739,7 +692,7 @@ public class FailoverTransactionTest {
|
|||
consumerSession2.commit();
|
||||
}
|
||||
|
||||
for (Connection c: connections) {
|
||||
for (Connection c : connections) {
|
||||
c.close();
|
||||
}
|
||||
|
||||
|
@ -749,7 +702,7 @@ public class FailoverTransactionTest {
|
|||
|
||||
LOG.info("Checking for remaining/hung messages..");
|
||||
broker = createBroker(false);
|
||||
setPersistenceAdapter(adapter);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
|
||||
// after restart, ensure no dangling messages
|
||||
|
@ -767,7 +720,6 @@ public class FailoverTransactionTest {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
|
||||
broker = createBroker(true);
|
||||
broker.start();
|
||||
|
@ -787,7 +739,7 @@ public class FailoverTransactionTest {
|
|||
broker.stop();
|
||||
broker = createBroker(false);
|
||||
// use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
|
||||
setPersistenceAdapter(1);
|
||||
setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
|
||||
broker.start();
|
||||
|
||||
try {
|
||||
|
@ -805,7 +757,6 @@ public class FailoverTransactionTest {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testWaitForMissingRedeliveries() throws Exception {
|
||||
LOG.info("testWaitForMissingRedeliveries()");
|
||||
broker = createBroker(true);
|
||||
|
@ -828,7 +779,7 @@ public class FailoverTransactionTest {
|
|||
broker.stop();
|
||||
broker = createBroker(false);
|
||||
// use empty jdbc store so that wait for re-deliveries occur when failover resumes
|
||||
setPersistenceAdapter(1);
|
||||
setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
|
||||
broker.start();
|
||||
|
||||
final CountDownLatch commitDone = new CountDownLatch(1);
|
||||
|
@ -848,14 +799,13 @@ public class FailoverTransactionTest {
|
|||
broker = createBroker(false);
|
||||
broker.start();
|
||||
|
||||
assertTrue("commit was successfull", commitDone.await(30, TimeUnit.SECONDS));
|
||||
assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
|
||||
|
||||
assertNull("should not get committed message", consumer.receive(5000));
|
||||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPoisonOnDeliveryWhilePending() throws Exception {
|
||||
LOG.info("testWaitForMissingRedeliveries()");
|
||||
broker = createBroker(true);
|
||||
|
|
Loading…
Reference in New Issue