git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@834922 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-11-11 15:50:45 +00:00
parent fb7d8d82d6
commit b1a9130149
4 changed files with 53 additions and 24 deletions

View File

@ -732,7 +732,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
try { try {
brokerService.stop(); brokerService.stop();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failure occured while stopping broker"); LOG.warn("Failure occured while stopping broker", e);
} }
} }
}.start(); }.start();

View File

@ -17,6 +17,8 @@
package org.apache.activemq.store.kahadb; package org.apache.activemq.store.kahadb;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
@ -37,7 +39,7 @@ import java.util.Set;
* @org.apache.xbean.XBean element="kahaDB" * @org.apache.xbean.XBean element="kahaDB"
* @version $Revision: 1.17 $ * @version $Revision: 1.17 $
*/ */
public class KahaDBPersistenceAdapter implements PersistenceAdapter { public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private KahaDBStore letter = new KahaDBStore(); private KahaDBStore letter = new KahaDBStore();
@ -364,4 +366,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter {
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
} }
public void setBrokerService(BrokerService brokerService) {
letter.setBrokerService(brokerService);
}
} }

View File

@ -36,6 +36,8 @@ import java.util.TreeSet;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
@ -75,8 +77,11 @@ import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet; import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller; import org.apache.kahadb.util.VariableMarshaller;
import org.springframework.core.enums.LetterCodedLabeledEnum;
public class MessageDatabase { public class MessageDatabase implements BrokerServiceAware {
private BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500")); public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@ -259,6 +264,9 @@ public class MessageDatabase {
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Looks like someone really wants us to exit this thread... // Looks like someone really wants us to exit this thread...
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
stopBroker();
} }
} }
}; };
@ -575,26 +583,22 @@ public class MessageDatabase {
return journal.getNextLocation(null); return journal.getNextLocation(null);
} }
protected void checkpointCleanup(final boolean cleanup) { protected void checkpointCleanup(final boolean cleanup) throws IOException {
try { long start = System.currentTimeMillis();
long start = System.currentTimeMillis(); synchronized (indexMutex) {
synchronized (indexMutex) { if( !opened.get() ) {
if( !opened.get() ) { return;
return;
}
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
}
});
}
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: cleanup took "+(end-start));
} }
} catch (IOException e) { pageFile.tx().execute(new Transaction.Closure<IOException>() {
e.printStackTrace(); public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
}
});
} }
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: cleanup took "+(end-start));
}
} }
@ -623,7 +627,6 @@ public class MessageDatabase {
* durring a recovery process. * durring a recovery process.
*/ */
public Location store(JournalCommand data, boolean sync) throws IOException { public Location store(JournalCommand data, boolean sync) throws IOException {
int size = data.serializedSizeFramed(); int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@ -1530,4 +1533,20 @@ public class MessageDatabase {
public void setChecksumJournalFiles(boolean checksumJournalFiles) { public void setChecksumJournalFiles(boolean checksumJournalFiles) {
this.checksumJournalFiles = checksumJournalFiles; this.checksumJournalFiles = checksumJournalFiles;
} }
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
protected void stopBroker() {
new Thread() {
public void run() {
try {
brokerService.stop();
} catch (Exception e) {
LOG.warn("Failure occured while stopping broker", e);
}
}
}.start();
}
} }

View File

@ -158,7 +158,7 @@ class DataFileAppender {
* @throws * @throws
*/ */
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException { public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet our internal buffer. // Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE; int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@ -298,6 +298,7 @@ class DataFileAppender {
protected void processQueue() { protected void processQueue() {
DataFile dataFile = null; DataFile dataFile = null;
RandomAccessFile file = null; RandomAccessFile file = null;
WriteBatch wb = null;
try { try {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize); DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@ -321,7 +322,7 @@ class DataFileAppender {
enqueueMutex.notify(); enqueueMutex.notify();
} }
WriteBatch wb = (WriteBatch)o; wb = (WriteBatch)o;
if (dataFile != wb.dataFile) { if (dataFile != wb.dataFile) {
if (file != null) { if (file != null) {
file.setLength(dataFile.getLength()); file.setLength(dataFile.getLength());
@ -403,6 +404,9 @@ class DataFileAppender {
wb.latch.countDown(); wb.latch.countDown();
} }
} catch (IOException e) { } catch (IOException e) {
if (wb != null) {
wb.latch.countDown();
}
synchronized (enqueueMutex) { synchronized (enqueueMutex) {
firstAsyncException = e; firstAsyncException = e;
} }