diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 49e8249e9d3..bf61febb741 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -58,34 +58,21 @@ public abstract class BaseTranslogReader implements Comparable= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]"; - try { - reusableBuffer.clear(); - reusableBuffer.limit(4); - readBytes(reusableBuffer, position); - reusableBuffer.flip(); - // Add an extra 4 to account for the operation size integer itself - final int size = reusableBuffer.getInt() + 4; - final long maxSize = sizeInBytes() - position; - if (size < 0 || size > maxSize) { - throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size); - } - - return size; - } catch (IOException e) { - throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.path, e); + reusableBuffer.clear(); + reusableBuffer.limit(4); + readBytes(reusableBuffer, position); + reusableBuffer.flip(); + // Add an extra 4 to account for the operation size integer itself + final int size = reusableBuffer.getInt() + 4; + final long maxSize = sizeInBytes() - position; + if (size < 0 || size > maxSize) { + throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size); } + return size; } public Translog.Snapshot newSnapshot() { diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 198d9b4cd45..056716a29bd 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -384,31 +384,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return newFile; } - - /** - * Read the Operation object from the given location. This method will try to read the given location from - * the current or from the currently committing translog file. If the location is in a file that has already - * been closed or even removed the method will return null instead. - */ - Translog.Operation read(Location location) { // TODO this is only here for testing - we can remove it? - try (ReleasableLock lock = readLock.acquire()) { - final BaseTranslogReader reader; - final long currentGeneration = current.getGeneration(); - if (currentGeneration == location.generation) { - reader = current; - } else if (readers.isEmpty() == false && readers.get(readers.size() - 1).getGeneration() == location.generation) { - reader = readers.get(readers.size() - 1); - } else if (currentGeneration < location.generation) { - throw new IllegalStateException("location generation [" + location.generation + "] is greater than the current generation [" + currentGeneration + "]"); - } else { - return null; - } - return reader.read(location); - } catch (IOException e) { - throw new ElasticsearchException("failed to read source from translog location " + location, e); - } - } - /** * Adds a delete / index operations to the transaction log. * @@ -432,7 +407,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); Location location = current.add(bytes); - assert assertBytesAtLocation(location, bytes); return location; } } catch (AlreadyClosedException | IOException ex) { @@ -469,12 +443,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException { - // tests can override this - ByteBuffer buffer = ByteBuffer.allocate(location.size); - current.readBytes(buffer, location.translogLocation); - return new BytesArray(buffer.array()).equals(expectedBytes); - } /** * Snapshots the current transaction log allowing to safely iterate over the snapshot. diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index f33ec1bd607..a08259ef32d 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; -public class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot { +final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot { private final int totalOperations; protected final long length; @@ -51,7 +51,7 @@ public class TranslogSnapshot extends BaseTranslogReader implements Translog.Sna } @Override - public final int totalOperations() { + public int totalOperations() { return totalOperations; } @@ -64,7 +64,7 @@ public class TranslogSnapshot extends BaseTranslogReader implements Translog.Sna } } - protected final Translog.Operation readOperation() throws IOException { + protected Translog.Operation readOperation() throws IOException { final int opSize = readSize(reusableBuffer, position); reuse = checksummedStream(reusableBuffer, position, opSize, reuse); Translog.Operation op = read(reuse); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 9961637c323..cb76763363e 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -85,6 +85,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -206,53 +207,6 @@ public class TranslogTests extends ESTestCase { return string; } - public void testRead() throws IOException { - Location loc0 = translog.getLastWriteLocation(); - assertNotNull(loc0); - - Translog.Location loc1 = translog.add(new Translog.Index("test", "1", new byte[]{1})); - assertThat(loc1, greaterThan(loc0)); - assertThat(translog.getLastWriteLocation(), greaterThan(loc1)); - Translog.Location loc2 = translog.add(new Translog.Index("test", "2", new byte[]{2})); - assertThat(loc2, greaterThan(loc1)); - assertThat(translog.getLastWriteLocation(), greaterThan(loc2)); - assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1}))); - assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2}))); - - Translog.Location lastLocBeforeSync = translog.getLastWriteLocation(); - translog.sync(); - assertEquals(lastLocBeforeSync, translog.getLastWriteLocation()); - assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1}))); - assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2}))); - - Translog.Location loc3 = translog.add(new Translog.Index("test", "2", new byte[]{3})); - assertThat(loc3, greaterThan(loc2)); - assertThat(translog.getLastWriteLocation(), greaterThan(loc3)); - assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3}))); - - lastLocBeforeSync = translog.getLastWriteLocation(); - translog.sync(); - assertEquals(lastLocBeforeSync, translog.getLastWriteLocation()); - assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3}))); - translog.prepareCommit(); - /* - * The commit adds to the lastWriteLocation even though is isn't really a write. This is just an implementation artifact but it can - * safely be ignored because the lastWriteLocation continues to be greater than the Location returned from the last write operation - * and less than the location of the next write operation. - */ - assertThat(translog.getLastWriteLocation(), greaterThan(lastLocBeforeSync)); - assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3}))); - translog.commit(); - assertNull(translog.read(loc1)); - assertNull(translog.read(loc2)); - assertNull(translog.read(loc3)); - try { - translog.read(new Translog.Location(translog.currentFileGeneration() + 1, 17, 35)); - fail("generation is greater than the current"); - } catch (IllegalStateException ex) { - // expected - } - } public void testSimpleOperations() throws IOException { ArrayList ops = new ArrayList<>(); @@ -441,7 +395,7 @@ public class TranslogTests extends ESTestCase { assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id)))); } - static class LocationOperation { + static class LocationOperation implements Comparable { final Translog.Operation operation; final Translog.Location location; @@ -450,6 +404,10 @@ public class TranslogTests extends ESTestCase { this.location = location; } + @Override + public int compareTo(LocationOperation o) { + return location.compareTo(o.location); + } } public void testConcurrentWritesWithVaryingSize() throws Throwable { @@ -478,8 +436,12 @@ public class TranslogTests extends ESTestCase { threads[i].join(60 * 1000); } - for (LocationOperation locationOperation : writtenOperations) { - Translog.Operation op = translog.read(locationOperation.location); + List collect = writtenOperations.stream().collect(Collectors.toList()); + Collections.sort(collect); + Translog.Snapshot snapshot = translog.newSnapshot(); + for (LocationOperation locationOperation : collect) { + Translog.Operation op = snapshot.next(); + assertNotNull(op); Translog.Operation expectedOp = locationOperation.operation; assertEquals(expectedOp.opType(), op.opType()); switch (op.opType()) { @@ -505,6 +467,7 @@ public class TranslogTests extends ESTestCase { } } + assertNull(snapshot.next()); } @@ -521,13 +484,16 @@ public class TranslogTests extends ESTestCase { corruptTranslogs(translogDir); AtomicInteger corruptionsCaught = new AtomicInteger(0); + Translog.Snapshot snapshot = translog.newSnapshot(); for (Translog.Location location : locations) { try { - translog.read(location); + Translog.Operation next = snapshot.next(); + assertNotNull(next); } catch (TranslogCorruptedException e) { corruptionsCaught.incrementAndGet(); } } + expectThrows(TranslogCorruptedException.class, () -> snapshot.next()); assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1)); } @@ -544,15 +510,12 @@ public class TranslogTests extends ESTestCase { truncateTranslogs(translogDir); AtomicInteger truncations = new AtomicInteger(0); + Translog.Snapshot snap = translog.newSnapshot(); for (Translog.Location location : locations) { try { - translog.read(location); - } catch (ElasticsearchException e) { - if (e.getCause() instanceof EOFException) { - truncations.incrementAndGet(); - } else { - throw e; - } + assertNotNull(snap.next()); + } catch (EOFException e) { + truncations.incrementAndGet(); } } assertThat("at least one truncation was caused and caught", truncations.get(), greaterThanOrEqualTo(1)); @@ -860,8 +823,14 @@ public class TranslogTests extends ESTestCase { } assertEquals(max.generation, translog.currentFileGeneration()); - final Translog.Operation read = translog.read(max); - assertEquals(read.getSource().source.utf8ToString(), Integer.toString(count)); + Translog.Snapshot snap = translog.newSnapshot(); + Translog.Operation next; + Translog.Operation maxOp = null; + while ((next = snap.next()) != null) { + maxOp = next; + } + assertNotNull(maxOp); + assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count)); } public static Translog.Location max(Translog.Location a, Translog.Location b) { @@ -884,30 +853,24 @@ public class TranslogTests extends ESTestCase { } } assertEquals(translogOperations, translog.totalOperations()); - final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)); try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) { assertEquals(lastSynced + 1, reader.totalOperations()); + Translog.Snapshot snapshot = reader.newSnapshot(); + for (int op = 0; op < translogOperations; op++) { - Translog.Location location = locations.get(op); if (op <= lastSynced) { - final Translog.Operation read = reader.read(location); + final Translog.Operation read = snapshot.next(); assertEquals(Integer.toString(op), read.getSource().source.utf8ToString()); } else { - try { - reader.read(location); - fail("read past checkpoint"); - } catch (EOFException ex) { - - } + Translog.Operation next = snapshot.next(); + assertNull(next); } } - try { - reader.read(lastLocation); - fail("read past checkpoint"); - } catch (EOFException ex) { - } + Translog.Operation next = snapshot.next(); + assertNull(next); } assertEquals(translogOperations + 1, translog.totalOperations()); translog.close(); @@ -1618,11 +1581,6 @@ public class TranslogTests extends ESTestCase { } }; } - - @Override - protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException { - return true; // we don't wanna fail in the assert - } }; }