Use Writeable for TransportReplAction derivatives (#40905)

Relates #34389, backport of #40894.
This commit is contained in:
David Turner 2019-04-05 19:10:10 +01:00 committed by GitHub
parent a8dbb07546
commit 2ff19bc1b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 297 additions and 288 deletions

View File

@ -222,13 +222,9 @@ public interface DocWriteRequest<T> extends IndicesRequest {
byte type = in.readByte(); byte type = in.readByte();
DocWriteRequest<?> docWriteRequest; DocWriteRequest<?> docWriteRequest;
if (type == 0) { if (type == 0) {
IndexRequest indexRequest = new IndexRequest(); docWriteRequest = new IndexRequest(in);
indexRequest.readFrom(in);
docWriteRequest = indexRequest;
} else if (type == 1) { } else if (type == 1) {
DeleteRequest deleteRequest = new DeleteRequest(); docWriteRequest = new DeleteRequest(in);
deleteRequest.readFrom(in);
docWriteRequest = deleteRequest;
} else if (type == 2) { } else if (type == 2) {
UpdateRequest updateRequest = new UpdateRequest(); UpdateRequest updateRequest = new UpdateRequest();
updateRequest.readFrom(in); updateRequest.readFrom(in);

View File

@ -136,9 +136,11 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
public static class ShardRequest extends ReplicationRequest<ShardRequest> { public static class ShardRequest extends ReplicationRequest<ShardRequest> {
private ClusterBlock clusterBlock; private final ClusterBlock clusterBlock;
ShardRequest(){ ShardRequest(StreamInput in) throws IOException {
super(in);
clusterBlock = new ClusterBlock(in);
} }
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) { public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
@ -153,9 +155,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
} }
@Override @Override
public void readFrom(final StreamInput in) throws IOException { public void readFrom(final StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
clusterBlock = new ClusterBlock(in);
} }
@Override @Override

View File

@ -52,6 +52,12 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
super(indices); super(indices);
} }
public FlushRequest(StreamInput in) throws IOException {
super(in);
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
}
/** /**
* Returns {@code true} iff a flush should block * Returns {@code true} iff a flush should block
* if a another flush operation is already running. Otherwise {@code false} * if a another flush operation is already running. Otherwise {@code false}
@ -103,9 +109,7 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
} }
@Override @Override

View File

@ -29,7 +29,7 @@ import java.io.IOException;
public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> { public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
private FlushRequest request = new FlushRequest(); private final FlushRequest request;
public ShardFlushRequest(FlushRequest request, ShardId shardId) { public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(shardId); super(shardId);
@ -37,7 +37,9 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
} }
public ShardFlushRequest() { public ShardFlushRequest(StreamInput in) throws IOException {
super(in);
request = new FlushRequest(in);
} }
FlushRequest getRequest() { FlushRequest getRequest() {
@ -46,8 +48,7 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
request.readFrom(in);
} }
@Override @Override

View File

@ -55,7 +55,7 @@ public class TransportShardFlushAction
IndexShard primary) { IndexShard primary) {
primary.flush(shardRequest.getRequest()); primary.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", primary.shardId()); logger.trace("{} flush request executed on primary", primary.shardId());
return new PrimaryResult<ShardFlushRequest, ReplicationResponse>(shardRequest, new ReplicationResponse()); return new PrimaryResult<>(shardRequest, new ReplicationResponse());
} }
@Override @Override

View File

@ -20,6 +20,9 @@
package org.elasticsearch.action.admin.indices.refresh; package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.support.broadcast.BroadcastRequest; import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/** /**
* A refresh request making all operations performed since the last refresh available for search. The (near) real-time * A refresh request making all operations performed since the last refresh available for search. The (near) real-time
@ -35,4 +38,8 @@ public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
public RefreshRequest(String... indices) { public RefreshRequest(String... indices) {
super(indices); super(indices);
} }
public RefreshRequest(StreamInput in) throws IOException {
super(in);
}
} }

View File

@ -33,7 +33,14 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
private BulkItemRequest[] items; private BulkItemRequest[] items;
public BulkShardRequest() { public BulkShardRequest(StreamInput in) throws IOException {
super(in);
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
} }
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
@ -60,7 +67,7 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
indices.add(item.index()); indices.add(item.index());
} }
} }
return indices.toArray(new String[indices.size()]); return indices.toArray(new String[0]);
} }
@Override @Override
@ -78,14 +85,8 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
} }
@Override @Override

View File

@ -28,11 +28,10 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
/** use transport bulk action directly */ /** use transport bulk action directly */
@Deprecated @Deprecated
public abstract class TransportSingleItemBulkWriteAction< public abstract class TransportSingleItemBulkWriteAction<
@ -43,8 +42,8 @@ public abstract class TransportSingleItemBulkWriteAction<
private final TransportBulkAction bulkAction; private final TransportBulkAction bulkAction;
protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters, protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters,
Supplier<Request> request, TransportBulkAction bulkAction) { Writeable.Reader<Request> requestReader, TransportBulkAction bulkAction) {
super(actionName, transportService, actionFilters, request); super(actionName, transportService, actionFilters, requestReader);
this.bulkAction = bulkAction; this.bulkAction = bulkAction;
} }

View File

@ -53,6 +53,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest { implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {
private static final ShardId NO_SHARD_ID = null;
// Set to null initially so we can know to override in bulk requests that have a default type. // Set to null initially so we can know to override in bulk requests that have a default type.
private String type; private String type;
private String id; private String id;
@ -63,7 +65,27 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public DeleteRequest(StreamInput in) throws IOException {
super(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
}
public DeleteRequest() { public DeleteRequest() {
super(NO_SHARD_ID);
} }
/** /**
@ -71,6 +93,7 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
* must be set. * must be set.
*/ */
public DeleteRequest(String index) { public DeleteRequest(String index) {
super(NO_SHARD_ID);
this.index = index; this.index = index;
} }
@ -85,6 +108,7 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
*/ */
@Deprecated @Deprecated
public DeleteRequest(String index, String type, String id) { public DeleteRequest(String index, String type, String id) {
super(NO_SHARD_ID);
this.index = index; this.index = index;
this.type = type; this.type = type;
this.id = id; this.id = id;
@ -97,6 +121,7 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
* @param id The id of the document * @param id The id of the document
*/ */
public DeleteRequest(String index, String id) { public DeleteRequest(String index, String id) {
super(NO_SHARD_ID);
this.index = index; this.index = index;
this.id = id; this.id = id;
} }
@ -274,23 +299,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
} }
@Override @Override
@ -321,14 +331,4 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
public String toString() { public String toString() {
return "delete {[" + index + "][" + type() + "][" + id + "]}"; return "delete {[" + index + "][" + type() + "][" + id + "]}";
} }
/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public DeleteRequest setShardId(ShardId shardId) {
throw new UnsupportedOperationException("shard id should never be set on DeleteRequest");
}
} }

View File

@ -83,6 +83,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
*/ */
static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048; static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048;
private static final ShardId NO_SHARD_ID = null;
// Set to null initially so we can know to override in bulk requests that have a default type. // Set to null initially so we can know to override in bulk requests that have a default type.
private String type; private String type;
private String id; private String id;
@ -112,8 +114,41 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public IndexRequest(StreamInput in) throws IOException {
super(in);
type = in.readOptionalString();
id = in.readOptionalString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readOptionalString(); // timestamp
in.readOptionalTimeValue(); // ttl
}
source = in.readBytesReference();
opType = OpType.fromId(in.readByte());
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
if (in.readBoolean()) {
contentType = in.readEnum(XContentType.class);
} else {
contentType = null;
}
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
}
public IndexRequest() { public IndexRequest() {
super(NO_SHARD_ID);
} }
/** /**
@ -121,6 +156,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
* {@link #source(byte[], XContentType)} must be set. * {@link #source(byte[], XContentType)} must be set.
*/ */
public IndexRequest(String index) { public IndexRequest(String index) {
super(NO_SHARD_ID);
this.index = index; this.index = index;
} }
@ -131,6 +167,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
*/ */
@Deprecated @Deprecated
public IndexRequest(String index, String type) { public IndexRequest(String index, String type) {
super(NO_SHARD_ID);
this.index = index; this.index = index;
this.type = type; this.type = type;
} }
@ -146,6 +183,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
*/ */
@Deprecated @Deprecated
public IndexRequest(String index, String type, String id) { public IndexRequest(String index, String type, String id) {
super(NO_SHARD_ID);
this.index = index; this.index = index;
this.type = type; this.type = type;
this.id = id; this.id = id;
@ -593,37 +631,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
type = in.readOptionalString();
id = in.readOptionalString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readOptionalString(); // timestamp
in.readOptionalTimeValue(); // ttl
}
source = in.readBytesReference();
opType = OpType.fromId(in.readByte());
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
if (in.readBoolean()) {
contentType = in.readEnum(XContentType.class);
} else {
contentType = null;
}
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
} }
@Override @Override
@ -704,15 +713,4 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
public long getAutoGeneratedTimestamp() { public long getAutoGeneratedTimestamp() {
return autoGeneratedTimestamp; return autoGeneratedTimestamp;
} }
/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public IndexRequest setShardId(ShardId shardId) {
throw new UnsupportedOperationException("shard id should never be set on IndexRequest");
}
} }

View File

@ -36,12 +36,32 @@ import java.util.Objects;
*/ */
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> { public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
private long trimAboveSeqNo; private final long trimAboveSeqNo;
private Translog.Operation[] operations; private final Translog.Operation[] operations;
private long maxSeenAutoIdTimestampOnPrimary; private final long maxSeenAutoIdTimestampOnPrimary;
ResyncReplicationRequest() { ResyncReplicationRequest(StreamInput in) throws IOException {
super(); super(in);
assert Version.CURRENT.major <= 7;
if (in.getVersion().equals(Version.V_6_0_0)) {
/*
* Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a
* byte indicating the type of the operation.
*/
// TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x.
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
trimAboveSeqNo = in.readZLong();
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
} }
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, final long maxSeenAutoIdTimestampOnPrimary, public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, final long maxSeenAutoIdTimestampOnPrimary,
@ -65,28 +85,8 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
} }
@Override @Override
public void readFrom(final StreamInput in) throws IOException { public void readFrom(final StreamInput in) {
assert Version.CURRENT.major <= 7; throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
if (in.getVersion().equals(Version.V_6_0_0)) {
/*
* Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a
* byte indicating the type of the operation.
*/
// TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x.
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
trimAboveSeqNo = in.readZLong();
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
} }
@Override @Override

View File

@ -36,6 +36,12 @@ public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends
public BroadcastRequest() { public BroadcastRequest() {
} }
public BroadcastRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
protected BroadcastRequest(String[] indices) { protected BroadcastRequest(String[] indices) {
this.indices = indices; this.indices = indices;
} }

View File

@ -19,8 +19,11 @@
package org.elasticsearch.action.support.replication; package org.elasticsearch.action.support.replication;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/** /**
* A replication request that has no more information than ReplicationRequest. * A replication request that has no more information than ReplicationRequest.
* Unfortunately ReplicationRequest can't be declared as a type parameter * Unfortunately ReplicationRequest can't be declared as a type parameter
@ -28,9 +31,6 @@ import org.elasticsearch.index.shard.ShardId;
* instead. * instead.
*/ */
public class BasicReplicationRequest extends ReplicationRequest<BasicReplicationRequest> { public class BasicReplicationRequest extends ReplicationRequest<BasicReplicationRequest> {
public BasicReplicationRequest() {
}
/** /**
* Creates a new request with resolved shard id * Creates a new request with resolved shard id
*/ */
@ -38,6 +38,10 @@ public class BasicReplicationRequest extends ReplicationRequest<BasicReplication
super(shardId); super(shardId);
} }
public BasicReplicationRequest(StreamInput in) throws IOException {
super(in);
}
@Override @Override
public String toString() { public String toString() {
return "BasicReplicationRequest{" + shardId + "}"; return "BasicReplicationRequest{" + shardId + "}";

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -39,10 +40,12 @@ public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>
/** /**
* Constructor for deserialization. * Constructor for deserialization.
*/ */
public ReplicatedWriteRequest() { public ReplicatedWriteRequest(StreamInput in) throws IOException {
super(in);
refreshPolicy = RefreshPolicy.readFrom(in);
} }
public ReplicatedWriteRequest(ShardId shardId) { public ReplicatedWriteRequest(@Nullable ShardId shardId) {
super(shardId); super(shardId);
} }
@ -59,9 +62,8 @@ public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
refreshPolicy = RefreshPolicy.readFrom(in);
} }
@Override @Override

View File

@ -54,9 +54,9 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
* shard id gets resolved by the transport action before performing request operation * shard id gets resolved by the transport action before performing request operation
* and at request creation time for shard-level bulk, refresh and flush requests. * and at request creation time for shard-level bulk, refresh and flush requests.
*/ */
protected ShardId shardId; protected final ShardId shardId;
protected TimeValue timeout = DEFAULT_TIMEOUT; protected TimeValue timeout;
protected String index; protected String index;
/** /**
@ -66,16 +66,26 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
private long routedBasedOnClusterVersion = 0; private long routedBasedOnClusterVersion = 0;
public ReplicationRequest() { public ReplicationRequest(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
shardId = ShardId.readShardId(in);
} else {
shardId = null;
}
waitForActiveShards = ActiveShardCount.readFrom(in);
timeout = in.readTimeValue();
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
} }
/** /**
* Creates a new request with resolved shard id * Creates a new request with resolved shard id
*/ */
public ReplicationRequest(ShardId shardId) { public ReplicationRequest(@Nullable ShardId shardId) {
this.index = shardId.getIndexName(); this.index = shardId == null ? null : shardId.getIndexName();
this.shardId = shardId; this.shardId = shardId;
this.timeout = DEFAULT_TIMEOUT;
} }
/** /**
@ -179,27 +189,13 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
if (in.readBoolean()) {
shardId = ShardId.readShardId(in);
} else {
shardId = null;
}
waitForActiveShards = ActiveShardCount.readFrom(in);
timeout = in.readTimeValue();
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
if (shardId != null) { out.writeOptionalWriteable(shardId);
out.writeBoolean(true);
shardId.writeTo(out);
} else {
out.writeBoolean(false);
}
waitForActiveShards.writeTo(out); waitForActiveShards.writeTo(out);
out.writeTimeValue(timeout); out.writeTimeValue(timeout);
out.writeString(index); out.writeString(index);
@ -211,16 +207,6 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers); return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers);
} }
/**
* Sets the target shard id for the request. The shard id is set when a
* index/delete request is resolved by the transport action
*/
@SuppressWarnings("unchecked")
public Request setShardId(ShardId shardId) {
this.shardId = shardId;
return (Request) this;
}
@Override @Override
public abstract String toString(); // force a proper to string to ease debugging public abstract String toString(); // force a proper to string to ease debugging

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.support.replication; package org.elasticsearch.action.support.replication;
import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
@ -35,6 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
@ -44,7 +44,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
/** /**
* Base class for requests that should be executed on all shards of an index or several indices. * Base class for requests that should be executed on all shards of an index or several indices.
@ -58,11 +57,11 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
private final ClusterService clusterService; private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver; private final IndexNameExpressionResolver indexNameExpressionResolver;
public TransportBroadcastReplicationAction(String name, Supplier<Request> request, ClusterService clusterService, public TransportBroadcastReplicationAction(String name, Writeable.Reader<Request> requestReader, ClusterService clusterService,
TransportService transportService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportReplicationAction replicatedBroadcastShardAction) { TransportReplicationAction replicatedBroadcastShardAction) {
super(name, transportService, actionFilters, request); super(name, transportService, actionFilters, requestReader);
this.replicatedBroadcastShardAction = replicatedBroadcastShardAction; this.replicatedBroadcastShardAction = replicatedBroadcastShardAction;
this.clusterService = clusterService; this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver; this.indexNameExpressionResolver = indexNameExpressionResolver;

View File

@ -83,9 +83,6 @@ import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
/** /**
* Base class for requests that should be executed on a primary copy followed by replica copies. * Base class for requests that should be executed on a primary copy followed by replica copies.
@ -120,10 +117,10 @@ public abstract class TransportReplicationAction<
ClusterService clusterService, IndicesService indicesService, ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
Supplier<ReplicaRequest> replicaRequest, String executor) { Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, false, false); indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
} }
@ -131,8 +128,8 @@ public abstract class TransportReplicationAction<
ClusterService clusterService, IndicesService indicesService, ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
Supplier<ReplicaRequest> replicaRequest, String executor, Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) { boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager()); super(actionName, actionFilters, transportService.getTaskManager());
this.threadPool = threadPool; this.threadPool = threadPool;
@ -146,14 +143,14 @@ public abstract class TransportReplicationAction<
this.transportPrimaryAction = actionName + "[p]"; this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]"; this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
() -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest); in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas // we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), transportService.registerRequestHandler(transportReplicaAction, executor, true, true,
executor, true, true, this::handleReplicaRequest); in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest);
this.transportOptions = transportOptions(settings); this.transportOptions = transportOptions(settings);
@ -1089,17 +1086,14 @@ public abstract class TransportReplicationAction<
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest { public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {
/** {@link AllocationId#getId()} of the shard this request is sent to **/ /** {@link AllocationId#getId()} of the shard this request is sent to **/
private String targetAllocationID; private final String targetAllocationID;
private final long primaryTerm;
private final R request;
private long primaryTerm; public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
targetAllocationID = in.readString();
private R request; primaryTerm = in.readVLong();
request = requestReader.read(in);
public ConcreteShardRequest(Supplier<R> requestSupplier) {
request = requestSupplier.get();
// null now, but will be populated by reading from the streams
targetAllocationID = null;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
} }
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) { public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {
@ -1135,10 +1129,8 @@ public abstract class TransportReplicationAction<
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) {
targetAllocationID = in.readString(); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
primaryTerm = in.readVLong();
request.readFrom(in);
} }
@Override @Override
@ -1168,23 +1160,11 @@ public abstract class TransportReplicationAction<
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> { protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
private long globalCheckpoint; private final long globalCheckpoint;
private long maxSeqNoOfUpdatesOrDeletes; private final long maxSeqNoOfUpdatesOrDeletes;
public ConcreteReplicaRequest(final Supplier<R> requestSupplier) { public ConcreteReplicaRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
super(requestSupplier); super(requestReader, in);
}
public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
super(request, targetAllocationID, primaryTerm);
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
globalCheckpoint = in.readZLong(); globalCheckpoint = in.readZLong();
} else { } else {
@ -1199,6 +1179,18 @@ public abstract class TransportReplicationAction<
} }
} }
public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
super(request, targetAllocationID, primaryTerm);
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
}
@Override
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperParsingException;
@ -47,7 +48,6 @@ import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/** /**
* Base class for transport actions that modify data in some shard like index, delete, and shardBulk. * Base class for transport actions that modify data in some shard like index, delete, and shardBulk.
@ -62,8 +62,8 @@ public abstract class TransportWriteAction<
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters, ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) { Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary); indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
} }

View File

@ -845,8 +845,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
retryOnConflict = in.readVInt(); retryOnConflict = in.readVInt();
refreshPolicy = RefreshPolicy.readFrom(in); refreshPolicy = RefreshPolicy.readFrom(in);
if (in.readBoolean()) { if (in.readBoolean()) {
doc = new IndexRequest(); doc = new IndexRequest(in);
doc.readFrom(in);
} }
if (in.getVersion().before(Version.V_7_0_0)) { if (in.getVersion().before(Version.V_7_0_0)) {
String[] fields = in.readOptionalStringArray(); String[] fields = in.readOptionalStringArray();
@ -856,8 +855,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
} }
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
if (in.readBoolean()) { if (in.readBoolean()) {
upsertRequest = new IndexRequest(); upsertRequest = new IndexRequest(in);
upsertRequest.readFrom(in);
} }
docAsUpsert = in.readBoolean(); docAsUpsert = in.readBoolean();
if (in.getVersion().before(Version.V_7_0_0)) { if (in.getVersion().before(Version.V_7_0_0)) {

View File

@ -69,8 +69,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
public ReindexRequest(StreamInput in) throws IOException { public ReindexRequest(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
destination = new IndexRequest(); destination = new IndexRequest(in);
destination.readFrom(in);
remoteInfo = in.readOptionalWriteable(RemoteInfo::new); remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
} }

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -122,8 +123,8 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
public static final class Request extends ReplicationRequest<Request> { public static final class Request extends ReplicationRequest<Request> {
private Request() { private Request(StreamInput in) throws IOException {
super(); super(in);
} }
public Request(final ShardId shardId) { public Request(final ShardId shardId) {

View File

@ -148,8 +148,9 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
return retentionLeases; return retentionLeases;
} }
public Request() { public Request(StreamInput in) throws IOException {
super(in);
retentionLeases = new RetentionLeases(in);
} }
public Request(final ShardId shardId, final RetentionLeases retentionLeases) { public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
@ -159,9 +160,8 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
} }
@Override @Override
public void readFrom(final StreamInput in) throws IOException { public void readFrom(final StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
retentionLeases = new RetentionLeases(in);
} }
@Override @Override

View File

@ -157,8 +157,9 @@ public class RetentionLeaseSyncAction extends
return retentionLeases; return retentionLeases;
} }
public Request() { public Request(StreamInput in) throws IOException {
super(in);
retentionLeases = new RetentionLeases(in);
} }
public Request(final ShardId shardId, final RetentionLeases retentionLeases) { public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
@ -168,9 +169,8 @@ public class RetentionLeaseSyncAction extends
} }
@Override @Override
public void readFrom(final StreamInput in) throws IOException { public void readFrom(final StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
retentionLeases = new RetentionLeases(in);
} }
@Override @Override

View File

@ -163,8 +163,7 @@ public class IndexRequestTests extends ESTestCase {
BytesStreamOutput out = new BytesStreamOutput(); BytesStreamOutput out = new BytesStreamOutput();
indexRequest.writeTo(out); indexRequest.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes);
IndexRequest serialized = new IndexRequest(); IndexRequest serialized = new IndexRequest(in);
serialized.readFrom(in);
assertEquals(XContentType.JSON, serialized.getContentType()); assertEquals(XContentType.JSON, serialized.getContentType());
assertEquals(new BytesArray("{}"), serialized.source()); assertEquals(new BytesArray("{}"), serialized.source());
} }
@ -173,14 +172,14 @@ public class IndexRequestTests extends ESTestCase {
public void testSerializationOfEmptyRequestWorks() throws IOException { public void testSerializationOfEmptyRequestWorks() throws IOException {
IndexRequest request = new IndexRequest("index"); IndexRequest request = new IndexRequest("index");
assertNull(request.getContentType()); assertNull(request.getContentType());
assertEquals("index", request.index());
try (BytesStreamOutput out = new BytesStreamOutput()) { try (BytesStreamOutput out = new BytesStreamOutput()) {
request.writeTo(out); request.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) { try (StreamInput in = out.bytes().streamInput()) {
IndexRequest serialized = new IndexRequest(); IndexRequest serialized = new IndexRequest(in);
serialized.readFrom(in); assertNull(serialized.getContentType());
assertNull(request.getContentType()); assertEquals("index", serialized.index());
assertEquals("index", request.index());
} }
} }
} }

View File

@ -44,8 +44,7 @@ public class ResyncReplicationRequestTests extends ESTestCase {
before.writeTo(out); before.writeTo(out);
final StreamInput in = out.bytes().streamInput(); final StreamInput in = out.bytes().streamInput();
final ResyncReplicationRequest after = new ResyncReplicationRequest(); final ResyncReplicationRequest after = new ResyncReplicationRequest(in);
after.readFrom(in);
assertThat(after, equalTo(before)); assertThat(after, equalTo(before));
} }

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.PageCacheRecycler;
@ -118,13 +119,13 @@ public class BroadcastReplicationTests extends ESTestCase {
threadPool = null; threadPool = null;
} }
public void testNotStartedPrimary() throws InterruptedException, ExecutionException, IOException { public void testNotStartedPrimary() throws InterruptedException, ExecutionException {
final String index = "test"; final String index = "test";
setState(clusterService, state(index, randomBoolean(), setState(clusterService, state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state()); logger.debug("--> using initial state:\n{}", clusterService.state());
PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture(); PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture();
broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response); broadcastReplicationAction.execute(new DummyBroadcastRequest(index), response);
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) { for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
if (randomBoolean()) { if (randomBoolean()) {
shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1())); shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1()));
@ -138,13 +139,13 @@ public class BroadcastReplicationTests extends ESTestCase {
assertBroadcastResponse(2, 0, 0, response.get(), null); assertBroadcastResponse(2, 0, 0, response.get(), null);
} }
public void testStartedPrimary() throws InterruptedException, ExecutionException, IOException { public void testStartedPrimary() throws InterruptedException, ExecutionException {
final String index = "test"; final String index = "test";
setState(clusterService, state(index, randomBoolean(), setState(clusterService, state(index, randomBoolean(),
ShardRoutingState.STARTED)); ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state()); logger.debug("--> using initial state:\n{}", clusterService.state());
PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture(); PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture();
broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response); broadcastReplicationAction.execute(new DummyBroadcastRequest(index), response);
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) { for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
ReplicationResponse replicationResponse = new ReplicationResponse(); ReplicationResponse replicationResponse = new ReplicationResponse();
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1)); replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1));
@ -225,7 +226,7 @@ public class BroadcastReplicationTests extends ESTestCase {
@Override @Override
protected BasicReplicationRequest newShardRequest(DummyBroadcastRequest request, ShardId shardId) { protected BasicReplicationRequest newShardRequest(DummyBroadcastRequest request, ShardId shardId) {
return new BasicReplicationRequest().setShardId(shardId); return new BasicReplicationRequest(shardId);
} }
@Override @Override
@ -269,6 +270,12 @@ public class BroadcastReplicationTests extends ESTestCase {
} }
public static class DummyBroadcastRequest extends BroadcastRequest<DummyBroadcastRequest> { public static class DummyBroadcastRequest extends BroadcastRequest<DummyBroadcastRequest> {
DummyBroadcastRequest(String... indices) {
super(indices);
}
DummyBroadcastRequest(StreamInput in) throws IOException {
super(in);
}
} }
} }

View File

@ -440,12 +440,8 @@ public class ReplicationOperationTests extends ESTestCase {
public AtomicBoolean processedOnPrimary = new AtomicBoolean(); public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public Set<ShardRouting> processedOnReplicas = ConcurrentCollections.newConcurrentSet(); public Set<ShardRouting> processedOnReplicas = ConcurrentCollections.newConcurrentSet();
public Request() {
}
Request(ShardId shardId) { Request(ShardId shardId) {
this(); super(shardId);
this.shardId = shardId;
this.index = shardId.getIndexName(); this.index = shardId.getIndexName();
this.waitForActiveShards = ActiveShardCount.NONE; this.waitForActiveShards = ActiveShardCount.NONE;
// keep things simple // keep things simple

View File

@ -130,6 +130,8 @@ import static org.mockito.Mockito.when;
public class TransportReplicationActionTests extends ESTestCase { public class TransportReplicationActionTests extends ESTestCase {
private static final ShardId NO_SHARD_ID = null;
/** /**
* takes a request that was sent by a {@link TransportReplicationAction} and captured * takes a request that was sent by a {@link TransportReplicationAction} and captured
* and returns the underlying request if it's wrapped or the original (cast to the expected type). * and returns the underlying request if it's wrapped or the original (cast to the expected type).
@ -231,7 +233,7 @@ public class TransportReplicationActionTests extends ESTestCase {
{ {
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock); setStateWithBlock(clusterService, nonRetryableBlock, globalBlock);
Request request = globalBlock ? new Request() : new Request().index("index"); Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask(); ReplicationTask task = maybeTask();
@ -246,7 +248,7 @@ public class TransportReplicationActionTests extends ESTestCase {
{ {
setStateWithBlock(clusterService, retryableBlock, globalBlock); setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request requestWithTimeout = (globalBlock ? new Request() : new Request().index("index")).timeout("5ms"); Request requestWithTimeout = (globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index")).timeout("5ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask(); ReplicationTask task = maybeTask();
@ -262,7 +264,7 @@ public class TransportReplicationActionTests extends ESTestCase {
{ {
setStateWithBlock(clusterService, retryableBlock, globalBlock); setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request request = globalBlock ? new Request() : new Request().index("index"); Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask(); ReplicationTask task = maybeTask();
@ -281,7 +283,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertIndexShardUninitialized(); assertIndexShardUninitialized();
} }
{ {
Request requestWithTimeout = new Request().index("unknown").setShardId(new ShardId("unknown", "_na_", 0)).timeout("5ms"); Request requestWithTimeout = new Request(new ShardId("unknown", "_na_", 0)).index("unknown").timeout("5ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask(); ReplicationTask task = maybeTask();
@ -688,8 +690,8 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
}; };
TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
final Request request = new Request(); final Request request = new Request(NO_SHARD_ID);
Request replicaRequest = (Request) primary.perform(request).replicaRequest; primary.perform(request);
final ElasticsearchException exception = new ElasticsearchException("testing"); final ElasticsearchException exception = new ElasticsearchException("testing");
primary.failShard("test", exception); primary.failShard("test", exception);
@ -716,7 +718,7 @@ public class TransportReplicationActionTests extends ESTestCase {
proxy.performOn( proxy.performOn(
TestShardRouting.newShardRouting(shardId, "NOT THERE", TestShardRouting.newShardRouting(shardId, "NOT THERE",
routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState), routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState),
new Request(), new Request(NO_SHARD_ID),
randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(),
listener); listener);
@ -727,7 +729,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream() final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream()
.filter(ShardRouting::assignedToNode).collect(Collectors.toList())); .filter(ShardRouting::assignedToNode).collect(Collectors.toList()));
listener = new PlainActionFuture<>(); listener = new PlainActionFuture<>();
proxy.performOn(replica, new Request(), randomNonNegativeLong(), randomNonNegativeLong(), listener); proxy.performOn(replica, new Request(NO_SHARD_ID), randomNonNegativeLong(), randomNonNegativeLong(), listener);
assertFalse(listener.isDone()); assertFalse(listener.isDone());
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
@ -888,7 +890,7 @@ public class TransportReplicationActionTests extends ESTestCase {
try { try {
action.handleReplicaRequest( action.handleReplicaRequest(
new TransportReplicationAction.ConcreteReplicaRequest<>( new TransportReplicationAction.ConcreteReplicaRequest<>(
new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), new Request(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong()), randomNonNegativeLong(), randomNonNegativeLong()),
createTransportChannel(new PlainActionFuture<>()), task); createTransportChannel(new PlainActionFuture<>()), task);
} catch (ElasticsearchException e) { } catch (ElasticsearchException e) {
@ -1020,7 +1022,7 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
}; };
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId); final Request request = new Request(shardId);
final long checkpoint = randomNonNegativeLong(); final long checkpoint = randomNonNegativeLong();
final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong(); final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong();
action.handleReplicaRequest( action.handleReplicaRequest(
@ -1088,7 +1090,7 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
}; };
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId); final Request request = new Request(shardId);
final long checkpoint = randomNonNegativeLong(); final long checkpoint = randomNonNegativeLong();
final long maxSeqNoOfUpdates = randomNonNegativeLong(); final long maxSeqNoOfUpdates = randomNonNegativeLong();
action.handleReplicaRequest( action.handleReplicaRequest(
@ -1166,13 +1168,12 @@ public class TransportReplicationActionTests extends ESTestCase {
public AtomicInteger processedOnReplicas = new AtomicInteger(); public AtomicInteger processedOnReplicas = new AtomicInteger();
public AtomicBoolean isRetrySet = new AtomicBoolean(false); public AtomicBoolean isRetrySet = new AtomicBoolean(false);
public Request() { Request(StreamInput in) throws IOException {
super(in);
} }
Request(ShardId shardId) { Request(@Nullable ShardId shardId) {
this(); super(shardId);
this.shardId = shardId;
this.index = shardId.getIndexName();
this.waitForActiveShards = ActiveShardCount.NONE; this.waitForActiveShards = ActiveShardCount.NONE;
// keep things simple // keep things simple
} }
@ -1184,7 +1185,7 @@ public class TransportReplicationActionTests extends ESTestCase {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
} }
@Override @Override

View File

@ -400,7 +400,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
} }
private Request request() { private Request request() {
return new Request().setShardId(primary.shardId()); return new Request(primary.shardId());
} }
/** /**
@ -558,6 +558,14 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
} }
static class Request extends ReplicationRequest<Request> { static class Request extends ReplicationRequest<Request> {
Request(StreamInput in) throws IOException {
super(in);
}
Request(ShardId shardId) {
super(shardId);
}
@Override @Override
public String toString() { public String toString() {
return getTestClass().getName() + ".Request"; return getTestClass().getName() + ".Request";

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -62,6 +63,7 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Locale; import java.util.Locale;
@ -522,8 +524,12 @@ public class TransportWriteActionTests extends ESTestCase {
} }
private static class TestRequest extends ReplicatedWriteRequest<TestRequest> { private static class TestRequest extends ReplicatedWriteRequest<TestRequest> {
TestRequest(StreamInput in) throws IOException {
super(in);
}
TestRequest() { TestRequest() {
setShardId(new ShardId("test", "test", 1)); super(new ShardId("test", "test", 1));
} }
@Override @Override

View File

@ -145,15 +145,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return metaData.build(); return metaData.build();
} }
protected IndexRequest copyIndexRequest(IndexRequest inRequest) throws IOException { IndexRequest copyIndexRequest(IndexRequest inRequest) throws IOException {
final IndexRequest outRequest = new IndexRequest();
try (BytesStreamOutput out = new BytesStreamOutput()) { try (BytesStreamOutput out = new BytesStreamOutput()) {
inRequest.writeTo(out); inRequest.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) { try (StreamInput in = out.bytes().streamInput()) {
outRequest.readFrom(in); return new IndexRequest(in);
} }
} }
return outRequest;
} }
protected DiscoveryNode getDiscoveryNode(String id) { protected DiscoveryNode getDiscoveryNode(String id) {

View File

@ -16,11 +16,15 @@ import java.util.List;
public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> { public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> {
private String historyUUID; private final String historyUUID;
private List<Translog.Operation> operations; private final List<Translog.Operation> operations;
private long maxSeqNoOfUpdatesOrDeletes; private final long maxSeqNoOfUpdatesOrDeletes;
public BulkShardOperationsRequest() { public BulkShardOperationsRequest(StreamInput in) throws IOException {
super(in);
historyUUID = in.readString();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readList(Translog.Operation::readOperation);
} }
public BulkShardOperationsRequest(final ShardId shardId, public BulkShardOperationsRequest(final ShardId shardId,
@ -47,11 +51,8 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<Bul
} }
@Override @Override
public void readFrom(final StreamInput in) throws IOException { public void readFrom(final StreamInput in) {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
historyUUID = in.readString();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readList(Translog.Operation::readOperation);
} }
@Override @Override