Remove unused Translog#read method (#20598)

Translog#read is a left-over from realtime-get that allows to read
from an arbitrary location in the transaction log. This method is unused
and can be replaced with snapshots in tests.
This commit is contained in:
Simon Willnauer 2016-09-21 14:19:49 +02:00 committed by GitHub
parent b3e5e6a0ba
commit 6dc03ecb10
4 changed files with 51 additions and 138 deletions

View File

@ -58,34 +58,21 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
return firstOperationOffset; return firstOperationOffset;
} }
public Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]";
ByteBuffer buffer = ByteBuffer.allocate(location.size);
try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) {
return read(checksumStreamInput);
}
}
/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */ /** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
protected final int readSize(ByteBuffer reusableBuffer, long position) { protected final int readSize(ByteBuffer reusableBuffer, long position) throws IOException {
// read op size from disk // read op size from disk
assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]"; assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]";
try { reusableBuffer.clear();
reusableBuffer.clear(); reusableBuffer.limit(4);
reusableBuffer.limit(4); readBytes(reusableBuffer, position);
readBytes(reusableBuffer, position); reusableBuffer.flip();
reusableBuffer.flip(); // Add an extra 4 to account for the operation size integer itself
// Add an extra 4 to account for the operation size integer itself final int size = reusableBuffer.getInt() + 4;
final int size = reusableBuffer.getInt() + 4; final long maxSize = sizeInBytes() - position;
final long maxSize = sizeInBytes() - position; if (size < 0 || size > maxSize) {
if (size < 0 || size > maxSize) { throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}
return size;
} catch (IOException e) {
throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.path, e);
} }
return size;
} }
public Translog.Snapshot newSnapshot() { public Translog.Snapshot newSnapshot() {

View File

@ -384,31 +384,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return newFile; return newFile;
} }
/**
* Read the Operation object from the given location. This method will try to read the given location from
* the current or from the currently committing translog file. If the location is in a file that has already
* been closed or even removed the method will return <code>null</code> instead.
*/
Translog.Operation read(Location location) { // TODO this is only here for testing - we can remove it?
try (ReleasableLock lock = readLock.acquire()) {
final BaseTranslogReader reader;
final long currentGeneration = current.getGeneration();
if (currentGeneration == location.generation) {
reader = current;
} else if (readers.isEmpty() == false && readers.get(readers.size() - 1).getGeneration() == location.generation) {
reader = readers.get(readers.size() - 1);
} else if (currentGeneration < location.generation) {
throw new IllegalStateException("location generation [" + location.generation + "] is greater than the current generation [" + currentGeneration + "]");
} else {
return null;
}
return reader.read(location);
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from translog location " + location, e);
}
}
/** /**
* Adds a delete / index operations to the transaction log. * Adds a delete / index operations to the transaction log.
* *
@ -432,7 +407,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try (ReleasableLock lock = readLock.acquire()) { try (ReleasableLock lock = readLock.acquire()) {
ensureOpen(); ensureOpen();
Location location = current.add(bytes); Location location = current.add(bytes);
assert assertBytesAtLocation(location, bytes);
return location; return location;
} }
} catch (AlreadyClosedException | IOException ex) { } catch (AlreadyClosedException | IOException ex) {
@ -469,12 +443,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
} }
boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
// tests can override this
ByteBuffer buffer = ByteBuffer.allocate(location.size);
current.readBytes(buffer, location.translogLocation);
return new BytesArray(buffer.array()).equals(expectedBytes);
}
/** /**
* Snapshots the current transaction log allowing to safely iterate over the snapshot. * Snapshots the current transaction log allowing to safely iterate over the snapshot.

View File

@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.Path; import java.nio.file.Path;
public class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot { final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
private final int totalOperations; private final int totalOperations;
protected final long length; protected final long length;
@ -51,7 +51,7 @@ public class TranslogSnapshot extends BaseTranslogReader implements Translog.Sna
} }
@Override @Override
public final int totalOperations() { public int totalOperations() {
return totalOperations; return totalOperations;
} }
@ -64,7 +64,7 @@ public class TranslogSnapshot extends BaseTranslogReader implements Translog.Sna
} }
} }
protected final Translog.Operation readOperation() throws IOException { protected Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position); final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse); reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse); Translog.Operation op = read(reuse);

View File

@ -85,6 +85,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
@ -206,53 +207,6 @@ public class TranslogTests extends ESTestCase {
return string; return string;
} }
public void testRead() throws IOException {
Location loc0 = translog.getLastWriteLocation();
assertNotNull(loc0);
Translog.Location loc1 = translog.add(new Translog.Index("test", "1", new byte[]{1}));
assertThat(loc1, greaterThan(loc0));
assertThat(translog.getLastWriteLocation(), greaterThan(loc1));
Translog.Location loc2 = translog.add(new Translog.Index("test", "2", new byte[]{2}));
assertThat(loc2, greaterThan(loc1));
assertThat(translog.getLastWriteLocation(), greaterThan(loc2));
assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));
Translog.Location lastLocBeforeSync = translog.getLastWriteLocation();
translog.sync();
assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));
Translog.Location loc3 = translog.add(new Translog.Index("test", "2", new byte[]{3}));
assertThat(loc3, greaterThan(loc2));
assertThat(translog.getLastWriteLocation(), greaterThan(loc3));
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
lastLocBeforeSync = translog.getLastWriteLocation();
translog.sync();
assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
translog.prepareCommit();
/*
* The commit adds to the lastWriteLocation even though is isn't really a write. This is just an implementation artifact but it can
* safely be ignored because the lastWriteLocation continues to be greater than the Location returned from the last write operation
* and less than the location of the next write operation.
*/
assertThat(translog.getLastWriteLocation(), greaterThan(lastLocBeforeSync));
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
translog.commit();
assertNull(translog.read(loc1));
assertNull(translog.read(loc2));
assertNull(translog.read(loc3));
try {
translog.read(new Translog.Location(translog.currentFileGeneration() + 1, 17, 35));
fail("generation is greater than the current");
} catch (IllegalStateException ex) {
// expected
}
}
public void testSimpleOperations() throws IOException { public void testSimpleOperations() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>(); ArrayList<Translog.Operation> ops = new ArrayList<>();
@ -441,7 +395,7 @@ public class TranslogTests extends ESTestCase {
assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id)))); assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id))));
} }
static class LocationOperation { static class LocationOperation implements Comparable<LocationOperation> {
final Translog.Operation operation; final Translog.Operation operation;
final Translog.Location location; final Translog.Location location;
@ -450,6 +404,10 @@ public class TranslogTests extends ESTestCase {
this.location = location; this.location = location;
} }
@Override
public int compareTo(LocationOperation o) {
return location.compareTo(o.location);
}
} }
public void testConcurrentWritesWithVaryingSize() throws Throwable { public void testConcurrentWritesWithVaryingSize() throws Throwable {
@ -478,8 +436,12 @@ public class TranslogTests extends ESTestCase {
threads[i].join(60 * 1000); threads[i].join(60 * 1000);
} }
for (LocationOperation locationOperation : writtenOperations) { List<LocationOperation> collect = writtenOperations.stream().collect(Collectors.toList());
Translog.Operation op = translog.read(locationOperation.location); Collections.sort(collect);
Translog.Snapshot snapshot = translog.newSnapshot();
for (LocationOperation locationOperation : collect) {
Translog.Operation op = snapshot.next();
assertNotNull(op);
Translog.Operation expectedOp = locationOperation.operation; Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType()); assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) { switch (op.opType()) {
@ -505,6 +467,7 @@ public class TranslogTests extends ESTestCase {
} }
} }
assertNull(snapshot.next());
} }
@ -521,13 +484,16 @@ public class TranslogTests extends ESTestCase {
corruptTranslogs(translogDir); corruptTranslogs(translogDir);
AtomicInteger corruptionsCaught = new AtomicInteger(0); AtomicInteger corruptionsCaught = new AtomicInteger(0);
Translog.Snapshot snapshot = translog.newSnapshot();
for (Translog.Location location : locations) { for (Translog.Location location : locations) {
try { try {
translog.read(location); Translog.Operation next = snapshot.next();
assertNotNull(next);
} catch (TranslogCorruptedException e) { } catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet(); corruptionsCaught.incrementAndGet();
} }
} }
expectThrows(TranslogCorruptedException.class, () -> snapshot.next());
assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1)); assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1));
} }
@ -544,15 +510,12 @@ public class TranslogTests extends ESTestCase {
truncateTranslogs(translogDir); truncateTranslogs(translogDir);
AtomicInteger truncations = new AtomicInteger(0); AtomicInteger truncations = new AtomicInteger(0);
Translog.Snapshot snap = translog.newSnapshot();
for (Translog.Location location : locations) { for (Translog.Location location : locations) {
try { try {
translog.read(location); assertNotNull(snap.next());
} catch (ElasticsearchException e) { } catch (EOFException e) {
if (e.getCause() instanceof EOFException) { truncations.incrementAndGet();
truncations.incrementAndGet();
} else {
throw e;
}
} }
} }
assertThat("at least one truncation was caused and caught", truncations.get(), greaterThanOrEqualTo(1)); assertThat("at least one truncation was caused and caught", truncations.get(), greaterThanOrEqualTo(1));
@ -860,8 +823,14 @@ public class TranslogTests extends ESTestCase {
} }
assertEquals(max.generation, translog.currentFileGeneration()); assertEquals(max.generation, translog.currentFileGeneration());
final Translog.Operation read = translog.read(max); Translog.Snapshot snap = translog.newSnapshot();
assertEquals(read.getSource().source.utf8ToString(), Integer.toString(count)); Translog.Operation next;
Translog.Operation maxOp = null;
while ((next = snap.next()) != null) {
maxOp = next;
}
assertNotNull(maxOp);
assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count));
} }
public static Translog.Location max(Translog.Location a, Translog.Location b) { public static Translog.Location max(Translog.Location a, Translog.Location b) {
@ -884,30 +853,24 @@ public class TranslogTests extends ESTestCase {
} }
} }
assertEquals(translogOperations, translog.totalOperations()); assertEquals(translogOperations, translog.totalOperations());
final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)); final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) { try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
assertEquals(lastSynced + 1, reader.totalOperations()); assertEquals(lastSynced + 1, reader.totalOperations());
Translog.Snapshot snapshot = reader.newSnapshot();
for (int op = 0; op < translogOperations; op++) { for (int op = 0; op < translogOperations; op++) {
Translog.Location location = locations.get(op);
if (op <= lastSynced) { if (op <= lastSynced) {
final Translog.Operation read = reader.read(location); final Translog.Operation read = snapshot.next();
assertEquals(Integer.toString(op), read.getSource().source.utf8ToString()); assertEquals(Integer.toString(op), read.getSource().source.utf8ToString());
} else { } else {
try { Translog.Operation next = snapshot.next();
reader.read(location); assertNull(next);
fail("read past checkpoint");
} catch (EOFException ex) {
}
} }
} }
try { Translog.Operation next = snapshot.next();
reader.read(lastLocation); assertNull(next);
fail("read past checkpoint");
} catch (EOFException ex) {
}
} }
assertEquals(translogOperations + 1, translog.totalOperations()); assertEquals(translogOperations + 1, translog.totalOperations());
translog.close(); translog.close();
@ -1618,11 +1581,6 @@ public class TranslogTests extends ESTestCase {
} }
}; };
} }
@Override
protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException {
return true; // we don't wanna fail in the assert
}
}; };
} }