Cleanup translog operation serialization

We used to double-write the translog operation, which is not needed except for recovery.
This commit cuts over to a big-array based temporary serialization and removes the crazy
double writing.
Simon Willnauer 2015-05-18 14:10:05 +02:00
parent 60b66a7235
commit 651c067fae
4 changed files with 57 additions and 56 deletions
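For context, the "double writing" the message refers to is visible in the old writeOperation() removed below: each operation was serialized once into a counting no-op stream purely to learn its size, then serialized a second time into the real output. A minimal stand-in for such a measure-only sink, in plain java.io rather than Elasticsearch's NoopStreamOutput, looks like this:

import java.io.OutputStream;

// Measure-only sink: bytes are discarded, only the count survives.
// Illustrative sketch, not the Elasticsearch NoopStreamOutput API.
final class CountingNoopOutputStream extends OutputStream {
    private long count;

    @Override
    public void write(int b) {
        count++;                  // drop the byte, remember that one was written
    }

    @Override
    public void write(byte[] b, int off, int len) {
        count += len;             // bulk writes are counted, not copied
    }

    long getCount() {
        return count;
    }
}

The commit replaces this measure-then-rewrite pass with a single serialization into a seekable, big-array backed buffer whose length prefix is patched in afterwards; a sketch of that pattern follows the Translog.add() hunk below.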

BufferedChecksumStreamOutput.java

@@ -70,4 +70,8 @@ public final class BufferedChecksumStreamOutput extends StreamOutput {
         out.reset();
         digest.reset();
     }
+
+    public void resetDigest() {
+        digest.reset();
+    }
 }
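The new resetDigest() hook exists because the write path added in Translog below reuses one checksumming wrapper across many operations: the buffer keeps growing (or is reset), but the digest has to start fresh for each operation without closing or resetting the underlying stream. A standalone analogue built on java.util.zip.CRC32, not the Elasticsearch BufferedChecksumStreamOutput, could look like:

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.CRC32;

// Checksums everything written through it; the digest can be restarted per record
// while the wrapped stream stays open. Names are illustrative only.
public class ChecksummingOutputStream extends FilterOutputStream {
    private final CRC32 digest = new CRC32();

    public ChecksummingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        digest.update(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        digest.update(b, off, len);
    }

    public long getChecksum() {
        return digest.getValue();
    }

    public void resetDigest() {
        digest.reset();           // start a fresh checksum for the next record
    }
}

java.util.zip.CheckedOutputStream offers the same idea out of the box; restarting per record is just a Checksum.reset() call that never touches the wrapped stream.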

Translog.java

@@ -450,7 +450,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     public Location add(Operation operation) throws TranslogException {
         ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
         try {
-            writeOperation(out, operation);
+            final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
+            final long start = out.position();
+            out.skip(RamUsageEstimator.NUM_BYTES_INT);
+            writeOperationNoSize(checksumStreamOutput, operation);
+            long end = out.position();
+            int operationSize = (int) (out.position() - RamUsageEstimator.NUM_BYTES_INT - start);
+            out.seek(start);
+            out.writeInt(operationSize);
+            out.seek(end);
             ReleasablePagedBytesReference bytes = out.bytes();
             try (ReleasableLock lock = readLock.acquire()) {
                 Location location = current.add(bytes);
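The block above is the heart of the change: reserve four bytes, serialize the operation once, then seek back and patch the real size in, rather than serializing twice. A self-contained sketch of the same trick with plain java.nio, assuming an illustrative frame layout of [int length][body][int CRC32] where the length covers body plus checksum (not necessarily the exact translog on-disk format):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class SizePatchingWriter {

    // Reserve the length slot, write the record once, then patch the real length in afterwards.
    static void writeFramed(ByteBuffer buffer, byte[] body) {
        final int start = buffer.position();
        buffer.position(start + Integer.BYTES);              // skip the 4-byte length slot
        buffer.put(body);                                     // single serialization pass
        CRC32 digest = new CRC32();
        digest.update(body, 0, body.length);
        buffer.putInt((int) digest.getValue());               // trailing checksum
        final int end = buffer.position();
        buffer.putInt(start, end - start - Integer.BYTES);    // absolute put: patch the length
        buffer.position(end);                                 // leave the cursor at the frame end
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        writeFramed(buffer, "translog op".getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        System.out.println("frame length = " + buffer.getInt());  // body + checksum bytes
    }
}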
@@ -1546,29 +1554,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }
 
-    public static Snapshot snapshotFromStream(StreamInput input, final int numOps) {
-        final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input);
-        return new Snapshot() {
-            int read = 0;
-            @Override
-            public int estimatedTotalOperations() {
-                return numOps;
-            }
-
-            @Override
-            public Operation next() throws IOException {
-                if (read < numOps) {
-                    read++;
-                    return readOperation(checksumStreamInput);
-                }
-                return null;
-            }
-
-            @Override
-            public void close() {
-                // doNothing
-            }
-        };
+    /**
+     * Reads a list of operations written with {@link #writeOperations(StreamOutput, List)}
+     */
+    public static List<Operation> readOperations(StreamInput input) throws IOException {
+        ArrayList<Operation> operations = new ArrayList<>();
+        int numOps = input.readInt();
+        final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input);
+        for (int i = 0; i < numOps; i++) {
+            operations.add(readOperation(checksumStreamInput));
+        }
+        return operations;
     }
 
     static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException {
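readOperations() is the read-side inverse of writeOperations() further down: read the operation count, then read each length-prefixed, checksummed frame through the checksumming stream. Under the same illustrative [int count][int length][body][int CRC32] layout as the sketches above, and using plain java.io instead of the Elasticsearch stream classes, the loop could look like:

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

public class SizePrefixedRecordReader {

    // Reads the count, then per record: length prefix, body, trailing CRC32; verifies before accepting.
    static List<byte[]> readFramed(DataInputStream in) throws IOException {
        final int numOps = in.readInt();
        final List<byte[]> operations = new ArrayList<>(numOps);
        for (int i = 0; i < numOps; i++) {
            final int frameLength = in.readInt();              // body plus 4-byte checksum
            final byte[] body = new byte[frameLength - Integer.BYTES];
            in.readFully(body);
            final int expected = in.readInt();
            final CRC32 digest = new CRC32();
            digest.update(body, 0, body.length);
            if ((int) digest.getValue() != expected) {
                throw new IOException("checksum mismatch for operation " + i);
            }
            operations.add(body);
        }
        return operations;
    }
}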
@@ -1601,24 +1597,39 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return operation;
     }
 
-    public static void writeOperation(StreamOutput outStream, Translog.Operation op) throws IOException {
-        //TODO lets get rid of this crazy double writing here.
-
-        // We first write to a NoopStreamOutput to get the size of the
-        // operation. We could write to a byte array and then send that as an
-        // alternative, but here we choose to use CPU over allocating new
-        // byte arrays.
-        NoopStreamOutput noopOut = new NoopStreamOutput();
-        noopOut.writeByte(op.opType().id());
-        op.writeTo(noopOut);
-        noopOut.writeInt(0); // checksum holder
-        int size = noopOut.getCount();
-
+    /**
+     * Writes all operations in the given iterable to the given output stream including the size of the array
+     * use {@link #readOperations(StreamInput)} to read it back.
+     */
+    public static void writeOperations(StreamOutput outStream, List<Operation> toWrite) throws IOException {
+        final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE);
+        try {
+            outStream.writeInt(toWrite.size());
+            final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
+            for (Operation op : toWrite) {
+                out.reset();
+                final long start = out.position();
+                out.skip(RamUsageEstimator.NUM_BYTES_INT);
+                writeOperationNoSize(checksumStreamOutput, op);
+                long end = out.position();
+                int operationSize = (int) (out.position() - RamUsageEstimator.NUM_BYTES_INT - start);
+                out.seek(start);
+                out.writeInt(operationSize);
+                out.seek(end);
+                ReleasablePagedBytesReference bytes = out.bytes();
+                bytes.writeTo(outStream);
+            }
+        } finally {
+            Releasables.close(out.bytes());
+        }
+    }
+
+    public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Translog.Operation op) throws IOException {
         // This BufferedChecksumStreamOutput remains unclosed on purpose,
         // because closing it closes the underlying stream, which we don't
         // want to do here.
-        BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(outStream);
-        outStream.writeInt(size); // opSize is not checksummed
+        out.resetDigest();
         out.writeByte(op.opType().id());
         op.writeTo(out);
         long checksum = out.getChecksum();
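writeOperations() streams the operation count followed by one frame per operation, reusing a single big-array backed buffer (out.reset() each iteration) so no fresh byte[] is allocated per op. A plain java.io sketch of that batch layout, framing pre-serialized byte arrays (hypothetical BatchRecordWriter; the body length is known up front here, so no seek-back is needed):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.CRC32;

public class BatchRecordWriter {

    // Layout: [int count] then per record [int length][body][int CRC32].
    static void writeFramed(DataOutputStream out, List<byte[]> operations) throws IOException {
        out.writeInt(operations.size());
        final ByteArrayOutputStream scratch = new ByteArrayOutputStream();
        final DataOutputStream scratchOut = new DataOutputStream(scratch);
        for (byte[] body : operations) {
            scratch.reset();                                   // reuse the buffer, like out.reset()
            final CRC32 digest = new CRC32();
            digest.update(body, 0, body.length);
            scratchOut.writeInt(body.length + Integer.BYTES);  // body plus trailing checksum
            scratchOut.write(body);
            scratchOut.writeInt((int) digest.getValue());
            scratchOut.flush();
            scratch.writeTo(out);                              // copy the finished frame out
        }
    }
}

The real writeOperations() serializes each operation directly into the buffer rather than framing a ready-made byte[], which is why it needs the skip/seek/patch dance from the add() hunk instead of a length known in advance.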

RecoveryTranslogOperationsRequest.java

@@ -70,13 +70,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
         super.readFrom(in);
         recoveryId = in.readLong();
         shardId = ShardId.readShardId(in);
-        int size = in.readVInt();
-        operations = Lists.newArrayListWithExpectedSize(size);
-        Translog.Snapshot snapshot = Translog.snapshotFromStream(in, size);
-        Translog.Operation next = null;
-        while((next = snapshot.next()) != null) {
-            operations.add(next);
-        }
+        operations = Translog.readOperations(in);
         totalTranslogOps = in.readVInt();
     }
 
@@ -85,10 +79,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
         super.writeTo(out);
         out.writeLong(recoveryId);
         shardId.writeTo(out);
-        out.writeVInt(operations.size());
-        for (Translog.Operation operation : operations) {
-            Translog.writeOperation(out, operation);
-        }
+        Translog.writeOperations(out, operations);
         out.writeVInt(totalTranslogOps);
     }
 }

TranslogTests.java

@@ -1021,23 +1021,18 @@ public class TranslogTests extends ElasticsearchTestCase {
     }
 
     public void testSnapshotFromStreamInput() throws IOException {
         BytesStreamOutput out = new BytesStreamOutput();
         List<Translog.Operation> ops = newArrayList();
         int translogOperations = randomIntBetween(10, 100);
         for (int op = 0; op < translogOperations; op++) {
             Translog.Create test = new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
-            Translog.writeOperation(out, test);
             ops.add(test);
         }
-        Translog.Snapshot snapshot = Translog.snapshotFromStream(StreamInput.wrap(out.bytes()), ops.size());
-        assertEquals(ops.size(), snapshot.estimatedTotalOperations());
-        for (Translog.Operation op : ops) {
-            assertEquals(op, snapshot.next());
-        }
-        assertNull(snapshot.next());
-        // no need to close
+        Translog.writeOperations(out, ops);
+        final List<Translog.Operation> readOperations = Translog.readOperations(StreamInput.wrap(out.bytes()));
+        assertEquals(ops.size(), readOperations.size());
+        assertEquals(ops, readOperations);
     }
 
     public void testLocationHashCodeEquals() throws IOException {