better handling of fs gateway when sync to disk of the translog file

This commit is contained in:
kimchy 2010-03-31 15:36:43 +03:00
parent 875e7b7449
commit 1a698df69d

View File

@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.index.translog.TranslogStreams.*;
import static org.elasticsearch.util.io.FileSystemUtils.*;
import static org.elasticsearch.util.lucene.Directories.*;
/**
@ -79,8 +80,6 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
private volatile int lastTranslogSize;
private RandomAccessFile translogFile;
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway, IndexShard indexShard, Store store) {
super(shardId, indexSettings);
this.threadPool = threadPool;
@ -166,51 +165,53 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
}
}
if (translogSnapshot.translogId() != lastTranslogId || translogFile == null) {
// we reopen the RAF each snapshot and not keep an open one since we want to make sure we
// can sync it to disk later on (close it as well)
File translogFile = new File(locationTranslog, "translog-" + translogSnapshot.translogId());
RandomAccessFile translogRaf = null;
// if we have a different trnaslogId (or the file does not exists at all), we want to flush the full
// translog to a new file (based on the translogId). If we still work on existing translog, just
// append the latest translog operations
if (translogSnapshot.translogId() != lastTranslogId || !translogFile.exists()) {
translogDirty = true;
if (translogFile != null) {
try {
translogFile.close();
} catch (IOException e) {
// ignore
}
}
try {
File f = new File(locationTranslog, "translog-" + translogSnapshot.translogId());
translogFile = new RandomAccessFile(f, "rw");
StreamOutput out = new DataOutputStreamOutput(translogFile);
translogRaf = new RandomAccessFile(translogFile, "rw");
StreamOutput out = new DataOutputStreamOutput(translogRaf);
out.writeInt(-1); // write the number of operations header with -1 currently
// double check that we managed to read/write correctly
translogFile.seek(0);
if (translogFile.readInt() != -1) {
throw new ElasticSearchIllegalStateException("Wrote to snapshot file [" + f + "] but did not read...");
translogRaf.seek(0);
if (translogRaf.readInt() != -1) {
throw new ElasticSearchIllegalStateException("Wrote to snapshot file [" + translogFile + "] but did not read...");
}
for (Translog.Operation operation : translogSnapshot) {
writeTranslogOperation(out, operation);
}
} catch (Exception e) {
try {
translogFile.close();
translogRaf.close();
} catch (IOException e1) {
// ignore
}
translogFile = null;
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog", e);
}
} else if (translogSnapshot.size() > lastTranslogSize) {
translogDirty = true;
try {
StreamOutput out = new DataOutputStreamOutput(translogFile);
translogRaf = new RandomAccessFile(translogFile, "rw");
// seek to the end, since we append
translogRaf.seek(translogRaf.length());
StreamOutput out = new DataOutputStreamOutput(translogRaf);
for (Translog.Operation operation : translogSnapshot.skipTo(lastTranslogSize)) {
writeTranslogOperation(out, operation);
}
} catch (Exception e) {
try {
translogFile.close();
translogRaf.close();
} catch (IOException e1) {
// ignore
}
translogFile = null;
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog", e);
}
}
@ -222,18 +223,19 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()));
}
if (translogDirty) {
translogFile.seek(0);
translogFile.writeInt(translogSnapshot.size());
translogFile.seek(translogFile.length());
translogFile.getFD().sync();
translogRaf.seek(0);
translogRaf.writeInt(translogSnapshot.size());
translogRaf.close();
// now, sync the translog
syncFile(translogFile);
}
} catch (Exception e) {
try {
translogFile.close();
translogRaf.close();
} catch (IOException e1) {
// ignore
}
translogFile = null;
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize snapshot", e);
}