https://issues.apache.org/jira/browse/AMQ-1885 - make tests more deterministic, ensure ioexception handler is used with lease locker to avoid contending masters, the resumption after an error is dependent on a keepAlive win, so the lease expiry tests w/o the io handler is invalid

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1352120 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-06-20 14:14:33 +00:00
parent f3d84aeb1a
commit 32d3fd51b4
6 changed files with 98 additions and 72 deletions

View File

@ -1088,7 +1088,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
/* public void dumpTables(Connection c, String destinationName, String clientId, String /* public static void dumpTables(Connection c, String destinationName, String clientId, String
subscriptionName) throws SQLException { subscriptionName) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
@ -1100,23 +1100,23 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
printQuery(s,System.out); } printQuery(s,System.out); }
public void dumpTables(Connection c) throws SQLException { public static void dumpTables(java.sql.Connection c) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
} }
private void printQuery(Connection c, String query, PrintStream out) private static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
throws SQLException { throws SQLException {
printQuery(c.prepareStatement(query), out); printQuery(c.prepareStatement(query), out);
} }
private void printQuery(PreparedStatement s, PrintStream out) private static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
throws SQLException { throws SQLException {
ResultSet set = null; ResultSet set = null;
try { try {
set = s.executeQuery(); set = s.executeQuery();
ResultSetMetaData metaData = set.getMetaData(); java.sql.ResultSetMetaData metaData = set.getMetaData();
for (int i = 1; i <= metaData.getColumnCount(); i++) { for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (i == 1) if (i == 1)
out.print("||"); out.print("||");

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbRestartJDBCQueueMasterSlaveLeaseIntactTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactTest.class);
@Override
protected void delayTillRestartRequired() {
LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
}
}

View File

@ -20,12 +20,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler; import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseIntactTest { public class DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.class); private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.class);
private long restartDelay = 500; private long restartDelay = 500;

View File

@ -17,10 +17,14 @@
package org.apache.activemq.broker.ft; package org.apache.activemq.broker.ft;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,9 +50,9 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa
@Override @Override
protected void delayTillRestartRequired() { protected void delayTillRestartRequired() {
LOG.info("restart db after lease has expired. While Db is offline, master should stay alive, them lease up for grabs"); LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
try { try {
TimeUnit.MILLISECONDS.sleep(3000); TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -56,10 +60,8 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa
@Override @Override
protected void verifyExpectedBroker(int inflightMessageCount) { protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0) { if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName()); assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
} }
// the lock is up for grabs after the expiry
} }
} }

View File

@ -16,16 +16,17 @@
*/ */
package org.apache.activemq.broker.ft; package org.apache.activemq.broker.ft;
import java.sql.SQLException; import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest { public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class); private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);
@ -34,7 +35,7 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
verifyExpectedBroker(inflightMessageCount); verifyExpectedBroker(inflightMessageCount);
if (++inflightMessageCount == failureCount) { if (++inflightMessageCount == failureCount) {
LOG.info("STOPPING DB!@!!!!"); LOG.info("STOPPING DB!@!!!!");
final EmbeddedDataSource ds = getExistingDataSource(); final EmbeddedDataSource ds = ((SyncDataSource)getExistingDataSource()).getDelegate();
ds.setShutdownDatabase("shutdown"); ds.setShutdownDatabase("shutdown");
LOG.info("DB STOPPED!@!!!!"); LOG.info("DB STOPPED!@!!!!");
@ -42,9 +43,6 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
public void run() { public void run() {
delayTillRestartRequired(); delayTillRestartRequired();
ds.setShutdownDatabase("false"); ds.setShutdownDatabase("false");
try {
ds.getConnection().close();
} catch (SQLException ignored) {}
LOG.info("DB RESTARTED!@!!!!"); LOG.info("DB RESTARTED!@!!!!");
} }
}; };
@ -77,7 +75,7 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
producer.send(producerDestination, message); producer.send(producerDestination, message);
sent = true; sent = true;
} catch (JMSException e) { } catch (JMSException e) {
LOG.info("Exception on producer send:", e); LOG.info("Exception on producer send for: " + message, e);
try { try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
@ -86,4 +84,22 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
} while(!sent); } while(!sent);
} }
} }
@Override
protected Session createReceiveSession(Connection receiveConnection) throws Exception {
return receiveConnection.createSession(true, Session.SESSION_TRANSACTED);
}
@Override
protected void consumeMessage(Message message, List<Message> messageList) {
try {
receiveSession.commit();
super.consumeMessage(message, messageList);
} catch (JMSException e) {
LOG.info("Faild to commit message receipt: " + message, e);
try {
receiveSession.rollback();
} catch (JMSException ignored) {}
}
}
} }

View File

@ -17,10 +17,12 @@
package org.apache.activemq.broker.ft; package org.apache.activemq.broker.ft;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import javax.sql.DataSource;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -31,13 +33,13 @@ import org.apache.activemq.transport.TransportServer;
import org.apache.derby.jdbc.EmbeddedDataSource; import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest { public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
protected EmbeddedDataSource sharedDs; protected DataSource sharedDs;
protected String MASTER_URL = "tcp://localhost:62001"; protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002"; protected String SLAVE_URL = "tcp://localhost:62002";
protected void setUp() throws Exception { protected void setUp() throws Exception {
// startup db // startup db
sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource(); sharedDs = new SyncDataSource((EmbeddedDataSource)new DataSourceSupport().getDataSource());
super.setUp(); super.setUp();
} }
@ -97,7 +99,61 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
persistenceAdapter.setLockAcquireSleepInterval(500); persistenceAdapter.setLockAcquireSleepInterval(500);
} }
protected EmbeddedDataSource getExistingDataSource() throws Exception { protected DataSource getExistingDataSource() throws Exception {
return sharedDs; return sharedDs;
} }
// prevent concurrent calls from attempting to create the db at the same time
// can result in "already exists in this jvm" errors
class SyncDataSource implements DataSource {
final EmbeddedDataSource delegate;
SyncDataSource(EmbeddedDataSource dataSource) {
this.delegate = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
synchronized (this) {
return delegate.getConnection();
}
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
synchronized (this) {
return delegate.getConnection();
}
}
@Override
public PrintWriter getLogWriter() throws SQLException {
return null;
}
@Override
public void setLogWriter(PrintWriter out) throws SQLException {
}
@Override
public void setLoginTimeout(int seconds) throws SQLException {
}
@Override
public int getLoginTimeout() throws SQLException {
return 0;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
EmbeddedDataSource getDelegate() {
return delegate;
}
};
} }