[TEST] Test that translog can recover after random IOException
This commit adds a new test that can throw an IOException at any point in time and ensures that all previously synced documents can be successfully recovered after hitting an excepiton. Relates to #15788
This commit is contained in:
parent
132df10342
commit
e7f9d685f1
|
@ -51,6 +51,7 @@ import org.elasticsearch.index.shard.IndexShardComponent;
|
|||
import java.io.Closeable;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.DirectoryStream;
|
||||
import java.nio.file.Files;
|
||||
|
@ -440,7 +441,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
if (config.isSyncOnEachOperation()) {
|
||||
current.sync();
|
||||
}
|
||||
assert current.assertBytesAtLocation(location, bytes);
|
||||
assert assertBytesAtLocation(location, bytes);
|
||||
return location;
|
||||
}
|
||||
} catch (AlreadyClosedException | IOException ex) {
|
||||
|
@ -454,6 +455,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
|
||||
// tests can override this
|
||||
ByteBuffer buffer = ByteBuffer.allocate(location.size);
|
||||
current.readBytes(buffer, location.translogLocation);
|
||||
return new BytesArray(buffer.array()).equals(expectedBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
|
||||
* Snapshots are fixed in time and will not be updated with future operations.
|
||||
|
|
|
@ -218,11 +218,6 @@ public class TranslogWriter extends TranslogReader {
|
|||
}
|
||||
}
|
||||
|
||||
boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(location.size);
|
||||
readBytes(buffer, location.translogLocation);
|
||||
return new BytesArray(buffer.array()).equals(expectedBytes);
|
||||
}
|
||||
|
||||
private long getWrittenOffset() throws IOException {
|
||||
return channelReference.getChannel().position();
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.lucene.util.LuceneTestCase;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -1316,7 +1317,7 @@ public class TranslogTests extends ESTestCase {
|
|||
|
||||
public void testFailFlush() throws IOException {
|
||||
Path tempDir = createTempDir();
|
||||
final AtomicBoolean fail = new AtomicBoolean();
|
||||
final FailSwitch fail = new FailSwitch();
|
||||
TranslogConfig config = getTranslogConfig(tempDir);
|
||||
Translog translog = getFailableTranslog(fail, config);
|
||||
|
||||
|
@ -1336,9 +1337,13 @@ public class TranslogTests extends ESTestCase {
|
|||
assertFalse(translog.isOpen());
|
||||
assertEquals("__FAKE__ no space left on device", ex.getMessage());
|
||||
}
|
||||
fail.set(randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
fail.failAlways();
|
||||
} else {
|
||||
fail.failNever();
|
||||
}
|
||||
}
|
||||
fail.set(false);
|
||||
fail.failNever();
|
||||
if (randomBoolean()) {
|
||||
try {
|
||||
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
|
||||
|
@ -1409,13 +1414,13 @@ public class TranslogTests extends ESTestCase {
|
|||
|
||||
public void testTragicEventCanBeAnyException() throws IOException {
|
||||
Path tempDir = createTempDir();
|
||||
final AtomicBoolean fail = new AtomicBoolean();
|
||||
final FailSwitch fail = new FailSwitch();
|
||||
TranslogConfig config = getTranslogConfig(tempDir);
|
||||
assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
|
||||
Translog translog = getFailableTranslog(fail, config, false, true);
|
||||
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
|
||||
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
||||
fail.set(true);
|
||||
fail.failAlways();
|
||||
try {
|
||||
Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
||||
if (randomBoolean()) {
|
||||
|
@ -1436,7 +1441,7 @@ public class TranslogTests extends ESTestCase {
|
|||
|
||||
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
|
||||
Path tempDir = createTempDir();
|
||||
final AtomicBoolean fail = new AtomicBoolean(false);
|
||||
final FailSwitch fail = new FailSwitch();
|
||||
|
||||
TranslogConfig config = getTranslogConfig(tempDir);
|
||||
Translog translog = getFailableTranslog(fail, config);
|
||||
|
@ -1473,7 +1478,7 @@ public class TranslogTests extends ESTestCase {
|
|||
// this holds a reference to the current tlog channel such that it's not closed
|
||||
// if we hit a tragic event. this is important to ensure that asserts inside the Translog#add doesn't trip
|
||||
// otherwise our assertions here are off by one sometimes.
|
||||
fail.set(true);
|
||||
fail.failAlways();
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
@ -1525,11 +1530,40 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
|
||||
private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
|
||||
return getFailableTranslog(fail, config, randomBoolean(), false);
|
||||
}
|
||||
|
||||
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
|
||||
private static class FailSwitch {
|
||||
private volatile int failRate;
|
||||
private volatile boolean onceFailedFailAlways = false;
|
||||
public boolean fail() {
|
||||
boolean fail = randomIntBetween(1, 100) <= failRate;
|
||||
if (fail && onceFailedFailAlways) {
|
||||
failAlways();
|
||||
}
|
||||
return fail;
|
||||
}
|
||||
|
||||
public void failNever() {
|
||||
failRate = 0;
|
||||
}
|
||||
|
||||
public void failAlways() {
|
||||
failRate = 100;
|
||||
}
|
||||
|
||||
public void failRandomly() {
|
||||
failRate = randomIntBetween(1, 100);
|
||||
}
|
||||
|
||||
public void onceFailedFailAlways() {
|
||||
onceFailedFailAlways = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
|
||||
return new Translog(config) {
|
||||
@Override
|
||||
TranslogWriter.ChannelFactory getChannelFactory() {
|
||||
|
@ -1539,23 +1573,56 @@ public class TranslogTests extends ESTestCase {
|
|||
@Override
|
||||
public FileChannel open(Path file) throws IOException {
|
||||
FileChannel channel = factory.open(file);
|
||||
return new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
|
||||
boolean success = false;
|
||||
try {
|
||||
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
|
||||
success = true;
|
||||
return throwingFileChannel;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
IOUtils.closeWhileHandlingException(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException {
|
||||
return true; // we don't wanna fail in the assert
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static class ThrowingFileChannel extends FilterFileChannel {
|
||||
private final AtomicBoolean fail;
|
||||
private final FailSwitch fail;
|
||||
private final boolean partialWrite;
|
||||
private final boolean throwUnknownException;
|
||||
|
||||
public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) {
|
||||
public ThrowingFileChannel(FailSwitch fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) throws MockDirectoryWrapper.FakeIOException {
|
||||
super(delegate);
|
||||
this.fail = fail;
|
||||
this.partialWrite = partialWrite;
|
||||
this.throwUnknownException = throwUnknownException;
|
||||
if (fail.fail()) {
|
||||
throw new MockDirectoryWrapper.FakeIOException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
if (fail.fail()) {
|
||||
throw new MockDirectoryWrapper.FakeIOException();
|
||||
}
|
||||
return super.read(dst);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
|
||||
if (fail.fail()) {
|
||||
throw new MockDirectoryWrapper.FakeIOException();
|
||||
}
|
||||
return super.read(dsts, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1570,7 +1637,7 @@ public class TranslogTests extends ESTestCase {
|
|||
|
||||
|
||||
public int write(ByteBuffer src) throws IOException {
|
||||
if (fail.get()) {
|
||||
if (fail.fail()) {
|
||||
if (partialWrite) {
|
||||
if (src.hasRemaining()) {
|
||||
final int pos = src.position();
|
||||
|
@ -1590,6 +1657,22 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
return super.write(src);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void force(boolean metaData) throws IOException {
|
||||
if (fail.fail()) {
|
||||
throw new MockDirectoryWrapper.FakeIOException();
|
||||
}
|
||||
super.force(metaData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long position() throws IOException {
|
||||
if (fail.fail()) {
|
||||
throw new MockDirectoryWrapper.FakeIOException();
|
||||
}
|
||||
return super.position();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class UnknownException extends RuntimeException {
|
||||
|
@ -1711,4 +1794,78 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test adds operations to the translog which might randomly throw an IOException. The only thing this test verifies is
|
||||
* that we can, after we hit an exception, open and recover the translog successfully and retrieve all successfully synced operations
|
||||
* from the transaction log.
|
||||
*/
|
||||
public void testWithRandomException() throws IOException {
|
||||
final int runs = randomIntBetween(5, 10);
|
||||
for (int run = 0; run < runs; run++) {
|
||||
Path tempDir = createTempDir();
|
||||
final FailSwitch fail = new FailSwitch();
|
||||
fail.failRandomly();
|
||||
TranslogConfig config = getTranslogConfig(tempDir);
|
||||
final int numOps = randomIntBetween(100, 200);
|
||||
List<String> syncedDocs = new ArrayList<>();
|
||||
List<String> unsynced = new ArrayList<>();
|
||||
if (randomBoolean()) {
|
||||
fail.onceFailedFailAlways();
|
||||
}
|
||||
try {
|
||||
final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false);
|
||||
try {
|
||||
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
|
||||
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
|
||||
String doc = lineFileDocs.nextDoc().toString();
|
||||
failableTLog.add(new Translog.Index("test", "" + opsAdded, doc.getBytes(Charset.forName("UTF-8"))));
|
||||
unsynced.add(doc);
|
||||
if (randomBoolean()) {
|
||||
failableTLog.sync();
|
||||
syncedDocs.addAll(unsynced);
|
||||
unsynced.clear();
|
||||
}
|
||||
if (randomFloat() < 0.1) {
|
||||
failableTLog.sync(); // we have to sync here first otherwise we don't know if the sync succeeded if the commit fails
|
||||
syncedDocs.addAll(unsynced);
|
||||
unsynced.clear();
|
||||
if (randomBoolean()) {
|
||||
failableTLog.prepareCommit();
|
||||
}
|
||||
failableTLog.commit();
|
||||
syncedDocs.clear();
|
||||
}
|
||||
}
|
||||
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
|
||||
// fair enough
|
||||
} catch (IOException ex) {
|
||||
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
|
||||
} finally {
|
||||
config.setTranslogGeneration(failableTLog.getGeneration());
|
||||
IOUtils.closeWhileHandlingException(failableTLog);
|
||||
}
|
||||
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
|
||||
// failed - that's ok, we didn't even create it
|
||||
}
|
||||
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
|
||||
if (randomBoolean()) {
|
||||
try {
|
||||
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false));
|
||||
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
|
||||
// failed - that's ok, we didn't even create it
|
||||
}
|
||||
}
|
||||
|
||||
try (Translog translog = new Translog(config)) {
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
||||
assertEquals(syncedDocs.size(), snapshot.estimatedTotalOperations());
|
||||
for (int i = 0; i < syncedDocs.size(); i++) {
|
||||
Translog.Operation next = snapshot.next();
|
||||
assertEquals(syncedDocs.get(i), next.getSource().source.toUtf8());
|
||||
assertNotNull("operation " + i + " must be non-null", next);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue