https://issues.apache.org/jira/browse/AMQ-1780 - ActiveMQ broker does not automatically reconnect if the connection to the database is lost - extend the DefaultIOExceptionHandler http://activemq.apache.org/configurable-ioexception-handling.html to be aware of sql exceptions and provide a connector stop/resume option. This is now called in the event of a failure to get a jdbc connection, the default behaviour is to stop as before, but new iptions to ignore some sql exceptions or to stop/resume connectors are supported which allow a db restart to be recovered without restarting the broker

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1086182 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-28 11:00:06 +00:00
parent 53f789b475
commit 41506505f4
7 changed files with 313 additions and 16 deletions

View File

@ -1639,7 +1639,7 @@ public class BrokerService implements Service {
} }
} }
protected void stopAllConnectors(ServiceStopper stopper) { public void stopAllConnectors(ServiceStopper stopper) {
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next(); NetworkConnector connector = iter.next();
unregisterNetworkConnectorMBean(connector); unregisterNetworkConnectorMBean(connector);
@ -2063,7 +2063,7 @@ public class BrokerService implements Service {
* *
* @throws Exception * @throws Exception
*/ */
protected void startAllConnectors() throws Exception { public void startAllConnectors() throws Exception {
if (!isSlave()) { if (!isSlave()) {
Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>(); List<TransportConnector> al = new ArrayList<TransportConnector>();
@ -2331,11 +2331,21 @@ public class BrokerService implements Service {
this.passiveSlave = passiveSlave; this.passiveSlave = passiveSlave;
} }
/**
* override the Default IOException handler, called when persistence adapter
* has experiences File or JDBC I/O Exceptions
*
* @param ioExceptionHandler
*/
public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
ioExceptionHandler.setBrokerService(this); configureService(ioExceptionHandler);
this.ioExceptionHandler = ioExceptionHandler; this.ioExceptionHandler = ioExceptionHandler;
} }
public IOExceptionHandler getIoExceptionHandler() {
return ioExceptionHandler;
}
/** /**
* @return the schedulerSupport * @return the schedulerSupport
*/ */

View File

@ -88,7 +88,7 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
+ exceptionHandler.getClass().getCanonicalName() + exceptionHandler.getClass().getCanonicalName()
+ " threw this exception: " + " threw this exception: "
+ handlerException + handlerException
+ " while trying to handle this excpetion: " + " while trying to handle this exception: "
+ e, handlerException); + e, handlerException);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.store.jdbc;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
@ -489,7 +490,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
} }
public TransactionContext getTransactionContext() throws IOException { public TransactionContext getTransactionContext() throws IOException {
TransactionContext answer = new TransactionContext(getDataSource()); TransactionContext answer = new TransactionContext(this);
if (transactionIsolation > 0) { if (transactionIsolation > 0) {
answer.setTransactionIsolation(transactionIsolation); answer.setTransactionIsolation(transactionIsolation);
} }
@ -619,7 +620,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
try { try {
brokerService.stop(); brokerService.stop();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failure occured while stopping broker"); LOG.warn("Failure occurred while stopping broker");
} }
} }
@ -642,7 +643,23 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public void setDirectory(File dir) { public void setDirectory(File dir) {
} }
// interesting bit here is proof that DB is ok
public void checkpoint(boolean sync) throws IOException { public void checkpoint(boolean sync) throws IOException {
// by pass TransactionContext to avoid IO Exception handler
Connection connection = null;
try {
connection = getDataSource().getConnection();
} catch (SQLException e) {
LOG.debug("Could not get JDBC connection for checkpoint: " + e);
throw IOExceptionSupport.create(e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (Throwable ignored) {
}
}
}
} }
public long size(){ public long size(){

View File

@ -38,6 +38,7 @@ public class TransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
private final DataSource dataSource; private final DataSource dataSource;
private final JDBCPersistenceAdapter persistenceAdapter;
private Connection connection; private Connection connection;
private boolean inTx; private boolean inTx;
private PreparedStatement addMessageStatement; private PreparedStatement addMessageStatement;
@ -46,8 +47,9 @@ public class TransactionContext {
// a cheap dirty level that we can live with // a cheap dirty level that we can live with
private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
public TransactionContext(DataSource dataSource) { public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
this.dataSource = dataSource; this.persistenceAdapter = persistenceAdapter;
this.dataSource = persistenceAdapter.getDataSource();
} }
public Connection getConnection() throws IOException { public Connection getConnection() throws IOException {
@ -60,7 +62,10 @@ public class TransactionContext {
} }
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
throw IOExceptionSupport.create(e); IOException ioe = IOExceptionSupport.create(e);
persistenceAdapter.getBrokerService().handleIOException(ioe);
throw ioe;
} }
try { try {

View File

@ -17,19 +17,30 @@
package org.apache.activemq.util; package org.apache.activemq.util;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class DefaultIOExceptionHandler implements IOExceptionHandler { /**
* @org.apache.xbean.XBean
*/
public class DefaultIOExceptionHandler implements IOExceptionHandler {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(DefaultIOExceptionHandler.class); .getLogger(DefaultIOExceptionHandler.class);
private BrokerService broker; private BrokerService broker;
private boolean ignoreAllErrors = false; private boolean ignoreAllErrors = false;
private boolean ignoreNoSpaceErrors = true; private boolean ignoreNoSpaceErrors = true;
private boolean ignoreSQLExceptions = true;
private boolean stopStartConnectors = false;
private String noSpaceMessage = "space"; private String noSpaceMessage = "space";
private String sqlExceptionMessage = ""; // match all
private long resumeCheckSleepPeriod = 5*1000;
private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
public void handle(IOException exception) { public void handle(IOException exception) {
if (ignoreAllErrors) { if (ignoreAllErrors) {
@ -48,13 +59,71 @@ public class DefaultIOExceptionHandler implements IOExceptionHandler {
} }
} }
if (ignoreSQLExceptions) {
Throwable cause = exception;
while (cause != null) {
if (cause instanceof SQLException && cause.getMessage().contains(sqlExceptionMessage)) {
LOG.info("Ignoring SQLException, " + exception, cause);
return;
}
cause = cause.getCause();
}
}
if (stopStartConnectors) {
if (!stopStartInProgress.compareAndSet(false, true)) {
// we are already working on it
return;
}
LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
new Thread("stop transport connectors on IO exception") {
public void run() {
try {
ServiceStopper stopper = new ServiceStopper();
broker.stopAllConnectors(stopper);
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker connectors", e);
}
}
}.start();
// resume again
new Thread("restart transport connectors post IO exception") {
public void run() {
try {
while (isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
broker.startAllConnectors();
} catch (Exception e) {
LOG.warn("Failure occurred while restarting broker connectors", e);
} finally {
stopStartInProgress.compareAndSet(true, false);
}
}
private boolean isPersistenceAdapterDown() {
boolean checkpointSuccess = false;
try {
broker.getPersistenceAdapter().checkpoint(true);
checkpointSuccess = true;
} catch (Throwable ignored) {}
return !checkpointSuccess;
}
}.start();
return;
}
LOG.info("Stopping the broker due to IO exception, " + exception, exception); LOG.info("Stopping the broker due to IO exception, " + exception, exception);
new Thread() { new Thread("Stopping the broker due to IO exception") {
public void run() { public void run() {
try { try {
broker.stop(); broker.stop();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failure occured while stopping broker", e); LOG.warn("Failure occurred while stopping broker", e);
} }
} }
}.start(); }.start();
@ -88,4 +157,35 @@ public class DefaultIOExceptionHandler implements IOExceptionHandler {
this.noSpaceMessage = noSpaceMessage; this.noSpaceMessage = noSpaceMessage;
} }
public boolean isIgnoreSQLExceptions() {
return ignoreSQLExceptions;
}
public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
this.ignoreSQLExceptions = ignoreSQLExceptions;
}
public String getSqlExceptionMessage() {
return sqlExceptionMessage;
}
public void setSqlExceptionMessage(String sqlExceptionMessage) {
this.sqlExceptionMessage = sqlExceptionMessage;
}
public boolean isStopStartConnectors() {
return stopStartConnectors;
}
public void setStopStartConnectors(boolean stopStartConnectors) {
this.stopStartConnectors = stopStartConnectors;
}
public long getResumeCheckSleepPeriod() {
return resumeCheckSleepPeriod;
}
public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
}
} }

View File

@ -49,8 +49,8 @@ public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTes
LOG.info("Created sendConnection: " + sendConnection); LOG.info("Created sendConnection: " + sendConnection);
LOG.info("Created receiveConnection: " + receiveConnection); LOG.info("Created receiveConnection: " + receiveConnection);
session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = createSendSession(sendConnection);
receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); receiveSession = createReceiveSession(receiveConnection);
LOG.info("Created sendSession: " + session); LOG.info("Created sendSession: " + session);
LOG.info("Created receiveSession: " + receiveSession); LOG.info("Created receiveSession: " + receiveSession);
@ -80,6 +80,14 @@ public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTes
LOG.info("Started connections"); LOG.info("Started connections");
} }
protected Session createReceiveSession(Connection receiveConnection) throws Exception {
return receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected Session createSendSession(Connection sendConnection) throws Exception {
return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected Connection createReceiveConnection() throws Exception { protected Connection createReceiveConnection() throws Exception {
return createConnection(); return createConnection();
} }

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnectionsTest implements ExceptionListener {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueTest.class);
public boolean transactedSends = false;
public int failureCount = 25; // or 20 for even tx batch boundary
int inflightMessageCount = 0;
EmbeddedDataSource sharedDs;
BrokerService broker;
final CountDownLatch restartDBLatch = new CountDownLatch(1);
protected void setUp() throws Exception {
setAutoFail(true);
topic = false;
verbose = true;
// startup db
sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
broker = new BrokerService();
DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
handler.setIgnoreSQLExceptions(false);
handler.setStopStartConnectors(true);
broker.setIoExceptionHandler(handler);
broker.addConnector("tcp://localhost:0");
broker.setUseJmx(false);
broker.setPersistent(true);
broker.setDeleteAllMessagesOnStartup(true);
JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
persistenceAdapter.setDataSource(sharedDs);
persistenceAdapter.setUseDatabaseLock(false);
persistenceAdapter.setLockKeepAlivePeriod(500);
persistenceAdapter.setLockAcquireSleepInterval(500);
broker.setPersistenceAdapter(persistenceAdapter);
broker.start();
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
broker.stop();
}
protected Session createSendSession(Connection sendConnection) throws Exception {
if (transactedSends) {
return sendConnection.createSession(true, Session.SESSION_TRANSACTED);
} else {
return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory f =
new ActiveMQConnectionFactory("failover://" + broker.getTransportConnectors().get(0).getPublishableConnectString());
f.setExceptionListener(this);
return f;
}
@Override
protected void messageSent() throws Exception {
if (++inflightMessageCount == failureCount) {
LOG.info("STOPPING DB!@!!!!");
final EmbeddedDataSource ds = sharedDs;
ds.setShutdownDatabase("shutdown");
try {
ds.getConnection();
} catch (Exception ignored) {
}
LOG.info("DB STOPPED!@!!!!");
Thread dbRestartThread = new Thread("db-re-start-thread") {
public void run() {
LOG.info("Sleeping for 10 seconds before allowing db restart");
try {
restartDBLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
ds.setShutdownDatabase("false");
LOG.info("DB RESTARTED!@!!!!");
}
};
dbRestartThread.start();
}
}
protected void sendToProducer(MessageProducer producer,
Destination producerDestination, Message message) throws JMSException {
{
// do some retries as db failures filter back to the client until broker sees
// db lock failure and shuts down
boolean sent = false;
do {
try {
producer.send(producerDestination, message);
if (transactedSends && ((inflightMessageCount+1) %10 == 0 || (inflightMessageCount+1) >= messageCount)) {
LOG.info("committing on send: " + inflightMessageCount + " message: " + message);
session.commit();
}
sent = true;
} catch (JMSException e) {
LOG.info("Exception on producer send:", e);
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {
}
}
} while(!sent);
}
}
@Override
public void onException(JMSException exception) {
LOG.error("exception on connection: ", exception);
}
}