diff --git a/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java b/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java index 3c3fbf1ecbb..b99a6da5f22 100644 --- a/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java +++ b/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java @@ -70,4 +70,8 @@ public final class BufferedChecksumStreamOutput extends StreamOutput { out.reset(); digest.reset(); } + + public void resetDigest() { + digest.reset(); + } } diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 3323a71006a..aca4e61f6ed 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -450,7 +450,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public Location add(Operation operation) throws TranslogException { ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); try { - writeOperation(out, operation); + final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); + final long start = out.position(); + out.skip(RamUsageEstimator.NUM_BYTES_INT); + writeOperationNoSize(checksumStreamOutput, operation); + long end = out.position(); + int operationSize = (int) (out.position() - RamUsageEstimator.NUM_BYTES_INT - start); + out.seek(start); + out.writeInt(operationSize); + out.seek(end); ReleasablePagedBytesReference bytes = out.bytes(); try (ReleasableLock lock = readLock.acquire()) { Location location = current.add(bytes); @@ -1546,29 +1554,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - public static Snapshot snapshotFromStream(StreamInput input, final int numOps) { + /** + * Reads a list of operations written with {@link #writeOperations(StreamOutput, List)} + */ + public static List readOperations(StreamInput input) throws IOException { + ArrayList operations = new ArrayList<>(); + int numOps = input.readInt(); final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input); - return new Snapshot() { - int read = 0; - @Override - public int estimatedTotalOperations() { - return numOps; - } - - @Override - public Operation next() throws IOException { - if (read < numOps) { - read++; - return readOperation(checksumStreamInput); - } - return null; - } - - @Override - public void close() { - // doNothing - } - }; + for (int i = 0; i < numOps; i++) { + operations.add(readOperation(checksumStreamInput)); + } + return operations; } static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException { @@ -1601,24 +1597,39 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return operation; } - public static void writeOperation(StreamOutput outStream, Translog.Operation op) throws IOException { - //TODO lets get rid of this crazy double writing here. + /** + * Writes all operations in the given iterable to the given output stream including the size of the array + * use {@link #readOperations(StreamInput)} to read it back. + */ + public static void writeOperations(StreamOutput outStream, List toWrite) throws IOException { + final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE); + try { + outStream.writeInt(toWrite.size()); + final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); + for (Operation op : toWrite) { + out.reset(); + final long start = out.position(); + out.skip(RamUsageEstimator.NUM_BYTES_INT); + writeOperationNoSize(checksumStreamOutput, op); + long end = out.position(); + int operationSize = (int) (out.position() - RamUsageEstimator.NUM_BYTES_INT - start); + out.seek(start); + out.writeInt(operationSize); + out.seek(end); + ReleasablePagedBytesReference bytes = out.bytes(); + bytes.writeTo(outStream); + } + } finally { + Releasables.close(out.bytes()); + } - // We first write to a NoopStreamOutput to get the size of the - // operation. We could write to a byte array and then send that as an - // alternative, but here we choose to use CPU over allocating new - // byte arrays. - NoopStreamOutput noopOut = new NoopStreamOutput(); - noopOut.writeByte(op.opType().id()); - op.writeTo(noopOut); - noopOut.writeInt(0); // checksum holder - int size = noopOut.getCount(); + } + public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Translog.Operation op) throws IOException { // This BufferedChecksumStreamOutput remains unclosed on purpose, // because closing it closes the underlying stream, which we don't // want to do here. - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(outStream); - outStream.writeInt(size); // opSize is not checksummed + out.resetDigest(); out.writeByte(op.opType().id()); op.writeTo(out); long checksum = out.getChecksum(); diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 8e377c729c7..b320c98568a 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -70,13 +70,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); - int size = in.readVInt(); - operations = Lists.newArrayListWithExpectedSize(size); - Translog.Snapshot snapshot = Translog.snapshotFromStream(in, size); - Translog.Operation next = null; - while((next = snapshot.next()) != null) { - operations.add(next); - } + operations = Translog.readOperations(in); totalTranslogOps = in.readVInt(); } @@ -85,10 +79,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { super.writeTo(out); out.writeLong(recoveryId); shardId.writeTo(out); - out.writeVInt(operations.size()); - for (Translog.Operation operation : operations) { - Translog.writeOperation(out, operation); - } + Translog.writeOperations(out, operations); out.writeVInt(totalTranslogOps); } } diff --git a/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 18c53ecf16b..e1068d9e0ab 100644 --- a/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -1021,23 +1021,18 @@ public class TranslogTests extends ElasticsearchTestCase { } - public void testSnapshotFromStreamInput() throws IOException { BytesStreamOutput out = new BytesStreamOutput(); List ops = newArrayList(); int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { Translog.Create test = new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))); - Translog.writeOperation(out, test); ops.add(test); } - Translog.Snapshot snapshot = Translog.snapshotFromStream(StreamInput.wrap(out.bytes()), ops.size()); - assertEquals(ops.size(), snapshot.estimatedTotalOperations()); - for (Translog.Operation op : ops) { - assertEquals(op, snapshot.next()); - } - assertNull(snapshot.next()); - // no need to close + Translog.writeOperations(out, ops); + final List readOperations = Translog.readOperations(StreamInput.wrap(out.bytes())); + assertEquals(ops.size(), readOperations.size()); + assertEquals(ops, readOperations); } public void testLocationHashCodeEquals() throws IOException {