diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 381d9544a16..a0151264e82 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -358,13 +358,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin if (!get.loadSource()) { return new GetResult(true, versionValue.version(), null); } - try { - Translog.Source source = translog.readSource(versionValue.translogLocation()); - if (source != null) { - return new GetResult(true, versionValue.version(), source); - } - } catch (IOException e) { - // switched on us, read it from the reader + Translog.Operation op = translog.read(versionValue.translogLocation()); + if (op != null) { + return new GetResult(true, versionValue.version(), op.getSource()); } } } diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 5e513077ffd..18143d6b42a 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -103,9 +103,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent, */ Location add(Operation operation) throws TranslogException; - byte[] read(Location location); - - Translog.Source readSource(Location location) throws IOException; + Translog.Operation read(Location location); /** * Snapshots the current transaction log allowing to safely iterate over the snapshot. @@ -154,6 +152,11 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent, public long ramBytesUsed() { return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_LONG + RamUsageEstimator.NUM_BYTES_INT; } + + @Override + public String toString() { + return "[id: " + translogId + ", location: " + translogLocation + ", size: " + size + "]"; + } } /** diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 8f09b588b98..d15e23cf846 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -328,51 +328,37 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } /** - * Private read method that reads from either the transient translog (if - * applicable), or the current translog. Acquires the read lock - * before reading. - * @return byte array of read data + * Read the Operation object from the given location, returns null if the + * Operation could not be read. */ - public byte[] read(Location location) { + @Override + public Translog.Operation read(Location location) { rwl.readLock().lock(); try { - FsTranslogFile trans = translogForLocation(location); - if (trans != null) { - try { - return trans.read(location); - } catch (Exception e) { - // ignore + FsTranslogFile translog = translogForLocation(location); + if (translog != null) { + byte[] data = translog.read(location); + try (BytesStreamInput in = new BytesStreamInput(data, false)) { + // Return the Operation using the current version of the + // stream based on which translog is being read + return translog.getStream().read(in); } } return null; + } catch (IOException e) { + throw new ElasticsearchException("failed to read source from traslog location " + location, e); } finally { rwl.readLock().unlock(); } } - /** - * Read the Source object from the given location, returns null if the - * source could not be read. - */ - @Override - public Source readSource(Location location) throws IOException { - byte[] data = this.read(location); - if (data == null) { - return null; - } - // Return the source using the current version of the stream based on - // which translog is being read - try (BytesStreamInput in = new BytesStreamInput(data, false)) { - return this.translogForLocation(location).getStream().read(in).getSource(); - } - } - @Override public Location add(Operation operation) throws TranslogException { rwl.readLock().lock(); - ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); boolean released = false; + ReleasableBytesStreamOutput out = null; try { + out = new ReleasableBytesStreamOutput(bigArrays); TranslogStreams.writeTranslogOperation(out, operation); ReleasableBytesReference bytes = out.bytes(); Location location = current.add(bytes); @@ -397,7 +383,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { rwl.readLock().unlock(); - if (!released) { + if (!released && out != null) { Releasables.close(out.bytes()); } } diff --git a/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java b/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java index f8c258f9a46..3d530e36a97 100644 --- a/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java +++ b/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java @@ -23,8 +23,6 @@ import org.apache.lucene.index.Term; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; @@ -74,24 +72,19 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase protected abstract String translogFileDirectory(); - private static Translog.Source readSource(byte[] bytes) throws IOException { - BytesStreamInput in = new BytesStreamInput(bytes, false); - return TranslogStreams.readTranslogOperation(in).getSource(); - } - @Test public void testRead() throws IOException { Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1})); Translog.Location loc2 = translog.add(new Translog.Create("test", "2", new byte[]{2})); - assertThat(readSource(translog.read(loc1)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{1}))); - assertThat(readSource(translog.read(loc2)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{2}))); + assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1}))); + assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2}))); translog.sync(); - assertThat(translog.readSource(loc1).source.toBytesArray(), equalTo(new BytesArray(new byte[]{1}))); - assertThat(translog.readSource(loc2).source.toBytesArray(), equalTo(new BytesArray(new byte[]{2}))); + assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1}))); + assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2}))); Translog.Location loc3 = translog.add(new Translog.Create("test", "2", new byte[]{3})); - assertThat(readSource(translog.read(loc3)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{3}))); + assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3}))); translog.sync(); - assertThat(translog.readSource(loc3).source.toBytesArray(), equalTo(new BytesArray(new byte[]{3}))); + assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3}))); } @Test @@ -368,9 +361,7 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase } for (LocationOperation locationOperation : writtenOperations) { - byte[] data = translog.read(locationOperation.location); - StreamInput streamInput = new BytesStreamInput(data, false); - Translog.Operation op = TranslogStreams.readTranslogOperation(streamInput); + Translog.Operation op = translog.read(locationOperation.location); Translog.Operation expectedOp = locationOperation.operation; assertEquals(expectedOp.opType(), op.opType()); switch (op.opType()) { @@ -434,7 +425,7 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase AtomicInteger corruptionsCaught = new AtomicInteger(0); for (Translog.Location location : locations) { try { - readSource(translog.read(location)); + translog.read(location); } catch (TranslogCorruptedException e) { corruptionsCaught.incrementAndGet(); }