From b1a9130149ca0f60b9abd8b4fc1cda468727b1d5 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 11 Nov 2009 15:50:45 +0000 Subject: [PATCH] http://issues.apache.org/activemq/browse/AMQ-2042 - shutdown if kahadb cannot access disk git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@834922 13f79535-47bb-0310-9956-ffa450edef68 --- .../journal/JournalPersistenceAdapter.java | 2 +- .../kahadb/KahaDBPersistenceAdapter.java | 8 ++- .../store/kahadb/MessageDatabase.java | 59 ++++++++++++------- .../kahadb/journal/DataFileAppender.java | 8 ++- 4 files changed, 53 insertions(+), 24 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 8ae5fb039d..5ffb32663a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -732,7 +732,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve try { brokerService.stop(); } catch (Exception e) { - LOG.warn("Failure occured while stopping broker"); + LOG.warn("Failure occured while stopping broker", e); } } }.start(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 7d6b735c5c..38c1b3a71b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -17,6 +17,8 @@ package org.apache.activemq.store.kahadb; import org.apache.activeio.journal.Journal; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -37,7 +39,7 @@ import java.util.Set; * @org.apache.xbean.XBean element="kahaDB" * @version $Revision: 1.17 $ */ -public class KahaDBPersistenceAdapter implements PersistenceAdapter { +public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { private KahaDBStore letter = new KahaDBStore(); @@ -364,4 +366,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter { public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); } + + public void setBrokerService(BrokerService brokerService) { + letter.setBrokerService(brokerService); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 5ccf55b174..4abbe60be4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -36,6 +36,8 @@ import java.util.TreeSet; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.SubscriptionInfo; @@ -75,8 +77,11 @@ import org.apache.kahadb.util.Sequence; import org.apache.kahadb.util.SequenceSet; import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.VariableMarshaller; +import org.springframework.core.enums.LetterCodedLabeledEnum; -public class MessageDatabase { +public class MessageDatabase implements BrokerServiceAware { + + private BrokerService brokerService; public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500")); @@ -259,6 +264,9 @@ public class MessageDatabase { } } catch (InterruptedException e) { // Looks like someone really wants us to exit this thread... + } catch (IOException ioe) { + LOG.error("Checkpoint failed", ioe); + stopBroker(); } } }; @@ -575,26 +583,22 @@ public class MessageDatabase { return journal.getNextLocation(null); } - protected void checkpointCleanup(final boolean cleanup) { - try { - long start = System.currentTimeMillis(); - synchronized (indexMutex) { - if( !opened.get() ) { - return; - } - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, cleanup); - } - }); - } - long end = System.currentTimeMillis(); - if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { - LOG.info("Slow KahaDB access: cleanup took "+(end-start)); + protected void checkpointCleanup(final boolean cleanup) throws IOException { + long start = System.currentTimeMillis(); + synchronized (indexMutex) { + if( !opened.get() ) { + return; } - } catch (IOException e) { - e.printStackTrace(); + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, cleanup); + } + }); } + long end = System.currentTimeMillis(); + if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { + LOG.info("Slow KahaDB access: cleanup took "+(end-start)); + } } @@ -623,7 +627,6 @@ public class MessageDatabase { * durring a recovery process. */ public Location store(JournalCommand data, boolean sync) throws IOException { - int size = data.serializedSizeFramed(); DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); @@ -1530,4 +1533,20 @@ public class MessageDatabase { public void setChecksumJournalFiles(boolean checksumJournalFiles) { this.checksumJournalFiles = checksumJournalFiles; } + + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + protected void stopBroker() { + new Thread() { + public void run() { + try { + brokerService.stop(); + } catch (Exception e) { + LOG.warn("Failure occured while stopping broker", e); + } + } + }.start(); + } } diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java b/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java index c1d9e5b82c..b1350f1d34 100644 --- a/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java +++ b/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java @@ -158,7 +158,7 @@ class DataFileAppender { * @throws */ public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException { - + // Write the packet our internal buffer. int size = data.getLength() + Journal.RECORD_HEAD_SPACE; @@ -298,6 +298,7 @@ class DataFileAppender { protected void processQueue() { DataFile dataFile = null; RandomAccessFile file = null; + WriteBatch wb = null; try { DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize); @@ -321,7 +322,7 @@ class DataFileAppender { enqueueMutex.notify(); } - WriteBatch wb = (WriteBatch)o; + wb = (WriteBatch)o; if (dataFile != wb.dataFile) { if (file != null) { file.setLength(dataFile.getLength()); @@ -403,6 +404,9 @@ class DataFileAppender { wb.latch.countDown(); } } catch (IOException e) { + if (wb != null) { + wb.latch.countDown(); + } synchronized (enqueueMutex) { firstAsyncException = e; }