Merge pull request #15535 from s1monw/fail_on_any_tragic_event
Check for tragic event on all kinds of exceptions not only ACE and IOException
This commit is contained in:
commit
ca7a4f9ee3
|
@ -424,6 +424,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
closeOnTragicEvent(ex);
|
closeOnTragicEvent(ex);
|
||||||
throw ex;
|
throw ex;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
closeOnTragicEvent(e);
|
||||||
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
|
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
|
||||||
} finally {
|
} finally {
|
||||||
Releasables.close(out.bytes());
|
Releasables.close(out.bytes());
|
||||||
|
@ -500,7 +501,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
if (closed.get() == false) {
|
if (closed.get() == false) {
|
||||||
current.sync();
|
current.sync();
|
||||||
}
|
}
|
||||||
} catch (AlreadyClosedException | IOException ex) {
|
} catch (Throwable ex) {
|
||||||
closeOnTragicEvent(ex);
|
closeOnTragicEvent(ex);
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
|
@ -533,7 +534,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
return current.syncUpTo(location.translogLocation + location.size);
|
return current.syncUpTo(location.translogLocation + location.size);
|
||||||
}
|
}
|
||||||
} catch (AlreadyClosedException | IOException ex) {
|
} catch (Throwable ex) {
|
||||||
closeOnTragicEvent(ex);
|
closeOnTragicEvent(ex);
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Predicate;
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
|
|
||||||
|
@ -1387,6 +1386,35 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testTragicEventCanBeAnyException() throws IOException {
|
||||||
|
Path tempDir = createTempDir();
|
||||||
|
final AtomicBoolean fail = new AtomicBoolean();
|
||||||
|
TranslogConfig config = getTranslogConfig(tempDir);
|
||||||
|
assumeFalse("this won't work if we sync on any op",config.isSyncOnEachOperation());
|
||||||
|
Translog translog = getFailableTranslog(fail, config, false, true);
|
||||||
|
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
|
||||||
|
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
||||||
|
fail.set(true);
|
||||||
|
try {
|
||||||
|
Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
||||||
|
if (config.getType() == TranslogWriter.Type.BUFFERED) { // the buffered case will fail on the add if we exceed the buffer or will fail on the flush once we sync
|
||||||
|
if (randomBoolean()) {
|
||||||
|
translog.ensureSynced(location);
|
||||||
|
} else {
|
||||||
|
translog.sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//TODO once we have a mock FS that can simulate we can also fail on plain sync
|
||||||
|
fail("WTF");
|
||||||
|
} catch (UnknownException ex) {
|
||||||
|
// w00t
|
||||||
|
} catch (TranslogException ex) {
|
||||||
|
assertTrue(ex.getCause() instanceof UnknownException);
|
||||||
|
}
|
||||||
|
assertFalse(translog.isOpen());
|
||||||
|
assertTrue(translog.getTragicException() instanceof UnknownException);
|
||||||
|
}
|
||||||
|
|
||||||
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
|
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
|
||||||
Path tempDir = createTempDir();
|
Path tempDir = createTempDir();
|
||||||
final AtomicBoolean fail = new AtomicBoolean(false);
|
final AtomicBoolean fail = new AtomicBoolean(false);
|
||||||
|
@ -1432,9 +1460,9 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
boolean atLeastOneFailed = false;
|
boolean atLeastOneFailed = false;
|
||||||
for (Throwable ex : threadExceptions) {
|
for (Throwable ex : threadExceptions) {
|
||||||
|
assertTrue(ex.toString(), ex instanceof IOException || ex instanceof AlreadyClosedException);
|
||||||
if (ex != null) {
|
if (ex != null) {
|
||||||
atLeastOneFailed = true;
|
atLeastOneFailed = true;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (atLeastOneFailed == false) {
|
if (atLeastOneFailed == false) {
|
||||||
|
@ -1477,8 +1505,11 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
|
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
|
||||||
|
return getFailableTranslog(fail, config, randomBoolean(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
|
||||||
return new Translog(config) {
|
return new Translog(config) {
|
||||||
@Override
|
@Override
|
||||||
TranslogWriter.ChannelFactory getChannelFactory() {
|
TranslogWriter.ChannelFactory getChannelFactory() {
|
||||||
|
@ -1488,7 +1519,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
@Override
|
@Override
|
||||||
public FileChannel open(Path file) throws IOException {
|
public FileChannel open(Path file) throws IOException {
|
||||||
FileChannel channel = factory.open(file);
|
FileChannel channel = factory.open(file);
|
||||||
return new ThrowingFileChannel(fail, randomBoolean(), channel);
|
return new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -1498,11 +1529,13 @@ public class TranslogTests extends ESTestCase {
|
||||||
public static class ThrowingFileChannel extends FilterFileChannel {
|
public static class ThrowingFileChannel extends FilterFileChannel {
|
||||||
private final AtomicBoolean fail;
|
private final AtomicBoolean fail;
|
||||||
private final boolean partialWrite;
|
private final boolean partialWrite;
|
||||||
|
private final boolean throwUnknownException;
|
||||||
|
|
||||||
public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, FileChannel delegate) {
|
public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) {
|
||||||
super(delegate);
|
super(delegate);
|
||||||
this.fail = fail;
|
this.fail = fail;
|
||||||
this.partialWrite = partialWrite;
|
this.partialWrite = partialWrite;
|
||||||
|
this.throwUnknownException = throwUnknownException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1519,19 +1552,27 @@ public class TranslogTests extends ESTestCase {
|
||||||
public int write(ByteBuffer src) throws IOException {
|
public int write(ByteBuffer src) throws IOException {
|
||||||
if (fail.get()) {
|
if (fail.get()) {
|
||||||
if (partialWrite) {
|
if (partialWrite) {
|
||||||
if (src.limit() > 1) {
|
if (src.hasRemaining()) {
|
||||||
final int pos = src.position();
|
final int pos = src.position();
|
||||||
final int limit = src.limit();
|
final int limit = src.limit();
|
||||||
src.limit(limit / 2);
|
src.limit(randomIntBetween(pos, limit));
|
||||||
super.write(src);
|
super.write(src);
|
||||||
src.position(pos);
|
|
||||||
src.limit(limit);
|
src.limit(limit);
|
||||||
|
src.position(pos);
|
||||||
throw new IOException("__FAKE__ no space left on device");
|
throw new IOException("__FAKE__ no space left on device");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new MockDirectoryWrapper.FakeIOException();
|
if (throwUnknownException) {
|
||||||
|
throw new UnknownException();
|
||||||
|
} else {
|
||||||
|
throw new MockDirectoryWrapper.FakeIOException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return super.write(src);
|
return super.write(src);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class UnknownException extends RuntimeException {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue