Applied review comments from @mikemccand

- Renamed TranslogSnapshot to MultiSnapshot
 - moved legacy logic for trucation into LegacyTranslogReaderBase
 - made several methods private and pkg private where applicable
 - renamed arguments for consistency
This commit is contained in:
Simon Willnauer 2015-05-13 20:32:25 +02:00
parent 43ff544117
commit 96df7ba7eb
5 changed files with 55 additions and 53 deletions

View File

@ -55,14 +55,14 @@ class Checkpoint {
generation = in.readLong(); generation = in.readLong();
} }
void write(FileChannel channel) throws IOException { private void write(FileChannel channel) throws IOException {
byte[] buffer = new byte[BUFFER_SIZE]; byte[] buffer = new byte[BUFFER_SIZE];
final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer); final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
write(out); write(out);
Channels.writeToChannel(buffer, channel); Channels.writeToChannel(buffer, channel);
} }
public void write(DataOutput out) throws IOException { private void write(DataOutput out) throws IOException {
out.writeLong(offset); out.writeLong(offset);
out.writeInt(numOps); out.writeInt(numOps);
out.writeLong(generation); out.writeLong(generation);
@ -70,7 +70,7 @@ class Checkpoint {
@Override @Override
public String toString() { public String toString() {
return "TranslogInfo{" + return "Checkpoint{" +
"offset=" + offset + "offset=" + offset +
", numOps=" + numOps + ", numOps=" + numOps +
", translogFileGeneration= " + generation + ", translogFileGeneration= " + generation +

View File

@ -37,8 +37,8 @@ public class ImmutableTranslogReader extends TranslogReader {
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot. * at the end of the last operation in this snapshot.
*/ */
public ImmutableTranslogReader(long generation, ChannelReference channelReference, long offset, long length, int totalOperations) { public ImmutableTranslogReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) {
super(generation, channelReference, offset); super(generation, channelReference, firstOperationOffset);
this.length = length; this.length = length;
this.totalOperations = totalOperations; this.totalOperations = totalOperations;
} }

View File

@ -47,7 +47,11 @@ class LegacyTranslogReaderBase extends ImmutableTranslogReader {
if (position >= sizeInBytes()) { // this is the legacy case.... if (position >= sizeInBytes()) { // this is the legacy case....
return null; return null;
} }
return readOperation(); try {
return readOperation();
} catch (TruncatedTranslogException ex) {
return null; // legacy case
}
} }
}; };
} }

View File

@ -20,42 +20,40 @@
package org.elasticsearch.index.translog; package org.elasticsearch.index.translog;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class TranslogSnapshot implements Translog.Snapshot { /**
* A snapshot composed out of multiple snapshots
*/
final class MultiSnapshot implements Translog.Snapshot {
private final List<Translog.Snapshot> orderedTranslogs; private final Translog.Snapshot[] translogs;
private AtomicBoolean closed = new AtomicBoolean(false); private AtomicBoolean closed = new AtomicBoolean(false);
private final int estimatedTotalOperations; private final int estimatedTotalOperations;
private int currentTranslog; private int index;
/** /**
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
* at the end of the last operation in this snapshot.
*/ */
public TranslogSnapshot(List<Translog.Snapshot> orderedTranslogs) { MultiSnapshot(Translog.Snapshot[] translogs) {
this.orderedTranslogs = orderedTranslogs; this.translogs = translogs;
int ops = 0; int ops = 0;
for (Translog.Snapshot translog : orderedTranslogs) { for (Translog.Snapshot translog : translogs) {
final int tops = translog.estimatedTotalOperations(); final int tops = translog.estimatedTotalOperations();
if (tops < 0) { if (tops == TranslogReader.UNKNOWN_OP_COUNT) {
ops = TranslogReader.UNKNOWN_OP_COUNT; ops = TranslogReader.UNKNOWN_OP_COUNT;
break; break;
} }
assert tops >= 0 : "tops must be positive but was: " + tops;
ops += tops; ops += tops;
} }
estimatedTotalOperations = ops; estimatedTotalOperations = ops;
currentTranslog = 0; index = 0;
} }
@ -67,20 +65,10 @@ public class TranslogSnapshot implements Translog.Snapshot {
@Override @Override
public Translog.Operation next() throws IOException { public Translog.Operation next() throws IOException {
ensureOpen(); ensureOpen();
for (; currentTranslog < orderedTranslogs.size(); currentTranslog++) { for (; index < translogs.length; index++) {
final Translog.Snapshot current = orderedTranslogs.get(currentTranslog); final Translog.Snapshot current = translogs[index];
Translog.Operation op = null; Translog.Operation op = current.next();
try { if (op != null) { // if we are null we move to the next snapshot
op = current.next();
} catch (TruncatedTranslogException e) {
if (estimatedTotalOperations == TranslogReader.UNKNOWN_OP_COUNT) {
// legacy translog file - can have UNKNOWN_OP_COUNT
// file is empty or header has been half-written and should be ignored
} else {
throw e;
}
}
if (op != null) {
return op; return op;
} }
} }
@ -96,7 +84,7 @@ public class TranslogSnapshot implements Translog.Snapshot {
@Override @Override
public void close() throws ElasticsearchException { public void close() throws ElasticsearchException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
Releasables.close(orderedTranslogs); Releasables.close(translogs);
} }
} }
} }

View File

@ -433,7 +433,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
/** /**
* Adds a create operation to the transaction log. * Adds a created / delete / index operations to the transaction log.
*
* @see org.elasticsearch.index.translog.Translog.Operation
* @see org.elasticsearch.index.translog.Translog.Create
* @see org.elasticsearch.index.translog.Translog.Index
* @see org.elasticsearch.index.translog.Translog.Delete
*/ */
public Location add(Operation operation) throws TranslogException { public Location add(Operation operation) throws TranslogException {
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
@ -467,23 +472,24 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
toOpen.add(currentCommittingTranslog); toOpen.add(currentCommittingTranslog);
} }
toOpen.add(current); toOpen.add(current);
return createdSnapshot(toOpen.toArray(new TranslogReader[toOpen.size()])); return createSnapshot(toOpen.toArray(new TranslogReader[toOpen.size()]));
} }
} }
private Snapshot createdSnapshot(TranslogReader... translogs) { private Snapshot createSnapshot(TranslogReader... translogs) {
ArrayList<Translog.Snapshot> channelSnapshots = new ArrayList<>(); Snapshot[] snapshots = new Snapshot[translogs.length];
boolean success = false; boolean success = false;
try { try {
for (TranslogReader translog : translogs) { for (int i = 0; i < translogs.length; i++) {
channelSnapshots.add(translog.newSnapshot()); snapshots[i] = translogs[i].newSnapshot();
} }
Snapshot snapshot = new TranslogSnapshot(channelSnapshots);
Snapshot snapshot = new MultiSnapshot(snapshots);
success = true; success = true;
return snapshot; return snapshot;
} finally { } finally {
if (success == false) { if (success == false) {
Releasables.close(channelSnapshots); Releasables.close(snapshots);
} }
} }
} }
@ -672,7 +678,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public synchronized Snapshot snapshot() { public synchronized Snapshot snapshot() {
ensureOpen(); ensureOpen();
return createdSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()])); return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()]));
} }
@ -1568,15 +1574,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException { static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException {
int opSize = in.readInt(); final int opSize = in.readInt();
if (opSize < 4) { // 4byte for the checksum
throw new TranslogCorruptedException("operation size must be at least 4 but was: " + opSize);
}
Translog.Operation operation; Translog.Operation operation;
try { try {
in.resetDigest(); // size is not part of the checksum? in.resetDigest(); // size is not part of the checksum!
if (in.markSupported()) { // if we can we validate the checksum first if (in.markSupported()) { // if we can we validate the checksum first
// we are sometimes called when mark is not supported this is the case when
// we are sending translogs across the network - currently there is no way to prevent this unfortunately.
in.mark(opSize); in.mark(opSize);
if (opSize < 4) { // 4byte for the checksum
throw new TranslogCorruptedException("operation size must be at least 4 but was: " + opSize);
}
in.skip(opSize-4); in.skip(opSize-4);
verifyChecksum(in); verifyChecksum(in);
in.reset(); in.reset();
@ -1594,6 +1603,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
public static void writeOperation(StreamOutput outStream, Translog.Operation op) throws IOException { public static void writeOperation(StreamOutput outStream, Translog.Operation op) throws IOException {
//TODO lets get rid of this crazy double writing here.
// We first write to a NoopStreamOutput to get the size of the // We first write to a NoopStreamOutput to get the size of the
// operation. We could write to a byte array and then send that as an // operation. We could write to a byte array and then send that as an
// alternative, but here we choose to use CPU over allocating new // alternative, but here we choose to use CPU over allocating new
@ -1637,12 +1648,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override @Override
public void prepareCommit() throws IOException { public void prepareCommit() throws IOException {
ensureOpen(); ensureOpen();
TranslogWriter writer = null;
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
if (currentCommittingTranslog != null) { if (currentCommittingTranslog != null) {
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration()); throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
} }
writer = current; final TranslogWriter writer = current;
writer.sync(); writer.sync();
currentCommittingTranslog = current.immutableReader(); currentCommittingTranslog = current.immutableReader();
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
@ -1664,7 +1674,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
assert writer.syncNeeded() == false : "old translog writer must not need a sync"; assert writer.syncNeeded() == false : "old translog writer must not need a sync";
} catch (Throwable t) { } catch (Throwable t) {
close(); // tragic event IOUtils.closeWhileHandlingException(this); // tragic event
throw t; throw t;
} }
} }