Fail and close translog hard if writing to disk fails

Today we are super lenient (how could I missed that for f**k sake) with failing
/ closing the translog writer when we hit an exception. It's actually worse, we allow
to further write to it and don't care what has been already written to disk and what hasn't.
We keep the buffer in memory and try to write it again on the next operation.

When we hit a disk-full expcetion due to for instance a big merge we are likely adding document to the
translog but fail to write them to disk. Once the merge failed and freed up it's diskspace (note this is
a small window when concurrently indexing and failing the shard due to out of space exceptions) we will
allow in-flight operations to add to the translog and then once we fail the shard fsync it. These operations
are written to disk and fsynced which is fine but the previous buffer flush might have written some bytes
to disk which are not corrupting the translog. That wouldn't be an issue if we prevented the fsync.

Closes #15333
This commit is contained in:
Simon Willnauer 2015-12-14 12:51:58 +01:00
parent 2602439a51
commit 2d03a6b808
6 changed files with 218 additions and 56 deletions

View File

@ -48,13 +48,17 @@ public final class BufferingTranslogWriter extends TranslogWriter {
public Translog.Location add(BytesReference data) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
operationCounter++;
final long offset = totalOffset;
if (data.length() >= buffer.length) {
flush();
// we use the channel to write, since on windows, writing to the RAF might not be reflected
// when reading through the channel
try {
data.writeTo(channel);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
}
writtenOffset += data.length();
totalOffset += data.length();
return new Translog.Location(generation, offset, data.length());
@ -64,6 +68,7 @@ public final class BufferingTranslogWriter extends TranslogWriter {
}
data.writeTo(bufferOs);
totalOffset += data.length();
operationCounter++;
return new Translog.Location(generation, offset, data.length());
}
}
@ -71,10 +76,17 @@ public final class BufferingTranslogWriter extends TranslogWriter {
protected final void flush() throws IOException {
assert writeLock.isHeldByCurrentThread();
if (bufferCount > 0) {
ensureOpen();
// we use the channel to write, since on windows, writing to the RAF might not be reflected
// when reading through the channel
Channels.writeToChannel(buffer, 0, bufferCount, channel);
writtenOffset += bufferCount;
final int bufferSize = bufferCount;
try {
Channels.writeToChannel(buffer, 0, bufferSize, channel);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
}
writtenOffset += bufferSize;
bufferCount = 0;
}
}
@ -97,7 +109,7 @@ public final class BufferingTranslogWriter extends TranslogWriter {
}
@Override
public boolean syncNeeded() {
public synchronized boolean syncNeeded() {
return totalOffset != lastSyncedOffset;
}
@ -107,15 +119,24 @@ public final class BufferingTranslogWriter extends TranslogWriter {
return;
}
synchronized (this) {
ensureOpen();
channelReference.incRef();
try {
final long offsetToSync;
try (ReleasableLock lock = writeLock.acquire()) {
flush();
lastSyncedOffset = totalOffset;
offsetToSync = totalOffset;
}
// we can do this outside of the write lock but we have to protect from
// concurrent syncs
checkpoint(lastSyncedOffset, operationCounter, channelReference);
try {
ensureOpen();
checkpoint(offsetToSync, operationCounter, channelReference);
} catch (IOException ex) {
closeWithTragicEvent(ex);
throw ex;
}
lastSyncedOffset = offsetToSync;
} finally {
channelReference.decRef();
}

View File

@ -115,7 +115,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final Path location;
private TranslogWriter current;
private volatile ImmutableTranslogReader currentCommittingTranslog;
private long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion.
private volatile long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion.
private final AtomicBoolean closed = new AtomicBoolean();
private final TranslogConfig config;
private final String translogUUID;
@ -287,12 +287,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try (ReleasableLock lock = writeLock.acquire()) {
try {
current.sync();
} finally {
try {
IOUtils.close(current, currentCommittingTranslog);
} finally {
IOUtils.close(recoveredTranslogs);
recoveredTranslogs.clear();
}
}
} finally {
FutureUtils.cancel(syncScheduler);
logger.debug("translog closed");
@ -354,7 +358,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
TranslogWriter createWriter(long fileGeneration) throws IOException {
TranslogWriter newFile;
try {
newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize());
newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize(), getChannelFactory());
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
@ -548,7 +552,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final class OnCloseRunnable implements Callback<ChannelReference> {
@Override
public void handle(ChannelReference channelReference) {
try (ReleasableLock lock = writeLock.acquire()) {
if (isReferencedGeneration(channelReference.getGeneration()) == false) {
Path translogPath = channelReference.getPath();
assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath;
@ -575,7 +578,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
}
}
/**
* a view into the translog, capturing all translog file at the moment of creation
@ -1400,4 +1402,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return outstandingViews.size();
}
TranslogWriter.ChannelFactory getChannelFactory() {
return TranslogWriter.ChannelFactory.DEFAULT;
}
}

View File

@ -140,16 +140,16 @@ public abstract class TranslogReader implements Closeable, Comparable<TranslogRe
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
doClose();
channelReference.decRef();
}
}
protected void doClose() throws IOException {
channelReference.decRef();
protected final boolean isClosed() {
return closed.get();
}
protected void ensureOpen() {
if (closed.get()) {
if (isClosed()) {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed");
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
@ -54,6 +55,8 @@ public class TranslogWriter extends TranslogReader {
protected volatile int operationCounter;
/* the offset in bytes written to the file */
protected volatile long writtenOffset;
protected volatile Throwable tragicEvent;
public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException {
super(generation, channelReference, channelReference.getChannel().position());
@ -65,10 +68,10 @@ public class TranslogWriter extends TranslogReader {
this.lastSyncedOffset = channelReference.getChannel().position();;
}
public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, int bufferSize) throws IOException {
public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, int bufferSize, ChannelFactory channelFactory) throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT;
final FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
final FileChannel channel = channelFactory.open(file);
try {
// This OutputStreamDataOutput is intentionally not closed because
// closing it will close the FileChannel
@ -118,6 +121,18 @@ public class TranslogWriter extends TranslogReader {
}
}
protected final void closeWithTragicEvent(Throwable throwable) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
if (throwable != null) {
if (tragicEvent == null) {
tragicEvent = throwable;
} else {
tragicEvent.addSuppressed(throwable);
}
}
close();
}
}
/**
* add the given bytes to the translog and return the location they were written at
@ -127,9 +142,14 @@ public class TranslogWriter extends TranslogReader {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
position = writtenOffset;
try {
data.writeTo(channel);
} catch (Throwable e) {
closeWithTragicEvent(e);
throw e;
}
writtenOffset = writtenOffset + data.length();
operationCounter = operationCounter + 1;
operationCounter++;;
}
return new Translog.Location(generation, position, data.length());
}
@ -147,6 +167,7 @@ public class TranslogWriter extends TranslogReader {
// check if we really need to sync here...
if (syncNeeded()) {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
lastSyncedOffset = writtenOffset;
checkpoint(lastSyncedOffset, operationCounter, channelReference);
}
@ -262,15 +283,6 @@ public class TranslogWriter extends TranslogReader {
return false;
}
@Override
protected final void doClose() throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
sync();
} finally {
super.doClose();
}
}
@Override
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
@ -288,4 +300,20 @@ public class TranslogWriter extends TranslogReader {
Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
Checkpoint.write(checkpointFile, checkpoint, options);
}
static class ChannelFactory {
static final ChannelFactory DEFAULT = new ChannelFactory();
// only for testing until we have a disk-full FileSystemt
public FileChannel open(Path file) throws IOException {
return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
}
}
protected final void ensureOpen() {
if (isClosed()) {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragicEvent);
}
}
}

View File

@ -34,13 +34,12 @@ import java.nio.file.Path;
public class BufferedTranslogTests extends TranslogTests {
@Override
protected Translog create(Path path) throws IOException {
protected TranslogConfig getTranslogConfig(Path path) {
Settings build = Settings.settingsBuilder()
.put("index.translog.fs.type", TranslogWriter.Type.BUFFERED.name())
.put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024), ByteSizeUnit.BYTES)
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
return new Translog(translogConfig);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.translog;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.Term;
import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.IOUtils;
@ -110,13 +111,16 @@ public class TranslogTests extends ESTestCase {
}
}
protected Translog create(Path path) throws IOException {
private Translog create(Path path) throws IOException {
return new Translog(getTranslogConfig(path));
}
protected TranslogConfig getTranslogConfig(Path path) {
Settings build = Settings.settingsBuilder()
.put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.SIMPLE.name())
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
return new Translog(translogConfig);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
}
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) {
@ -1279,4 +1283,108 @@ public class TranslogTests extends ESTestCase {
}
}
}
public void testFailFlush() throws IOException {
Path tempDir = createTempDir();
final AtomicBoolean failWrite = new AtomicBoolean();
final AtomicBoolean simulateDiskFull = new AtomicBoolean();
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = new Translog(config) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
return new TranslogWriter.ChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
return new FilterFileChannel(channel) {
@Override
public int write(ByteBuffer src) throws IOException {
if (failWrite.get()) {
throw new IOException("boom");
}
if (simulateDiskFull.get()) {
if (src.limit() > 1) {
final int pos = src.position();
final int limit = src.limit();
src.limit(limit / 2);
super.write(src);
src.position(pos);
src.limit(limit);
throw new IOException("no space left on device");
}
}
return super.write(src);
}
};
}
};
}
};
List<Translog.Location> locations = new ArrayList<>();
int opsSynced = 0;
int opsAdded = 0;
boolean failed = false;
boolean syncFailed = true;
while(failed == false) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
opsAdded++;
translog.sync();
opsSynced++;
} catch (IOException ex) {
failed = true;
assertEquals("no space left on device", ex.getMessage());
} catch (Exception ex) {
failed = true;
assertTrue(ex.toString(), ex.getMessage().startsWith("Failed to write operation"));
}
simulateDiskFull.set(randomBoolean());
}
simulateDiskFull.set(false);
if (randomBoolean()) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
opsSynced++;
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
assertEquals(ex.getCause().getMessage(), "no space left on device");
}
}
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
try {
translog.newSnapshot();
fail("already closed");
} catch (AlreadyClosedException ex) {
// all is well
}
try {
translog.close();
if (opsAdded != opsSynced) {
fail("already closed");
}
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
}
config.setTranslogGeneration(translogGeneration);
try (Translog tlog = new Translog(config)){
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
assertEquals(opsSynced, snapshot.estimatedTotalOperations());
for (int i = 0; i < opsSynced; i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
}
}