Refactor the Translog.read(Location) method

It was only used by `readSource`, it has been changed to return a
Translog.Operation, which can have .getSource() called on it to return
the source. `readSource` has been removed.

This also removes the checked IOException, any exception thrown is
unexpected and should throw a runtime exception.

Moves the ReleasableBytesStreamOutput allocation into the body of the
try-catch block so the lock can be released in the event of an exception
during allocation.
This commit is contained in:
Lee Hinman 2014-09-18 12:22:31 +02:00
parent 997b94b427
commit 168b3752ef
4 changed files with 33 additions and 57 deletions

View File

@ -358,13 +358,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
if (!get.loadSource()) {
return new GetResult(true, versionValue.version(), null);
}
try {
Translog.Source source = translog.readSource(versionValue.translogLocation());
if (source != null) {
return new GetResult(true, versionValue.version(), source);
}
} catch (IOException e) {
// switched on us, read it from the reader
Translog.Operation op = translog.read(versionValue.translogLocation());
if (op != null) {
return new GetResult(true, versionValue.version(), op.getSource());
}
}
}

View File

@ -103,9 +103,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
*/
Location add(Operation operation) throws TranslogException;
byte[] read(Location location);
Translog.Source readSource(Location location) throws IOException;
Translog.Operation read(Location location);
/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
@ -154,6 +152,11 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
public long ramBytesUsed() {
return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_LONG + RamUsageEstimator.NUM_BYTES_INT;
}
@Override
public String toString() {
return "[id: " + translogId + ", location: " + translogLocation + ", size: " + size + "]";
}
}
/**

View File

@ -328,51 +328,37 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
/**
* Private read method that reads from either the transient translog (if
* applicable), or the current translog. Acquires the read lock
* before reading.
* @return byte array of read data
*/
public byte[] read(Location location) {
rwl.readLock().lock();
try {
FsTranslogFile trans = translogForLocation(location);
if (trans != null) {
try {
return trans.read(location);
} catch (Exception e) {
// ignore
}
}
return null;
} finally {
rwl.readLock().unlock();
}
}
/**
* Read the Source object from the given location, returns null if the
* source could not be read.
* Read the Operation object from the given location, returns null if the
* Operation could not be read.
*/
@Override
public Source readSource(Location location) throws IOException {
byte[] data = this.read(location);
if (data == null) {
return null;
}
// Return the source using the current version of the stream based on
// which translog is being read
public Translog.Operation read(Location location) {
rwl.readLock().lock();
try {
FsTranslogFile translog = translogForLocation(location);
if (translog != null) {
byte[] data = translog.read(location);
try (BytesStreamInput in = new BytesStreamInput(data, false)) {
return this.translogForLocation(location).getStream().read(in).getSource();
// Return the Operation using the current version of the
// stream based on which translog is being read
return translog.getStream().read(in);
}
}
return null;
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from traslog location " + location, e);
} finally {
rwl.readLock().unlock();
}
}
@Override
public Location add(Operation operation) throws TranslogException {
rwl.readLock().lock();
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
boolean released = false;
ReleasableBytesStreamOutput out = null;
try {
out = new ReleasableBytesStreamOutput(bigArrays);
TranslogStreams.writeTranslogOperation(out, operation);
ReleasableBytesReference bytes = out.bytes();
Location location = current.add(bytes);
@ -397,7 +383,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
rwl.readLock().unlock();
if (!released) {
if (!released && out != null) {
Releasables.close(out.bytes());
}
}

View File

@ -23,8 +23,6 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
@ -74,24 +72,19 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
protected abstract String translogFileDirectory();
private static Translog.Source readSource(byte[] bytes) throws IOException {
BytesStreamInput in = new BytesStreamInput(bytes, false);
return TranslogStreams.readTranslogOperation(in).getSource();
}
@Test
public void testRead() throws IOException {
Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1}));
Translog.Location loc2 = translog.add(new Translog.Create("test", "2", new byte[]{2}));
assertThat(readSource(translog.read(loc1)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(readSource(translog.read(loc2)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
translog.sync();
assertThat(translog.readSource(loc1).source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.readSource(loc2).source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
Translog.Location loc3 = translog.add(new Translog.Create("test", "2", new byte[]{3}));
assertThat(readSource(translog.read(loc3)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
translog.sync();
assertThat(translog.readSource(loc3).source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
}
@Test
@ -368,9 +361,7 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
}
for (LocationOperation locationOperation : writtenOperations) {
byte[] data = translog.read(locationOperation.location);
StreamInput streamInput = new BytesStreamInput(data, false);
Translog.Operation op = TranslogStreams.readTranslogOperation(streamInput);
Translog.Operation op = translog.read(locationOperation.location);
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
@ -434,7 +425,7 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
AtomicInteger corruptionsCaught = new AtomicInteger(0);
for (Translog.Location location : locations) {
try {
readSource(translog.read(location));
translog.read(location);
} catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet();
}