mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5162 - handle io exception in lease database locker start
This commit is contained in:
parent
ec35588e56
commit
d60022ec65
|
@ -83,6 +83,13 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
|
|||
|
||||
} catch (Exception e) {
|
||||
LOG.debug(getLeaseHolderId() + " lease acquire failure: "+ e, e);
|
||||
if (isStopping()) {
|
||||
throw new Exception(
|
||||
"Cannot start broker as being asked to shut down. "
|
||||
+ "Interrupted attempt to acquire lock: "
|
||||
+ e, e);
|
||||
}
|
||||
lockable.getBrokerService().handleIOException(IOExceptionSupport.create(e));
|
||||
} finally {
|
||||
close(statement);
|
||||
close(connection);
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.PrintWriter;
|
|||
import java.sql.SQLException;
|
||||
import java.sql.SQLFeatureNotSupportedException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
||||
|
@ -27,6 +28,7 @@ import junit.framework.TestCase;
|
|||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -47,7 +49,12 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
|
|||
private BrokerService broker;
|
||||
|
||||
protected BrokerService createBroker(boolean withJMX) throws Exception {
|
||||
return createBroker("localhost", withJMX, true, true);
|
||||
}
|
||||
|
||||
protected BrokerService createBroker(String name, boolean withJMX, boolean leaseLocker, boolean startStopConnectors) throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName(name);
|
||||
|
||||
broker.setUseJmx(withJMX);
|
||||
|
||||
|
@ -63,13 +70,16 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
|
|||
jdbc.setDataSource(dataSource);
|
||||
|
||||
jdbc.setLockKeepAlivePeriod(1000l);
|
||||
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
|
||||
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
|
||||
jdbc.setLocker(leaseDatabaseLocker);
|
||||
if (leaseLocker) {
|
||||
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
|
||||
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
|
||||
jdbc.setLocker(leaseDatabaseLocker);
|
||||
}
|
||||
|
||||
broker.setPersistenceAdapter(jdbc);
|
||||
JDBCIOExceptionHandler jdbcioExceptionHandler = new JDBCIOExceptionHandler();
|
||||
jdbcioExceptionHandler.setResumeCheckSleepPeriod(1000l);
|
||||
jdbcioExceptionHandler.setStopStartConnectors(startStopConnectors);
|
||||
broker.setIoExceptionHandler(jdbcioExceptionHandler);
|
||||
String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
|
||||
|
||||
|
@ -92,6 +102,73 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
|
|||
recoverFromDisconnectDB(true);
|
||||
}
|
||||
|
||||
public void testSlaveStoppedLease() throws Exception {
|
||||
testSlaveStopped(true);
|
||||
}
|
||||
|
||||
public void testSlaveStoppedDefault() throws Exception {
|
||||
testSlaveStopped(false);
|
||||
}
|
||||
|
||||
public void testSlaveStopped(final boolean lease) throws Exception {
|
||||
final BrokerService master = createBroker("master", true, lease, false);
|
||||
master.start();
|
||||
master.waitUntilStarted();
|
||||
|
||||
final AtomicReference<BrokerService> slave = new AtomicReference<BrokerService>();
|
||||
|
||||
Thread slaveThread = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName("slave");
|
||||
|
||||
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
|
||||
jdbc.setDataSource(dataSource);
|
||||
|
||||
jdbc.setLockKeepAlivePeriod(1000l);
|
||||
|
||||
if (lease) {
|
||||
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
|
||||
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
|
||||
jdbc.setLocker(leaseDatabaseLocker);
|
||||
}
|
||||
|
||||
broker.setPersistenceAdapter(jdbc);
|
||||
JDBCIOExceptionHandler jdbcioExceptionHandler = new JDBCIOExceptionHandler();
|
||||
jdbcioExceptionHandler.setResumeCheckSleepPeriod(1000l);
|
||||
jdbcioExceptionHandler.setStopStartConnectors(false);
|
||||
broker.setIoExceptionHandler(jdbcioExceptionHandler);
|
||||
slave.set(broker);
|
||||
broker.start();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
slaveThread.start();
|
||||
|
||||
Thread.sleep(5000);
|
||||
|
||||
dataSource.stopDB();
|
||||
|
||||
assertTrue("Master hasn't been stopped", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return master.isStopped();
|
||||
}
|
||||
}));
|
||||
|
||||
assertTrue("Slave hasn't been stopped", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return slave.get().isStopped();
|
||||
}
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
public void recoverFromDisconnectDB(boolean withJMX) throws Exception {
|
||||
try {
|
||||
broker = createBroker(withJMX);
|
||||
|
|
Loading…
Reference in New Issue