https://issues.apache.org/activemq/browse/AMQ-2042 - enable kahadb to recover from 'no space available'

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@880792 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-11-16 15:22:19 +00:00
parent c63eb5cc23
commit a19e27a9fc
3 changed files with 88 additions and 60 deletions

View File

@ -232,6 +232,41 @@ public class MessageDatabase implements BrokerServiceAware {
}
}
/**
 * Starts the background checkpoint worker as a daemon thread. The worker
 * periodically calls checkpointCleanup(): a full cleanup pass once every
 * cleanupInterval ms (which also counts as a checkpoint), and a lighter
 * checkpoint-only pass once every checkpointInterval ms. It polls the
 * opened flag at most every 500 ms so shutdown is noticed promptly, exits
 * quietly on interruption, and hands any IOException from a pass to
 * brokerService.handleIOException() after logging it.
 */
private void startCheckpoint() {
    Runnable checkpointTask = new Runnable() {
        public void run() {
            // Cap the sleep so we re-check opened at least twice a second,
            // even when the configured intervals are long.
            final long pollInterval = Math.min(checkpointInterval, 500);
            long lastCleanupTime = System.currentTimeMillis();
            long lastCheckpointTime = lastCleanupTime;
            try {
                while (opened.get()) {
                    Thread.sleep(pollInterval);
                    final long now = System.currentTimeMillis();
                    if (now - lastCleanupTime >= cleanupInterval) {
                        // A full cleanup subsumes a checkpoint, so reset both clocks.
                        checkpointCleanup(true);
                        lastCleanupTime = now;
                        lastCheckpointTime = now;
                    } else if (now - lastCheckpointTime >= checkpointInterval) {
                        checkpointCleanup(false);
                        lastCheckpointTime = now;
                    }
                }
            } catch (InterruptedException e) {
                // Looks like someone really wants us to exit this thread...
            } catch (IOException ioe) {
                LOG.error("Checkpoint failed", ioe);
                brokerService.handleIOException(ioe);
            }
        }
    };
    checkpointThread = new Thread(checkpointTask, "ActiveMQ Journal Checkpoint Worker");
    checkpointThread.setDaemon(true);
    checkpointThread.start();
}
/**
* @throws IOException
*/
@ -241,37 +276,7 @@ public class MessageDatabase implements BrokerServiceAware {
loadPageFile();
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
public void run() {
try {
long lastCleanup = System.currentTimeMillis();
long lastCheckpoint = System.currentTimeMillis();
// Sleep for a short time so we can periodically check
// to see if we need to exit this thread.
long sleepTime = Math.min(checkpointInterval, 500);
while (opened.get()) {
Thread.sleep(sleepTime);
long now = System.currentTimeMillis();
if( now - lastCleanup >= cleanupInterval ) {
checkpointCleanup(true);
lastCleanup = now;
lastCheckpoint = now;
} else if( now - lastCheckpoint >= checkpointInterval ) {
checkpointCleanup(false);
lastCheckpoint = now;
}
}
} catch (InterruptedException e) {
// Looks like someone really wants us to exit this thread...
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
}
}
};
checkpointThread.setDaemon(true);
checkpointThread.start();
startCheckpoint();
recover();
}
}
@ -621,31 +626,40 @@ public class MessageDatabase implements BrokerServiceAware {
}
/**
 * All updates are funneled through this method. The updates are converted
 * to a JournalMessage which is logged to the journal and then the data from
 * the JournalMessage is used to update the index just like it would be done
 * during a recovery process.
 */
public Location store(JournalCommand data, boolean sync) throws IOException {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeFramed(os);
long start = System.currentTimeMillis();
Location location = journal.write(os.toByteSequence(), sync);
long start2 = System.currentTimeMillis();
process(data, location);
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
try {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeFramed(os);
long start = System.currentTimeMillis();
Location location = journal.write(os.toByteSequence(), sync);
long start2 = System.currentTimeMillis();
process(data, location);
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
}
synchronized (indexMutex) {
metadata.lastUpdate = location;
}
if (!checkpointThread.isAlive()) {
LOG.info("KahaDB: Recovering checkpoint thread after exception");
startCheckpoint();
}
return location;
} catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal", ioe);
brokerService.handleIOException(ioe);
throw ioe;
}
synchronized (indexMutex) {
metadata.lastUpdate = location;
}
return location;
}
/**

View File

@ -219,10 +219,7 @@ class DataFileAppender {
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
if (firstAsyncException != null) {
throw firstAsyncException;
}
if (!running) {
running = true;
thread = new Thread() {
@ -234,6 +231,11 @@ class DataFileAppender {
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
firstAsyncException = null;
}
if (firstAsyncException != null) {
throw firstAsyncException;
}
while ( true ) {
@ -430,6 +432,7 @@ class DataFileAppender {
} catch (Throwable ignore) {
}
shutdownDone.countDown();
running = false;
}
}

View File

@ -44,6 +44,7 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
@ -165,8 +166,8 @@ public class PageFile {
}
void begin() {
diskBound = current;
current = null;
diskBound = current;
current = null;
}
/**
@ -937,12 +938,18 @@ public class PageFile {
// If there is not enough to write, wait for a notification...
batch = new ArrayList<PageWrite>(writes.size());
// build a write batch from the current write cache.
for (PageWrite write : writes.values()) {
// build a write batch from the current write cache.
Iterator<Long> it = writes.keySet().iterator();
while (it.hasNext()) {
Long key = it.next();
PageWrite write = writes.get(key);
batch.add(write);
// Move the current write to the diskBound write, this lets folks update the
// page again without blocking for this write.
write.begin();
if (write.diskBound == null) {
batch.remove(write);
}
}
// Grab on to the existing checkpoint latch cause once we do this write we can
@ -959,7 +966,11 @@ public class PageFile {
// our write batches are going to be much larger.
Checksum checksum = new Adler32();
for (PageWrite w : batch) {
checksum.update(w.diskBound, 0, pageSize);
try {
checksum.update(w.diskBound, 0, pageSize);
} catch (Throwable t) {
throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
}
}
// Can we shrink the recovery buffer??