From 1a698df69d5d2e21382dc48cb38fb4f2d1056752 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 31 Mar 2010 15:36:43 +0300 Subject: [PATCH] better handling of fs gateway when sync to disk of the translog file --- .../index/gateway/fs/FsIndexShardGateway.java | 56 ++++++++++--------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java index c245c1db970..278fb594fe7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicReference; import static com.google.common.collect.Lists.*; import static org.elasticsearch.index.translog.TranslogStreams.*; +import static org.elasticsearch.util.io.FileSystemUtils.*; import static org.elasticsearch.util.lucene.Directories.*; /** @@ -79,8 +80,6 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements private volatile int lastTranslogSize; - private RandomAccessFile translogFile; - @Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway, IndexShard indexShard, Store store) { super(shardId, indexSettings); this.threadPool = threadPool; @@ -166,51 +165,53 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get()); } } - if (translogSnapshot.translogId() != lastTranslogId || translogFile == null) { + // we reopen the RAF each snapshot and not keep an open one since we want to make sure we + // can sync it to disk later on (close it as well) + File translogFile = new File(locationTranslog, "translog-" + translogSnapshot.translogId()); + RandomAccessFile translogRaf = null; + + // if we have a different trnaslogId (or the file does not exists at all), we want to flush the full + // translog to a new file (based on the translogId). If we still work on existing translog, just + // append the latest translog operations + + if (translogSnapshot.translogId() != lastTranslogId || !translogFile.exists()) { translogDirty = true; - if (translogFile != null) { - try { - translogFile.close(); - } catch (IOException e) { - // ignore - } - } try { - File f = new File(locationTranslog, "translog-" + translogSnapshot.translogId()); - translogFile = new RandomAccessFile(f, "rw"); - StreamOutput out = new DataOutputStreamOutput(translogFile); + translogRaf = new RandomAccessFile(translogFile, "rw"); + StreamOutput out = new DataOutputStreamOutput(translogRaf); out.writeInt(-1); // write the number of operations header with -1 currently // double check that we managed to read/write correctly - translogFile.seek(0); - if (translogFile.readInt() != -1) { - throw new ElasticSearchIllegalStateException("Wrote to snapshot file [" + f + "] but did not read..."); + translogRaf.seek(0); + if (translogRaf.readInt() != -1) { + throw new ElasticSearchIllegalStateException("Wrote to snapshot file [" + translogFile + "] but did not read..."); } for (Translog.Operation operation : translogSnapshot) { writeTranslogOperation(out, operation); } } catch (Exception e) { try { - translogFile.close(); + translogRaf.close(); } catch (IOException e1) { // ignore } - translogFile = null; throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog", e); } } else if (translogSnapshot.size() > lastTranslogSize) { translogDirty = true; try { - StreamOutput out = new DataOutputStreamOutput(translogFile); + translogRaf = new RandomAccessFile(translogFile, "rw"); + // seek to the end, since we append + translogRaf.seek(translogRaf.length()); + StreamOutput out = new DataOutputStreamOutput(translogRaf); for (Translog.Operation operation : translogSnapshot.skipTo(lastTranslogSize)) { writeTranslogOperation(out, operation); } } catch (Exception e) { try { - translogFile.close(); + translogRaf.close(); } catch (IOException e1) { // ignore } - translogFile = null; throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog", e); } } @@ -222,18 +223,19 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements new File(locationIndex, snapshotIndexCommit.getSegmentsFileName())); } if (translogDirty) { - translogFile.seek(0); - translogFile.writeInt(translogSnapshot.size()); - translogFile.seek(translogFile.length()); - translogFile.getFD().sync(); + translogRaf.seek(0); + translogRaf.writeInt(translogSnapshot.size()); + translogRaf.close(); + + // now, sync the translog + syncFile(translogFile); } } catch (Exception e) { try { - translogFile.close(); + translogRaf.close(); } catch (IOException e1) { // ignore } - translogFile = null; throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize snapshot", e); }