diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 69a27f9bee..112d36bc96 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -83,6 +83,8 @@ import org.apache.commons.logging.LogFactory; */ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { + private BrokerService brokerService; + protected static final Scheduler scheduler = Scheduler.getInstance(); private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class); @@ -599,7 +601,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve */ public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { if (started.get()) { - return journal.write(toPacket(wireFormat.marshal(command)), sync); + try { + return journal.write(toPacket(wireFormat.marshal(command)), sync); + } catch (IOException ioe) { + LOG.error("Cannot write to the journal", ioe); + stopBroker(); + throw ioe; + } } throw new IOException("closed"); } @@ -693,10 +701,23 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; PersistenceAdapter pa = getLongTermPersistence(); if( pa instanceof BrokerServiceAware ) { ((BrokerServiceAware)pa).setBrokerService(brokerService); } } + + protected void stopBroker() { + new Thread() { + public void run() { + try { + brokerService.stop(); + } catch (Exception e) { + LOG.warn("Failure occured while stopping broker"); + } + } + }.start(); + } }