[AMQ-6625] ensure kahadb stops operation on the first IOException to facilitate auto recovery from partial writes

gtully 2017-03-13 13:49:41 +00:00
parent 0707031334
commit c5a8b2c8b1
7 changed files with 124 additions and 4 deletions
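In outline: the first failed journal write now latches the appender shut and the page file clears its loaded flag, so later store operations fail fast instead of landing on top of a partial write; the IO exception handler then calls the new allowIOResumption() hook to clear those latches before it waits for a successful checkpoint and restarts transports. A minimal sketch of that latch pattern (illustrative only, not ActiveMQ code; the names mirror the DataFileAppender/Journal changes below):

import java.io.IOException;

// Illustrative model of the fail-fast latch added to DataFileAppender; the real appender
// sets shutdown=true in its write thread and Journal.allowIOResumption() clears it again
// once the IOExceptionHandler decides recovery can proceed.
class FailFastAppenderSketch {
    private volatile boolean shutdown;

    void storeItem(byte[] data) throws IOException {
        if (shutdown) {
            // refuse new work until resumption is explicitly allowed
            throw new IOException("Writes suspended after an earlier journal failure");
        }
        try {
            write(data);
        } catch (IOException error) {
            shutdown = true;   // the first failure stops all subsequent writes
            throw error;
        }
    }

    void allowIOResumption() {
        shutdown = false;      // called from the IOExceptionHandler's restart path
    }

    private void write(byte[] data) throws IOException {
        // real disk write elided
    }
}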

DefaultIOExceptionHandler.java

@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
@Override
public void handle(IOException exception) {
if (ignoreAllErrors) {
allowIOResumption();
LOG.info("Ignoring IO exception, " + exception, exception);
return;
}
@@ -62,6 +63,7 @@ import org.slf4j.LoggerFactory;
String message = cause.getMessage();
if (message != null && message.contains(noSpaceMessage)) {
LOG.info("Ignoring no space left exception, " + exception, exception);
allowIOResumption();
return;
}
cause = cause.getCause();
@@ -106,6 +108,7 @@ import org.slf4j.LoggerFactory;
@Override
public void run() {
try {
allowIOResumption();
while (hasLockOwnership() && isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
@@ -162,6 +165,9 @@ import org.slf4j.LoggerFactory;
throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
}
protected void allowIOResumption() {
}
private void stopBroker(Exception exception) {
LOG.info("Stopping " + broker + " due to exception, " + exception, exception);
new Thread("IOExceptionHandler: stopping " + broker) {

KahaDBIOExceptionHandler.java (new file)

@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @org.apache.xbean.XBean
*/
public class KahaDBIOExceptionHandler extends DefaultIOExceptionHandler {
private static final Logger LOG = LoggerFactory
.getLogger(KahaDBIOExceptionHandler.class);
protected void allowIOResumption() {
try {
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
kahaDBPersistenceAdapter.getStore().allowIOResumption();
}
} catch (IOException e) {
LOG.warn("Failed to allow IO resumption", e);
}
}
}
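Usage note (not part of the commit): since the class is an XBean it can be declared in the broker XML alongside the existing defaultIOExceptionHandler element; for an embedded broker the equivalent Java wiring looks roughly like the sketch below, using the standard BrokerService setters.

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBIOExceptionHandler;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

// Hedged wiring sketch: install the KahaDB-aware handler so a failed journal or
// page-file write suspends transports and is resumed automatically after recovery.
public class KahaDBRecoveryWiring {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistenceAdapter(new KahaDBPersistenceAdapter());
        broker.setIoExceptionHandler(new KahaDBIOExceptionHandler());
        broker.start();
        broker.waitUntilStopped();
    }
}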

MessageDatabase.java

@@ -302,6 +302,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
unload();
}
public void allowIOResumption() {
if (pageFile != null) {
pageFile.allowIOResumption();
}
if (journal != null) {
journal.allowIOResumption();
}
}
private void loadPageFile() throws IOException {
this.indexLock.writeLock().lock();
try {

DataFileAppender.java

@@ -353,6 +353,7 @@ class DataFileAppender implements FileAppender {
} catch (Throwable error) {
logger.warn("Journal failed while writing at: " + wb.dataFile.getDataFileId() + ":" + wb.offset, error);
synchronized (enqueueMutex) {
shutdown = true;
running = false;
signalError(wb, error);
if (nextWriteBatch != null) {

Journal.java

@@ -109,6 +109,13 @@ public class Journal {
return accessorPool;
}
public void allowIOResumption() {
if (appender instanceof DataFileAppender) {
DataFileAppender dataFileAppender = (DataFileAppender)appender;
dataFileAppender.shutdown = false;
}
}
public enum PreallocationStrategy {
SPARSE_FILE,
OS_KERNEL_COPY,

PageFile.java

@@ -493,6 +493,10 @@ public class PageFile {
return loaded.get();
}
public void allowIOResumption() {
loaded.set(true);
}
/**
* Flush and sync all write buffers to disk.
*
@@ -1101,6 +1105,13 @@
if (enableDiskSyncs) {
writeFile.sync();
}
} catch (IOException ioError) {
LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError);
// any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates
// to ensure disk image is self consistent
loaded.set(false);
throw ioError;
} finally {
synchronized (writes) {
for (PageWrite w : batch) {
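On the page-file side, the existing loaded flag doubles as the write gate: a failed batch write clears it so subsequent page operations are refused, and allowIOResumption() simply sets it back, leaving the redo of the recovery data to the caller as the comment above requires. A rough model of that behaviour (illustrative only; only allowIOResumption() and the loaded flag come from the diff):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative model of the PageFile write gate; not ActiveMQ code.
class PageFileGateSketch {
    private final AtomicBoolean loaded = new AtomicBoolean(true);

    void writeBatch() throws IOException {
        if (!loaded.get()) {
            throw new IOException("PageFile not loaded, recovery pending");
        }
        try {
            syncToDisk();
        } catch (IOException ioError) {
            loaded.set(false);   // block further writes until resumption is allowed
            throw ioError;
        }
    }

    void allowIOResumption() {
        // the real caller is expected to have redone any recovery updates first
        loaded.set(true);
    }

    private void syncToDisk() throws IOException {
        // real write and sync elided
    }
}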

DataFileAppenderNoSpaceNoBatchTest.java

@@ -80,6 +80,7 @@ public class DataFileAppenderNoSpaceNoBatchTest {
underTest.storeItem(byteSequence, (byte) 1, true);
fail("expect no space");
} catch (IOException expected) {
underTest.shutdown = false;
}
}
@@ -88,7 +89,45 @@
}
@Test
public void testSingleNoSpaceNextWriteSameBatch() throws Exception {
final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>());
final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) {
public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException {
return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") {
public void seek(long pos) throws IOException {
seekPositions.add(pos);
}
public void write(byte[] bytes, int offset, int len) throws IOException {
throw new IOException("No space on device");
}
};
};
};
underTest = new DataFileAppender(new Journal() {
@Override
public DataFile getCurrentDataFile(int capacity) throws IOException {
return currentDataFile;
};
});
final ByteSequence byteSequence = new ByteSequence(new byte[4*1024]);
for (int i=0; i<2; i++) {
try {
underTest.storeItem(byteSequence, (byte) 1, true);
fail("expect no space");
} catch (IOException expected) {
}
}
assertEquals("got 1 seeks: " + seekPositions, 1, seekPositions.size());
}
@Test(timeout = 10000)
public void testNoSpaceNextWriteSameBatchAsync() throws Exception {
final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>());
@@ -129,9 +168,13 @@ public class DataFileAppenderNoSpaceNoBatchTest {
ConcurrentLinkedQueue<Location> locations = new ConcurrentLinkedQueue<Location>();
HashSet<CountDownLatch> latches = new HashSet<CountDownLatch>();
for (int i = 0; i <= 20; i++) {
Location location = underTest.storeItem(byteSequence, (byte) 1, false);
locations.add(location);
latches.add(location.getLatch());
try {
Location location = underTest.storeItem(byteSequence, (byte) 1, false);
locations.add(location);
latches.add(location.getLatch());
} catch (IOException expected) {
underTest.shutdown = false;
}
}
for (CountDownLatch latch: latches) {