Beef up Translog testing with random channel exceptions (#18997)

Today we only throw random exceptions on the translog writer. This commit
extends it to also throw exceptions during checkpoint writing etc to test
if the correct flags are provided to open method etc.
This commit is contained in:
Simon Willnauer 2016-06-21 21:25:01 +02:00 committed by GitHub
parent c7710daed0
commit c80e837606
5 changed files with 103 additions and 38 deletions

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
/**
* only for testing until we have a disk-full FileSystem
*/
@FunctionalInterface
interface ChannelFactory {
default FileChannel open(Path path) throws IOException {
return open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
}
FileChannel open(Path path, OpenOption... options) throws IOException;
}

View File

@ -82,8 +82,8 @@ class Checkpoint {
}
}
public static void write(Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
try (FileChannel channel = FileChannel.open(checkpointFile, options)) {
public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
try (FileChannel channel = factory.open(checkpointFile, options)) {
checkpoint.write(channel);
channel.force(false);
}

View File

@ -200,7 +200,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Files.createDirectories(location);
final long generation = 1;
Checkpoint checkpoint = new Checkpoint(0, 0, generation);
Checkpoint.write(location.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint.write(getChannelFactory(), location.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
current = createWriter(generation);
this.lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;
@ -1313,8 +1313,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return outstandingViews.size();
}
TranslogWriter.ChannelFactory getChannelFactory() {
return TranslogWriter.ChannelFactory.DEFAULT;
ChannelFactory getChannelFactory() {
return FileChannel::open;
}
/**

View File

@ -49,6 +49,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
public static final int VERSION = VERSION_CHECKPOINTS;
private final ShardId shardId;
private final ChannelFactory channelFactory;
/* the offset in bytes that was written when the file was last synced*/
private volatile long lastSyncedOffset;
/* the number of translog operations written to this file */
@ -64,9 +65,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// lock order synchronized(syncLock) -> synchronized(this)
private final Object syncLock = new Object();
public TranslogWriter(ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException {
public TranslogWriter(ChannelFactory channelFactory, ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException {
super(generation, channel, path, channel.position());
this.shardId = shardId;
this.channelFactory = channelFactory;
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
this.lastSyncedOffset = channel.position();
totalOffset = lastSyncedOffset;
@ -92,8 +94,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
out.writeInt(ref.length);
out.writeBytes(ref.bytes, ref.offset, ref.length);
channel.force(true);
writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, channel, file, bufferSize);
writeCheckpoint(channelFactory, headerLength, 0, file.getParent(), fileGeneration);
final TranslogWriter writer = new TranslogWriter(channelFactory, shardId, fileGeneration, channel, file, bufferSize);
return writer;
} catch (Throwable throwable) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
@ -254,7 +256,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// we can continue writing to the buffer etc.
try {
channel.force(false);
writeCheckpoint(offsetToSync, opsCounter, path.getParent(), generation, StandardOpenOption.WRITE);
writeCheckpoint(channelFactory, offsetToSync, opsCounter, path.getParent(), generation);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
@ -286,20 +288,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
}
private static void writeCheckpoint(long syncPosition, int numOperations, Path translogFile, long generation, OpenOption... options) throws IOException {
private static void writeCheckpoint(ChannelFactory channelFactory, long syncPosition, int numOperations, Path translogFile, long generation) throws IOException {
final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
Checkpoint.write(checkpointFile, checkpoint, options);
}
static class ChannelFactory {
static final ChannelFactory DEFAULT = new ChannelFactory();
// only for testing until we have a disk-full FileSystem
public FileChannel open(Path file) throws IOException {
return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
}
Checkpoint.write(channelFactory::open, checkpointFile, checkpoint, StandardOpenOption.WRITE);
}
protected final void ensureOpen() {

View File

@ -60,7 +60,9 @@ import java.nio.charset.Charset;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
@ -1125,13 +1127,13 @@ public class TranslogTests extends ESTestCase {
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = new Checkpoint(0, 0, 0);
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
try (Translog translog = new Translog(config, translogGeneration)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
}
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
@ -1564,22 +1566,20 @@ public class TranslogTests extends ESTestCase {
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException {
return new Translog(config, generation) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
ChannelFactory getChannelFactory() {
final ChannelFactory factory = super.getChannelFactory();
return new TranslogWriter.ChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
boolean success = false;
try {
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channel);
}
return (file, openOption) -> {
FileChannel channel = factory.open(file, openOption);
boolean success = false;
try {
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the 20bytes are written as an atomic operation
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : paritalWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channel);
}
}
};
@ -1840,11 +1840,18 @@ public class TranslogTests extends ESTestCase {
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
} finally {
Checkpoint checkpoint = failableTLog.readCheckpoint();
if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) {
syncedDocs.addAll(unsynced); // failed in fsync but got fully written
unsynced.clear();
}
generation = failableTLog.getGeneration();
IOUtils.closeWhileHandlingException(failableTLog);
}
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
}
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
if (randomBoolean()) {
@ -1852,9 +1859,12 @@ public class TranslogTests extends ESTestCase {
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generation));
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
}
}
fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file
try (Translog translog = new Translog(config, generation)) {
Translog.Snapshot snapshot = translog.newSnapshot();
assertEquals(syncedDocs.size(), snapshot.totalOperations());
@ -1866,4 +1876,30 @@ public class TranslogTests extends ESTestCase {
}
}
}
public void testCheckpointOnDiskFull() throws IOException {
Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong());
Path tempDir = createTempDir();
Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong());
try {
Checkpoint.write((p, o) -> {
if (randomBoolean()) {
throw new MockDirectoryWrapper.FakeIOException();
}
FileChannel open = FileChannel.open(p, o);
FailSwitch failSwitch = new FailSwitch();
failSwitch.failNever(); // don't fail in the ctor
ThrowingFileChannel channel = new ThrowingFileChannel(failSwitch, false, false, open);
failSwitch.failAlways();
return channel;
}, tempDir.resolve("foo.cpk"), checkpoint2, StandardOpenOption.WRITE);
fail("should have failed earlier");
} catch (MockDirectoryWrapper.FakeIOException ex) {
//fine
}
Checkpoint read = Checkpoint.read(tempDir.resolve("foo.cpk"));
assertEquals(read, checkpoint);
}
}