Beef up TranslogTests with concurrent fatal exceptions test

Today we only test this when writing sequentially, yet in practice we mainly
write concurrently. This commit adds a test that verifies that concurrent writes
interrupted by a sudden fatal failure will not corrupt our translog.

Relates to #15420
Simon Willnauer 2015-12-15 15:45:36 +01:00
parent 082632dcac
commit 34703a838d
2 changed files with 163 additions and 40 deletions
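
To make the intent of the new test easier to see outside of the Elasticsearch codebase, here is a minimal, self-contained sketch of the same scaffolding using only JDK classes: several writer threads are released by a latch, a shared AtomicBoolean is flipped mid-run so every later write fails, and what survived is compared against what was attempted. All names in the sketch (FlakyLog, writeRecord) are hypothetical stand-ins; the real test below drives the Translog through a wrapped FileChannel instead.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConcurrentFatalFailureSketch {

    // Hypothetical stand-in for the translog: accepts records until 'fail' is flipped.
    static final class FlakyLog {
        final AtomicBoolean fail;
        final List<String> accepted = Collections.synchronizedList(new ArrayList<>());
        FlakyLog(AtomicBoolean fail) { this.fail = fail; }
        void writeRecord(String record) throws IOException {
            if (fail.get()) {
                throw new IOException("__FAKE__ no space left on device");
            }
            accepted.add(record);
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean fail = new AtomicBoolean(false);
        FlakyLog log = new FlakyLog(fail);
        CountDownLatch start = new CountDownLatch(1);
        List<String> attempted = Collections.synchronizedList(new ArrayList<>());
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int id = t;
            writers[t] = new Thread(() -> {
                try {
                    start.await();
                    for (int i = 0; i < 1000; i++) {
                        String record = id + "-" + i;
                        attempted.add(record);
                        log.writeRecord(record);
                    }
                } catch (IOException | InterruptedException expected) {
                    // a fatal write failure simply stops this writer
                }
            });
            writers[t].start();
        }
        start.countDown();   // release all writers at once
        Thread.sleep(10);    // let some writes go through
        fail.set(true);      // sudden fatal failure for everything that follows
        for (Thread w : writers) {
            w.join();
        }
        // The real test checks the analogous invariant after recovery: only operations
        // covered by the last checkpoint may show up, and none of them may be corrupt.
        System.out.println("attempted=" + attempted.size() + ", accepted=" + log.accepted.size());
    }
}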


@@ -158,7 +158,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try {
if (translogGeneration != null) {
- final Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
+ final Checkpoint checkpoint = readCheckpoint();
this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint);
if (recoveredTranslogs.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
@@ -545,6 +545,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
ensureOpen();
return current.syncUpTo(location.translogLocation + location.size);
}
} catch (AlreadyClosedException | IOException ex) {
if (current.getTragicException() != null) {
try {
close();
} catch (Exception inner) {
ex.addSuppressed(inner);
}
}
throw ex;
}
return false;
}
@@ -1433,4 +1442,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return current.getTragicException();
}
/** Reads and returns the current checkpoint */
final Checkpoint readCheckpoint() throws IOException {
return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
}
}
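
The production change above hardens the sync path: when syncUpTo fails and the writer has recorded a tragic exception, the translog closes itself and rethrows the original failure, attaching any secondary failure from close() via addSuppressed. A minimal sketch of that suppress-and-rethrow idiom in plain Java (the syncOrClose and doSync names are hypothetical) looks like this:

import java.io.Closeable;
import java.io.IOException;

final class TragicCloseSketch {
    // If the sync step fails after a fatal ("tragic") error, close the enclosing
    // resource, but keep the original exception as the one that propagates and
    // attach the close failure as a suppressed exception.
    static void syncOrClose(Closeable resource, boolean sawTragicFailure) throws IOException {
        try {
            doSync();
        } catch (IOException ex) {
            if (sawTragicFailure) {
                try {
                    resource.close();
                } catch (Exception inner) {
                    ex.addSuppressed(inner);
                }
            }
            throw ex;
        }
    }

    // Hypothetical sync step standing in for TranslogWriter#syncUpTo.
    private static void doSync() throws IOException {
        throw new IOException("simulated sync failure");
    }
}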


@@ -25,6 +25,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
@@ -62,6 +63,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import static org.hamcrest.Matchers.*;
@@ -1242,11 +1244,11 @@ public class TranslogTests extends ESTestCase {
private final CountDownLatch downLatch;
private final int opsPerThread;
private final int threadId;
- private final BlockingQueue<LocationOperation> writtenOperations;
+ private final Collection<LocationOperation> writtenOperations;
private final Throwable[] threadExceptions;
private final Translog translog;
- public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
+ public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, Collection<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
this.translog = translog;
this.downLatch = downLatch;
this.opsPerThread = opsPerThread;
@@ -1276,59 +1278,34 @@ public class TranslogTests extends ESTestCase {
throw new ElasticsearchException("not supported op type");
}
- Translog.Location loc = translog.add(op);
+ Translog.Location loc = add(op);
writtenOperations.add(new LocationOperation(op, loc));
afterAdd();
}
} catch (Throwable t) {
threadExceptions[threadId] = t;
}
}
protected Translog.Location add(Translog.Operation op) throws IOException {
return translog.add(op);
}
protected void afterAdd() throws IOException {}
}
public void testFailFlush() throws IOException {
Path tempDir = createTempDir();
- final AtomicBoolean simulateDiskFull = new AtomicBoolean();
+ final AtomicBoolean fail = new AtomicBoolean();
TranslogConfig config = getTranslogConfig(tempDir);
- Translog translog = new Translog(config) {
- @Override
- TranslogWriter.ChannelFactory getChannelFactory() {
- final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
- return new TranslogWriter.ChannelFactory() {
- @Override
- public FileChannel open(Path file) throws IOException {
- FileChannel channel = factory.open(file);
- return new FilterFileChannel(channel) {
- @Override
- public int write(ByteBuffer src) throws IOException {
- if (simulateDiskFull.get()) {
- if (src.limit() > 1) {
- final int pos = src.position();
- final int limit = src.limit();
- src.limit(limit / 2);
- super.write(src);
- src.position(pos);
- src.limit(limit);
- throw new IOException("__FAKE__ no space left on device");
- }
- }
- return super.write(src);
- }
- };
- }
- };
- }
- };
+ Translog translog = getFailableTranslog(fail, config);
List<Translog.Location> locations = new ArrayList<>();
int opsSynced = 0;
int opsAdded = 0;
boolean failed = false;
while(failed == false) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
opsAdded++;
translog.sync();
opsSynced++;
} catch (IOException ex) {
@@ -1336,9 +1313,9 @@ public class TranslogTests extends ESTestCase {
assertFalse(translog.isOpen());
assertEquals("__FAKE__ no space left on device", ex.getMessage());
}
- simulateDiskFull.set(randomBoolean());
+ fail.set(randomBoolean());
}
- simulateDiskFull.set(false);
+ fail.set(false);
if (randomBoolean()) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
@@ -1402,4 +1379,136 @@ public class TranslogTests extends ESTestCase {
}
}
}
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean(false);
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = getFailableTranslog(fail, config);
final int threadCount = randomIntBetween(1, 5);
Thread[] threads = new Thread[threadCount];
final Throwable[] threadExceptions = new Throwable[threadCount];
final CountDownLatch downLatch = new CountDownLatch(1);
final CountDownLatch added = new CountDownLatch(randomIntBetween(10, 100));
List<LocationOperation> writtenOperations = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, threadExceptions) {
@Override
protected Translog.Location add(Translog.Operation op) throws IOException {
Translog.Location add = super.add(op);
added.countDown();
return add;
}
@Override
protected void afterAdd() throws IOException {
if (randomBoolean()) {
translog.sync();
}
}
};
threads[i].setDaemon(true);
threads[i].start();
}
downLatch.countDown();
added.await();
try (Translog.View view = translog.newView()) {
// this holds a reference to the current tlog channel such that it's not closed
// if we hit a tragic event. this is important to ensure that the asserts inside Translog#add don't trip,
// otherwise our assertions here are sometimes off by one.
fail.set(true);
for (int i = 0; i < threadCount; i++) {
threads[i].join();
}
Collections.sort(writtenOperations, (a, b) -> a.location.compareTo(b.location));
assertFalse(translog.isOpen());
final Checkpoint checkpoint = Checkpoint.read(config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME));
Iterator<LocationOperation> iterator = writtenOperations.iterator();
while (iterator.hasNext()) {
LocationOperation next = iterator.next();
if (checkpoint.offset < (next.location.translogLocation + next.location.size)) {
// drop all that haven't been synced
iterator.remove();
}
}
config.setTranslogGeneration(translog.getGeneration());
try (Translog tlog = new Translog(config)) {
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
if (writtenOperations.size() != snapshot.estimatedTotalOperations()) {
for (int i = 0; i < threadCount; i++) {
if (threadExceptions[i] != null)
threadExceptions[i].printStackTrace();
}
}
assertEquals(writtenOperations.size(), snapshot.estimatedTotalOperations());
for (int i = 0; i < writtenOperations.size(); i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(next, writtenOperations.get(i).operation);
}
}
}
}
}
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
return new Translog(config) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
return new TranslogWriter.ChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
return new ThrowingFileChannel(fail, randomBoolean(), channel);
}
};
}
};
}
public static class ThrowingFileChannel extends FilterFileChannel {
private final AtomicBoolean fail;
private final boolean partialWrite;
public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, FileChannel delegate) {
super(delegate);
this.fail = fail;
this.partialWrite = partialWrite;
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int write(ByteBuffer src, long position) throws IOException {
throw new UnsupportedOperationException();
}
public int write(ByteBuffer src) throws IOException {
if (fail.get()) {
if (partialWrite) {
if (src.limit() > 1) {
final int pos = src.position();
final int limit = src.limit();
src.limit(limit / 2);
super.write(src);
src.position(pos);
src.limit(limit);
throw new IOException("__FAKE__ no space left on device");
}
}
throw new MockDirectoryWrapper.FakeIOException();
}
return super.write(src);
}
}
}
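
ThrowingFileChannel covers two failure modes: with partialWrite it writes only the first half of the buffer and then throws the fake "no space left on device" error, which is the nastier case because a torn operation must never become visible after recovery; otherwise it throws MockDirectoryWrapper.FakeIOException without writing anything. That is why the concurrent test drops every recorded operation whose location lies beyond the recovered checkpoint before comparing the remainder against the snapshot.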