Save Shard ID Serializations in Bulk Requests (#56209) (#58414)

Just like #56094 but for the request side.
Removes a lot of redundant `ShardId` instances from bulk shard requests and stops serializing index names when they are identical to the index name already contained in the shard id.

Even ignoring the index name serialization savings, this change saves at least one `ShardId` instance per bulk shard request item. This means it saves approximately:

* 8 bytes for the `ShardId` object (itself + one field)
   * + another 4 bytes for the `int` in the `ShardId`
* 16 bytes (two fields + the instance itself + the padding) for the `Index` object
   * + 30 bytes for the `Index` uuid string
   * + all the bytes in the index name string

=> 60+ bytes per bulk request item saved on heap and over the wire
This commit is contained in:
Armin Braun 2020-06-23 12:35:52 +02:00 committed by GitHub
parent 256b660f0a
commit 943efb78fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 203 additions and 36 deletions

View File

@ -22,10 +22,12 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Locale;
@ -216,16 +218,22 @@ public interface DocWriteRequest<T> extends IndicesRequest {
}
}
/** read a document write (index/delete/update) request */
static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException {
/**
* Read a document write (index/delete/update) request
*
* @param shardId shard id of the request. {@code null} when reading as part of a {@link org.elasticsearch.action.bulk.BulkRequest}
* that does not have a unique shard id or when reading from a stream of version older than
* {@link org.elasticsearch.action.bulk.BulkShardRequest#COMPACT_SHARD_ID_VERSION}
*/
static DocWriteRequest<?> readDocumentRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
byte type = in.readByte();
DocWriteRequest<?> docWriteRequest;
if (type == 0) {
docWriteRequest = new IndexRequest(in);
docWriteRequest = new IndexRequest(shardId, in);
} else if (type == 1) {
docWriteRequest = new DeleteRequest(in);
docWriteRequest = new DeleteRequest(shardId, in);
} else if (type == 2) {
docWriteRequest = new UpdateRequest(in);
docWriteRequest = new UpdateRequest(shardId, in);
} else {
throw new IllegalStateException("invalid request type [" + type+ " ]");
}
@ -248,6 +256,22 @@ public interface DocWriteRequest<T> extends IndicesRequest {
}
}
/**
 * Writes a document write (index/delete/update) request in its thin form, i.e. without the shard id.
 * <p>
 * A single type byte is written first ({@code 0} for an {@link IndexRequest}, {@code 1} for a {@link DeleteRequest},
 * {@code 2} for an {@link UpdateRequest}), followed by the output of the request's own {@code writeThin}.
 *
 * @param out     stream to write to
 * @param request request to serialize
 * @throws IllegalStateException if {@code request} is none of the three supported request types
 */
static void writeDocumentRequestThin(StreamOutput out, DocWriteRequest<?> request) throws IOException {
    if (request instanceof IndexRequest) {
        final IndexRequest indexRequest = (IndexRequest) request;
        out.writeByte((byte) 0);
        indexRequest.writeThin(out);
        return;
    }
    if (request instanceof DeleteRequest) {
        final DeleteRequest deleteRequest = (DeleteRequest) request;
        out.writeByte((byte) 1);
        deleteRequest.writeThin(out);
        return;
    }
    if (request instanceof UpdateRequest) {
        final UpdateRequest updateRequest = (UpdateRequest) request;
        out.writeByte((byte) 2);
        updateRequest.writeThin(out);
        return;
    }
    throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]");
}
static ActionRequestValidationException validateSeqNoBasedCASParams(
DocWriteRequest request, ActionRequestValidationException validationException) {
final long version = request.version();

View File

@ -20,10 +20,12 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Objects;
@ -34,11 +36,19 @@ public class BulkItemRequest implements Writeable {
private DocWriteRequest<?> request;
private volatile BulkItemResponse primaryResponse;
BulkItemRequest(StreamInput in) throws IOException {
/**
* @param shardId {@code null} if reading from a stream before {@link BulkShardRequest#COMPACT_SHARD_ID_VERSION} to force BwC read
* that includes shard id
*/
BulkItemRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
id = in.readVInt();
request = DocWriteRequest.readDocumentRequest(in);
request = DocWriteRequest.readDocumentRequest(shardId, in);
if (in.readBoolean()) {
primaryResponse = new BulkItemResponse(in);
if (shardId == null) {
primaryResponse = new BulkItemResponse(in);
} else {
primaryResponse = new BulkItemResponse(shardId, in);
}
}
}
@ -99,4 +109,10 @@ public class BulkItemRequest implements Writeable {
DocWriteRequest.writeDocumentRequest(out, request);
out.writeOptionalWriteable(primaryResponse);
}
/**
 * Thin serialization of this item: writes the item id, the request via
 * {@link DocWriteRequest#writeDocumentRequestThin}, and then the optional primary response using its
 * {@code writeThin} method.
 *
 * @param out stream to write to
 */
public void writeThin(StreamOutput out) throws IOException {
    out.writeVInt(id);
    DocWriteRequest.writeDocumentRequestThin(out, request);
    // primaryResponse is volatile: read it exactly once so that the null check and the method reference
    // are guaranteed to operate on the same value.
    final BulkItemResponse response = primaryResponse;
    out.writeOptionalWriteable(response == null ? null : response::writeThin);
}
}

View File

@ -85,7 +85,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
waitForActiveShards = ActiveShardCount.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
requests.add(DocWriteRequest.readDocumentRequest(in));
requests.add(DocWriteRequest.readDocumentRequest(null, in));
}
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = in.readTimeValue();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
@ -31,14 +32,17 @@ import java.util.Set;
public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
public static final Version COMPACT_SHARD_ID_VERSION = Version.V_7_9_0;
private BulkItemRequest[] items;
public BulkShardRequest(StreamInput in) throws IOException {
super(in);
items = new BulkItemRequest[in.readVInt()];
final ShardId itemShardId = in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION) ? shardId : null;
for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) {
items[i] = new BulkItemRequest(in);
items[i] = new BulkItemRequest(itemShardId, in);
}
}
}
@ -74,12 +78,18 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(items.length);
for (BulkItemRequest item : items) {
if (item != null) {
out.writeBoolean(true);
item.writeTo(out);
} else {
out.writeBoolean(false);
if (out.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) {
for (BulkItemRequest item : items) {
if (item != null) {
out.writeBoolean(true);
item.writeThin(out);
} else {
out.writeBoolean(false);
}
}
} else {
for (BulkItemRequest item : items) {
out.writeOptionalWriteable(item);
}
}
}

View File

@ -66,7 +66,11 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public DeleteRequest(StreamInput in) throws IOException {
super(in);
this(null, in);
}
public DeleteRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
@ -301,8 +305,18 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
writeBody(out);
}
/**
* Thin serialization: writes the superclass state via {@code super.writeThin} and then the same body that
* {@code writeTo} writes (see {@code writeBody}).
*/
@Override
public void writeThin(StreamOutput out) throws IOException {
super.writeThin(out);
writeBody(out);
}
private void writeBody(StreamOutput out) throws IOException {
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
out.writeString(type());
out.writeString(id);
out.writeOptionalString(routing());

View File

@ -119,7 +119,11 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public IndexRequest(StreamInput in) throws IOException {
super(in);
this(null, in);
}
public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
type = in.readOptionalString();
id = in.readOptionalString();
routing = in.readOptionalString();
@ -699,6 +703,17 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
public void writeTo(StreamOutput out) throws IOException {
// Version check against the target stream runs first, before any bytes are written.
checkAutoIdWithOpTypeCreateSupportedByVersion(out.getVersion());
super.writeTo(out);
// The request-specific fields are shared with writeThin and therefore live in writeBody.
writeBody(out);
}
/**
* Thin serialization: performs the same version check as {@code writeTo}, writes the superclass state via
* {@code super.writeThin} and then the shared request body via {@code writeBody}.
*/
@Override
public void writeThin(StreamOutput out) throws IOException {
// Same pre-write version check as in writeTo.
checkAutoIdWithOpTypeCreateSupportedByVersion(out.getVersion());
super.writeThin(out);
writeBody(out);
}
private void writeBody(StreamOutput out) throws IOException {
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
out.writeOptionalString(type());

View File

@ -37,6 +37,14 @@ import java.io.IOException;
public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>> extends ReplicationRequest<R> implements WriteRequest<R> {
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
/**
* Constructor for thin deserialization.
*
* @param shardId passed through unchanged to the superclass constructor; may be {@code null}
* @param in stream to read from; the refresh policy is read after the superclass has consumed its part
*/
public ReplicatedWriteRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
refreshPolicy = RefreshPolicy.readFrom(in);
}
/**
* Constructor for deserialization.
*/
@ -66,4 +74,10 @@ public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>
super.writeTo(out);
refreshPolicy.writeTo(out);
}
/**
* Thin serialization: writes the superclass state via {@code super.writeThin}, followed by the refresh policy.
*/
@Override
public void writeThin(StreamOutput out) throws IOException {
super.writeThin(out);
refreshPolicy.writeTo(out);
}
}

View File

@ -67,15 +67,28 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
private long routedBasedOnClusterVersion = 0;
public ReplicationRequest(StreamInput in) throws IOException {
this(null, in);
}
public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
shardId = new ShardId(in);
final boolean thinRead = shardId != null;
if (thinRead) {
this.shardId = shardId;
} else {
shardId = null;
}
this.shardId = in.readOptionalWriteable(ShardId::new);
}
waitForActiveShards = ActiveShardCount.readFrom(in);
timeout = in.readTimeValue();
index = in.readString();
if (thinRead) {
if (in.readBoolean()) {
index = in.readString();
} else {
index = shardId.getIndexName();
}
} else {
index = in.readString();
}
routedBasedOnClusterVersion = in.readVLong();
}
@ -197,6 +210,23 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
out.writeVLong(routedBasedOnClusterVersion);
}
/**
 * Thin serialization that omits {@link #shardId}. The {@link #index} name is preceded by a boolean marker and is only
 * written when it cannot be taken from the shard id, i.e. when there is no shard id or its index name differs from
 * {@link #index}.
 *
 * @param out stream to write to
 */
public void writeThin(StreamOutput out) throws IOException {
    super.writeTo(out);
    waitForActiveShards.writeTo(out);
    out.writeTimeValue(timeout);
    // true => the index name follows on the stream; false => the reader takes it from the shard id
    final boolean writeIndexName = shardId == null || index.equals(shardId.getIndexName()) == false;
    out.writeBoolean(writeIndexName);
    if (writeIndexName) {
        out.writeString(index);
    }
    out.writeVLong(routedBasedOnClusterVersion);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -32,6 +33,8 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
// TODO: This request and its associated transport action can be folded into UpdateRequest which is its only concrete production code
// implementation
public abstract class InstanceShardOperationRequest<Request extends InstanceShardOperationRequest<Request>> extends ActionRequest
implements IndicesRequest {
@ -48,13 +51,21 @@ public abstract class InstanceShardOperationRequest<Request extends InstanceShar
protected InstanceShardOperationRequest() {
}
protected InstanceShardOperationRequest(StreamInput in) throws IOException {
protected InstanceShardOperationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
super(in);
index = in.readString();
if (in.readBoolean()) {
shardId = new ShardId(in);
// Do a full read if no shard id is given (indicating that this instance isn't read as part of a BulkShardRequest or that `in` is of
// an older version) and is in the format used by #writeTo.
if (shardId == null) {
index = in.readString();
this.shardId = in.readOptionalWriteable(ShardId::new);
} else {
shardId = null;
// We know a shard id so we read the format given by #writeThin
this.shardId = shardId;
if (in.readBoolean()) {
index = in.readString();
} else {
index = shardId.getIndexName();
}
}
timeout = in.readTimeValue();
concreteIndex = in.readOptionalString();
@ -130,5 +141,16 @@ public abstract class InstanceShardOperationRequest<Request extends InstanceShar
out.writeOptionalString(concreteIndex);
}
/**
 * Thin serialization that does not write the shard id. A boolean marker states whether the index name follows;
 * it is only written when there is no shard id or when it differs from the shard id's index name. The timeout and
 * the optional concrete index are written afterwards.
 *
 * @param out stream to write to
 */
public void writeThin(StreamOutput out) throws IOException {
    super.writeTo(out);
    // true => the index name follows on the stream; false => the reader takes it from the shard id
    final boolean writeIndexName = shardId == null || index.equals(shardId.getIndexName()) == false;
    out.writeBoolean(writeIndexName);
    if (writeIndexName) {
        out.writeString(index);
    }
    out.writeTimeValue(timeout);
    out.writeOptionalString(concreteIndex);
}
}

View File

@ -129,7 +129,11 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
public UpdateRequest() {}
public UpdateRequest(StreamInput in) throws IOException {
super(in);
this(null, in);
}
public UpdateRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
waitForActiveShards = ActiveShardCount.readFrom(in);
type = in.readString();
id = in.readString();
@ -143,7 +147,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
retryOnConflict = in.readVInt();
refreshPolicy = RefreshPolicy.readFrom(in);
if (in.readBoolean()) {
doc = new IndexRequest(in);
doc = new IndexRequest(shardId, in);
}
if (in.getVersion().before(Version.V_7_0_0)) {
String[] fields = in.readOptionalStringArray();
@ -153,7 +157,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
}
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
if (in.readBoolean()) {
upsertRequest = new IndexRequest(in);
upsertRequest = new IndexRequest(shardId, in);
}
docAsUpsert = in.readBoolean();
if (in.getVersion().before(Version.V_7_0_0)) {
@ -872,6 +876,16 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
/**
* Full serialization: writes the superclass state via {@code super.writeTo} and then the request body with
* {@code thin == false}, so nested requests are written with their {@code writeTo}.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
doWrite(out, false);
}
/**
* Thin serialization: writes the superclass state via {@code super.writeThin} and then the request body with
* {@code thin == true}, so nested requests are written with their {@code writeThin}.
*/
@Override
public void writeThin(StreamOutput out) throws IOException {
super.writeThin(out);
doWrite(out, true);
}
private void doWrite(StreamOutput out, boolean thin) throws IOException {
waitForActiveShards.writeTo(out);
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
@ -897,7 +911,11 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
doc.index(index);
doc.type(type);
doc.id(id);
doc.writeTo(out);
if (thin) {
doc.writeThin(out);
} else {
doc.writeTo(out);
}
}
if (out.getVersion().before(Version.V_7_0_0)) {
out.writeOptionalStringArray(null);
@ -911,7 +929,11 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
upsertRequest.index(index);
upsertRequest.type(type);
upsertRequest.id(id);
upsertRequest.writeTo(out);
if (thin) {
upsertRequest.writeThin(out);
} else {
upsertRequest.writeTo(out);
}
}
out.writeBoolean(docAsUpsert);
if (out.getVersion().before(Version.V_7_0_0)) {

View File

@ -81,7 +81,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
super(null, in);
}
}