From 96df7ba7eb72380be4e0a0b2cdaafd8f78173811 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 13 May 2015 20:32:25 +0200 Subject: [PATCH] Applied review comments from @mikemccand - Renamed TranslogSnapshot to MultiSnapshot - moved legacy logic for trucation into LegacyTranslogReaderBase - made several methods private and pkg private where applicable - renamed arguments for consistency --- .../index/translog/Checkpoint.java | 6 +-- .../translog/ImmutableTranslogReader.java | 4 +- .../translog/LegacyTranslogReaderBase.java | 6 ++- ...anslogSnapshot.java => MultiSnapshot.java} | 48 +++++++------------ .../index/translog/Translog.java | 44 ++++++++++------- 5 files changed, 55 insertions(+), 53 deletions(-) rename src/main/java/org/elasticsearch/index/translog/{TranslogSnapshot.java => MultiSnapshot.java} (57%) diff --git a/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index 586b1f7d8f7..9b73d0346a5 100644 --- a/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -55,14 +55,14 @@ class Checkpoint { generation = in.readLong(); } - void write(FileChannel channel) throws IOException { + private void write(FileChannel channel) throws IOException { byte[] buffer = new byte[BUFFER_SIZE]; final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer); write(out); Channels.writeToChannel(buffer, channel); } - public void write(DataOutput out) throws IOException { + private void write(DataOutput out) throws IOException { out.writeLong(offset); out.writeInt(numOps); out.writeLong(generation); @@ -70,7 +70,7 @@ class Checkpoint { @Override public String toString() { - return "TranslogInfo{" + + return "Checkpoint{" + "offset=" + offset + ", numOps=" + numOps + ", translogFileGeneration= " + generation + diff --git a/src/main/java/org/elasticsearch/index/translog/ImmutableTranslogReader.java b/src/main/java/org/elasticsearch/index/translog/ImmutableTranslogReader.java index 6d141108b5c..1d6d3b45a63 100644 --- a/src/main/java/org/elasticsearch/index/translog/ImmutableTranslogReader.java +++ b/src/main/java/org/elasticsearch/index/translog/ImmutableTranslogReader.java @@ -37,8 +37,8 @@ public class ImmutableTranslogReader extends TranslogReader { * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point * at the end of the last operation in this snapshot. */ - public ImmutableTranslogReader(long generation, ChannelReference channelReference, long offset, long length, int totalOperations) { - super(generation, channelReference, offset); + public ImmutableTranslogReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) { + super(generation, channelReference, firstOperationOffset); this.length = length; this.totalOperations = totalOperations; } diff --git a/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReaderBase.java b/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReaderBase.java index eff0d0675db..d9e9e17f792 100644 --- a/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReaderBase.java +++ b/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReaderBase.java @@ -47,7 +47,11 @@ class LegacyTranslogReaderBase extends ImmutableTranslogReader { if (position >= sizeInBytes()) { // this is the legacy case.... return null; } - return readOperation(); + try { + return readOperation(); + } catch (TruncatedTranslogException ex) { + return null; // legacy case + } } }; } diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java similarity index 57% rename from src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java rename to src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index dfd69eddf4b..b76214dc2e7 100644 --- a/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -20,42 +20,40 @@ package org.elasticsearch.index.translog; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.logging.ESLogger; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -public class TranslogSnapshot implements Translog.Snapshot { +/** + * A snapshot composed out of multiple snapshots + */ +final class MultiSnapshot implements Translog.Snapshot { - private final List orderedTranslogs; + private final Translog.Snapshot[] translogs; private AtomicBoolean closed = new AtomicBoolean(false); private final int estimatedTotalOperations; - private int currentTranslog; + private int index; /** - * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point - * at the end of the last operation in this snapshot. + * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. */ - public TranslogSnapshot(List orderedTranslogs) { - this.orderedTranslogs = orderedTranslogs; + MultiSnapshot(Translog.Snapshot[] translogs) { + this.translogs = translogs; int ops = 0; - for (Translog.Snapshot translog : orderedTranslogs) { + for (Translog.Snapshot translog : translogs) { final int tops = translog.estimatedTotalOperations(); - if (tops < 0) { + if (tops == TranslogReader.UNKNOWN_OP_COUNT) { ops = TranslogReader.UNKNOWN_OP_COUNT; break; } + assert tops >= 0 : "tops must be positive but was: " + tops; ops += tops; } estimatedTotalOperations = ops; - currentTranslog = 0; + index = 0; } @@ -67,20 +65,10 @@ public class TranslogSnapshot implements Translog.Snapshot { @Override public Translog.Operation next() throws IOException { ensureOpen(); - for (; currentTranslog < orderedTranslogs.size(); currentTranslog++) { - final Translog.Snapshot current = orderedTranslogs.get(currentTranslog); - Translog.Operation op = null; - try { - op = current.next(); - } catch (TruncatedTranslogException e) { - if (estimatedTotalOperations == TranslogReader.UNKNOWN_OP_COUNT) { - // legacy translog file - can have UNKNOWN_OP_COUNT - // file is empty or header has been half-written and should be ignored - } else { - throw e; - } - } - if (op != null) { + for (; index < translogs.length; index++) { + final Translog.Snapshot current = translogs[index]; + Translog.Operation op = current.next(); + if (op != null) { // if we are null we move to the next snapshot return op; } } @@ -96,7 +84,7 @@ public class TranslogSnapshot implements Translog.Snapshot { @Override public void close() throws ElasticsearchException { if (closed.compareAndSet(false, true)) { - Releasables.close(orderedTranslogs); + Releasables.close(translogs); } } } diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 6116fe9f8a6..7aeff4a624c 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -433,7 +433,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } /** - * Adds a create operation to the transaction log. + * Adds a created / delete / index operations to the transaction log. + * + * @see org.elasticsearch.index.translog.Translog.Operation + * @see org.elasticsearch.index.translog.Translog.Create + * @see org.elasticsearch.index.translog.Translog.Index + * @see org.elasticsearch.index.translog.Translog.Delete */ public Location add(Operation operation) throws TranslogException { ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); @@ -467,23 +472,24 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC toOpen.add(currentCommittingTranslog); } toOpen.add(current); - return createdSnapshot(toOpen.toArray(new TranslogReader[toOpen.size()])); + return createSnapshot(toOpen.toArray(new TranslogReader[toOpen.size()])); } } - private Snapshot createdSnapshot(TranslogReader... translogs) { - ArrayList channelSnapshots = new ArrayList<>(); + private Snapshot createSnapshot(TranslogReader... translogs) { + Snapshot[] snapshots = new Snapshot[translogs.length]; boolean success = false; try { - for (TranslogReader translog : translogs) { - channelSnapshots.add(translog.newSnapshot()); + for (int i = 0; i < translogs.length; i++) { + snapshots[i] = translogs[i].newSnapshot(); } - Snapshot snapshot = new TranslogSnapshot(channelSnapshots); + + Snapshot snapshot = new MultiSnapshot(snapshots); success = true; return snapshot; } finally { if (success == false) { - Releasables.close(channelSnapshots); + Releasables.close(snapshots); } } } @@ -672,7 +678,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public synchronized Snapshot snapshot() { ensureOpen(); - return createdSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()])); + return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()])); } @@ -1568,15 +1574,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException { - int opSize = in.readInt(); + final int opSize = in.readInt(); + if (opSize < 4) { // 4byte for the checksum + throw new TranslogCorruptedException("operation size must be at least 4 but was: " + opSize); + } Translog.Operation operation; try { - in.resetDigest(); // size is not part of the checksum? + in.resetDigest(); // size is not part of the checksum! if (in.markSupported()) { // if we can we validate the checksum first + // we are sometimes called when mark is not supported this is the case when + // we are sending translogs across the network - currently there is no way to prevent this unfortunately. in.mark(opSize); - if (opSize < 4) { // 4byte for the checksum - throw new TranslogCorruptedException("operation size must be at least 4 but was: " + opSize); - } + in.skip(opSize-4); verifyChecksum(in); in.reset(); @@ -1594,6 +1603,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } public static void writeOperation(StreamOutput outStream, Translog.Operation op) throws IOException { + //TODO lets get rid of this crazy double writing here. + // We first write to a NoopStreamOutput to get the size of the // operation. We could write to a byte array and then send that as an // alternative, but here we choose to use CPU over allocating new @@ -1637,12 +1648,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public void prepareCommit() throws IOException { ensureOpen(); - TranslogWriter writer = null; try (ReleasableLock lock = writeLock.acquire()) { if (currentCommittingTranslog != null) { throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration()); } - writer = current; + final TranslogWriter writer = current; writer.sync(); currentCommittingTranslog = current.immutableReader(); Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); @@ -1664,7 +1674,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC assert writer.syncNeeded() == false : "old translog writer must not need a sync"; } catch (Throwable t) { - close(); // tragic event + IOUtils.closeWhileHandlingException(this); // tragic event throw t; } }