Convert Broadcast request and response to use writeable.reader () ()

This commit converts the request and response classes for broadcast
actions to implement ctors for Writeable.Reader and forces all future
implementations to implement the same.

relates 
This commit is contained in:
Ryan Ernst 2019-07-16 23:24:02 -07:00 committed by GitHub
parent 6b1a769638
commit 6e50bafa8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 236 additions and 326 deletions
server/src
x-pack/plugin
core/src/main/java/org/elasticsearch/xpack/core
monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.cache.clear;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class ClearIndicesCacheAction extends StreamableResponseActionType<ClearIndicesCacheResponse> {
public class ClearIndicesCacheAction extends ActionType<ClearIndicesCacheResponse> {
public static final ClearIndicesCacheAction INSTANCE = new ClearIndicesCacheAction();
public static final String NAME = "indices:admin/cache/clear";
private ClearIndicesCacheAction() {
super(NAME);
}
@Override
public ClearIndicesCacheResponse newResponse() {
return new ClearIndicesCacheResponse();
super(NAME, ClearIndicesCacheResponse::new);
}
}

@ -34,8 +34,15 @@ public class ClearIndicesCacheRequest extends BroadcastRequest<ClearIndicesCache
private boolean requestCache = false;
private String[] fields = Strings.EMPTY_ARRAY;
public ClearIndicesCacheRequest() {
public ClearIndicesCacheRequest(StreamInput in) throws IOException {
super(in);
queryCache = in.readBoolean();
fieldDataCache = in.readBoolean();
if (in.getVersion().before(Version.V_6_0_0_beta1)) {
in.readBoolean(); // recycler
}
fields = in.readStringArray();
requestCache = in.readBoolean();
}
public ClearIndicesCacheRequest(String... indices) {
@ -78,18 +85,6 @@ public class ClearIndicesCacheRequest extends BroadcastRequest<ClearIndicesCache
return this.fields;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
queryCache = in.readBoolean();
fieldDataCache = in.readBoolean();
if (in.getVersion().before(Version.V_6_0_0_beta1)) {
in.readBoolean(); // recycler
}
fields = in.readStringArray();
requestCache = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -21,9 +21,11 @@ package org.elasticsearch.action.admin.indices.cache.clear;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -43,8 +45,8 @@ public class ClearIndicesCacheResponse extends BroadcastResponse {
declareBroadcastFields(PARSER);
}
ClearIndicesCacheResponse() {
ClearIndicesCacheResponse(StreamInput in) throws IOException {
super(in);
}
ClearIndicesCacheResponse(int totalShards, int successfulShards, int failedShards,

@ -70,9 +70,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc
@Override
protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOException {
final ClearIndicesCacheRequest request = new ClearIndicesCacheRequest();
request.readFrom(in);
return request;
return new ClearIndicesCacheRequest(in);
}
@Override

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class FlushAction extends StreamableResponseActionType<FlushResponse> {
public class FlushAction extends ActionType<FlushResponse> {
public static final FlushAction INSTANCE = new FlushAction();
public static final String NAME = "indices:admin/flush";
private FlushAction() {
super(NAME);
}
@Override
public FlushResponse newResponse() {
return new FlushResponse();
super(NAME, FlushResponse::new);
}
}

@ -107,11 +107,6 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
out.writeBoolean(waitIfOngoing);
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public String toString() {
return "FlushRequest{" +

@ -21,9 +21,11 @@ package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -43,8 +45,8 @@ public class FlushResponse extends BroadcastResponse {
declareBroadcastFields(PARSER);
}
FlushResponse() {
FlushResponse(StreamInput in) throws IOException {
super(in);
}
FlushResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {

@ -20,7 +20,9 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.util.Arrays;
/**
@ -43,6 +45,9 @@ public class SyncedFlushRequest extends BroadcastRequest<SyncedFlushRequest> {
super(indices);
}
public SyncedFlushRequest(StreamInput in) throws IOException {
super(in);
}
@Override
public String toString() {

@ -37,7 +37,7 @@ public class TransportSyncedFlushAction extends HandledTransportAction<SyncedFlu
@Inject
public TransportSyncedFlushAction(TransportService transportService, ActionFilters actionFilters,
SyncedFlushService syncedFlushService) {
super(SyncedFlushAction.NAME, transportService, SyncedFlushRequest::new, actionFilters);
super(SyncedFlushAction.NAME, transportService, actionFilters, SyncedFlushRequest::new);
this.syncedFlushService = syncedFlushService;
}

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.forcemerge;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class ForceMergeAction extends StreamableResponseActionType<ForceMergeResponse> {
public class ForceMergeAction extends ActionType<ForceMergeResponse> {
public static final ForceMergeAction INSTANCE = new ForceMergeAction();
public static final String NAME = "indices:admin/forcemerge";
private ForceMergeAction() {
super(NAME);
}
@Override
public ForceMergeResponse newResponse() {
return new ForceMergeResponse();
super(NAME, ForceMergeResponse::new);
}
}

@ -58,8 +58,11 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
super(indices);
}
public ForceMergeRequest() {
public ForceMergeRequest(StreamInput in) throws IOException {
super(in);
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
}
/**
@ -111,14 +114,6 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -21,9 +21,11 @@ package org.elasticsearch.action.admin.indices.forcemerge;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -43,7 +45,8 @@ public class ForceMergeResponse extends BroadcastResponse {
declareBroadcastFields(PARSER);
}
ForceMergeResponse() {
ForceMergeResponse(StreamInput in) throws IOException {
super(in);
}
ForceMergeResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {

@ -69,9 +69,7 @@ public class TransportForceMergeAction
@Override
protected ForceMergeRequest readRequestFrom(StreamInput in) throws IOException {
final ForceMergeRequest request = new ForceMergeRequest();
request.readFrom(in);
return request;
return new ForceMergeRequest(in);
}
@Override

@ -19,22 +19,17 @@
package org.elasticsearch.action.admin.indices.recovery;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
/**
* Recovery information action
*/
public class RecoveryAction extends StreamableResponseActionType<RecoveryResponse> {
public class RecoveryAction extends ActionType<RecoveryResponse> {
public static final RecoveryAction INSTANCE = new RecoveryAction();
public static final String NAME = "indices:monitor/recovery";
private RecoveryAction() {
super(NAME);
}
@Override
public RecoveryResponse newResponse() {
return new RecoveryResponse();
super(NAME, RecoveryResponse::new);
}
}

@ -42,6 +42,12 @@ public class RecoveryRequest extends BroadcastRequest<RecoveryRequest> {
this(Strings.EMPTY_ARRAY);
}
public RecoveryRequest(StreamInput in) throws IOException {
super(in);
detailed = in.readBoolean();
activeOnly = in.readBoolean();
}
/**
* Constructs a request for recovery information for all shards for the given indices
*
@ -95,11 +101,4 @@ public class RecoveryRequest extends BroadcastRequest<RecoveryRequest> {
out.writeBoolean(detailed);
out.writeBoolean(activeOnly);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
detailed = in.readBoolean();
activeOnly = in.readBoolean();
}
}

@ -40,7 +40,19 @@ public class RecoveryResponse extends BroadcastResponse {
private Map<String, List<RecoveryState>> shardRecoveryStates = new HashMap<>();
public RecoveryResponse() { }
public RecoveryResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
String s = in.readString();
int listSize = in.readVInt();
List<RecoveryState> list = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
list.add(RecoveryState.readRecoveryState(in));
}
shardRecoveryStates.put(s, list);
}
}
/**
* Constructs recovery information for a collection of indices and associated shards. Keeps track of how many total shards
@ -103,21 +115,6 @@ public class RecoveryResponse extends BroadcastResponse {
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
String s = in.readString();
int listSize = in.readVInt();
List<RecoveryState> list = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
list.add(RecoveryState.readRecoveryState(in));
}
shardRecoveryStates.put(s, list);
}
}
@Override
public String toString() {
return Strings.toString(this, true, true);

@ -93,9 +93,7 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction<Reco
@Override
protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
final RecoveryRequest recoveryRequest = new RecoveryRequest();
recoveryRequest.readFrom(in);
return recoveryRequest;
return new RecoveryRequest(in);
}
@Override

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class RefreshAction extends StreamableResponseActionType<RefreshResponse> {
public class RefreshAction extends ActionType<RefreshResponse> {
public static final RefreshAction INSTANCE = new RefreshAction();
public static final String NAME = "indices:admin/refresh";
private RefreshAction() {
super(NAME);
}
@Override
public RefreshResponse newResponse() {
return new RefreshResponse();
super(NAME, RefreshResponse::new);
}
}

@ -21,9 +21,11 @@ package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -43,7 +45,8 @@ public class RefreshResponse extends BroadcastResponse {
declareBroadcastFields(PARSER);
}
RefreshResponse() {
RefreshResponse(StreamInput in) throws IOException {
super(in);
}
RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {

@ -48,8 +48,12 @@ public class IndicesSegmentResponse extends BroadcastResponse {
private Map<String, IndexSegments> indicesSegments;
IndicesSegmentResponse() {
IndicesSegmentResponse(StreamInput in) throws IOException {
super(in);
shards = new ShardSegments[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = ShardSegments.readShardSegments(in);
}
}
IndicesSegmentResponse(ShardSegments[] shards, int totalShards, int successfulShards, int failedShards,
@ -82,16 +86,6 @@ public class IndicesSegmentResponse extends BroadcastResponse {
return indicesSegments;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shards = new ShardSegments[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = ShardSegments.readShardSegments(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.segments;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class IndicesSegmentsAction extends StreamableResponseActionType<IndicesSegmentResponse> {
public class IndicesSegmentsAction extends ActionType<IndicesSegmentResponse> {
public static final IndicesSegmentsAction INSTANCE = new IndicesSegmentsAction();
public static final String NAME = "indices:monitor/segments";
private IndicesSegmentsAction() {
super(NAME);
}
@Override
public IndicesSegmentResponse newResponse() {
return new IndicesSegmentResponse();
super(NAME, IndicesSegmentResponse::new);
}
}

@ -34,6 +34,11 @@ public class IndicesSegmentsRequest extends BroadcastRequest<IndicesSegmentsRequ
this(Strings.EMPTY_ARRAY);
}
public IndicesSegmentsRequest(StreamInput in) throws IOException {
super(in);
verbose = in.readBoolean();
}
public IndicesSegmentsRequest(String... indices) {
super(indices);
}
@ -60,11 +65,4 @@ public class IndicesSegmentsRequest extends BroadcastRequest<IndicesSegmentsRequ
out.writeBoolean(verbose);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
verbose = in.readBoolean();
}
}

@ -87,9 +87,7 @@ public class TransportIndicesSegmentsAction
@Override
protected IndicesSegmentsRequest readRequestFrom(StreamInput in) throws IOException {
final IndicesSegmentsRequest request = new IndicesSegmentsRequest();
request.readFrom(in);
return request;
return new IndicesSegmentsRequest(in);
}
@Override

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class IndicesStatsAction extends StreamableResponseActionType<IndicesStatsResponse> {
public class IndicesStatsAction extends ActionType<IndicesStatsResponse> {
public static final IndicesStatsAction INSTANCE = new IndicesStatsAction();
public static final String NAME = "indices:monitor/stats";
private IndicesStatsAction() {
super(NAME);
}
@Override
public IndicesStatsResponse newResponse() {
return new IndicesStatsResponse();
super(NAME, IndicesStatsResponse::new);
}
}

@ -38,6 +38,15 @@ public class IndicesStatsRequest extends BroadcastRequest<IndicesStatsRequest> {
private CommonStatsFlags flags = new CommonStatsFlags();
public IndicesStatsRequest() {
super((String[])null);
}
public IndicesStatsRequest(StreamInput in) throws IOException {
super(in);
flags = new CommonStatsFlags(in);
}
/**
* Sets all flags to return all stats.
*/
@ -281,10 +290,4 @@ public class IndicesStatsRequest extends BroadcastRequest<IndicesStatsRequest> {
super.writeTo(out);
flags.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
flags = new CommonStatsFlags(in);
}
}

@ -43,8 +43,9 @@ public class IndicesStatsResponse extends BroadcastResponse {
private Map<ShardRouting, ShardStats> shardStatsMap;
IndicesStatsResponse() {
IndicesStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(ShardStats::readShardStats, (size) -> new ShardStats[size]);
}
IndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards,
@ -126,12 +127,6 @@ public class IndicesStatsResponse extends BroadcastResponse {
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shards = in.readArray(ShardStats::readShardStats, (size) -> new ShardStats[size]);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -90,9 +90,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
@Override
protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException {
IndicesStatsRequest request = new IndicesStatsRequest();
request.readFrom(in);
return request;
return new IndicesStatsRequest(in);
}
@Override

@ -89,9 +89,7 @@ public class TransportUpgradeStatusAction
@Override
protected UpgradeStatusRequest readRequestFrom(StreamInput in) throws IOException {
UpgradeStatusRequest request = new UpgradeStatusRequest();
request.readFrom(in);
return request;
return new UpgradeStatusRequest(in);
}
@Override

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.upgrade.get;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class UpgradeStatusAction extends StreamableResponseActionType<UpgradeStatusResponse> {
public class UpgradeStatusAction extends ActionType<UpgradeStatusResponse> {
public static final UpgradeStatusAction INSTANCE = new UpgradeStatusAction();
public static final String NAME = "indices:monitor/upgrade";
private UpgradeStatusAction() {
super(NAME);
}
@Override
public UpgradeStatusResponse newResponse() {
return new UpgradeStatusResponse();
super(NAME, UpgradeStatusResponse::new);
}
}

@ -21,6 +21,9 @@ package org.elasticsearch.action.admin.indices.upgrade.get;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public class UpgradeStatusRequest extends BroadcastRequest<UpgradeStatusRequest> {
@ -28,6 +31,10 @@ public class UpgradeStatusRequest extends BroadcastRequest<UpgradeStatusRequest>
this(Strings.EMPTY_ARRAY);
}
public UpgradeStatusRequest(StreamInput in) throws IOException {
super(in);
}
public UpgradeStatusRequest(String... indices) {
super(indices);
}

@ -39,7 +39,12 @@ public class UpgradeStatusResponse extends BroadcastResponse {
private Map<String, IndexUpgradeStatus> indicesUpgradeStatus;
UpgradeStatusResponse() {
UpgradeStatusResponse(StreamInput in) throws IOException {
super(in);
shards = new ShardUpgradeStatus[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = ShardUpgradeStatus.readShardUpgradeStatus(in);
}
}
UpgradeStatusResponse(ShardUpgradeStatus[] shards, int totalShards, int successfulShards, int failedShards,
@ -72,15 +77,6 @@ public class UpgradeStatusResponse extends BroadcastResponse {
return indicesUpgradeStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shards = new ShardUpgradeStatus[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = ShardUpgradeStatus.readShardUpgradeStatus(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -137,9 +137,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
@Override
protected UpgradeRequest readRequestFrom(StreamInput in) throws IOException {
UpgradeRequest request = new UpgradeRequest();
request.readFrom(in);
return request;
return new UpgradeRequest(in);
}
/**

@ -19,22 +19,17 @@
package org.elasticsearch.action.admin.indices.upgrade.post;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
/**
* Upgrade index/indices action.
*/
public class UpgradeAction extends StreamableResponseActionType<UpgradeResponse> {
public class UpgradeAction extends ActionType<UpgradeResponse> {
public static final UpgradeAction INSTANCE = new UpgradeAction();
public static final String NAME = "indices:admin/upgrade";
private UpgradeAction() {
super(NAME);
}
@Override
public UpgradeResponse newResponse() {
return new UpgradeResponse();
super(NAME, UpgradeResponse::new);
}
}

@ -49,13 +49,8 @@ public class UpgradeRequest extends BroadcastRequest<UpgradeRequest> {
super(indices);
}
public UpgradeRequest() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public UpgradeRequest(StreamInput in) throws IOException {
super(in);
upgradeOnlyAncientSegments = in.readBoolean();
}

@ -41,19 +41,8 @@ public class UpgradeResponse extends BroadcastResponse {
private Map<String, Tuple<Version, String>> versions;
UpgradeResponse() {
}
UpgradeResponse(Map<String, Tuple<Version, String>> versions, int totalShards, int successfulShards, int failedShards,
List<DefaultShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.versions = versions;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
UpgradeResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
versions = new HashMap<>();
for (int i=0; i<size; i++) {
@ -64,6 +53,12 @@ public class UpgradeResponse extends BroadcastResponse {
}
}
UpgradeResponse(Map<String, Tuple<Version, String>> versions, int totalShards, int successfulShards, int failedShards,
List<DefaultShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.versions = versions;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.indices.validate.query;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class ValidateQueryAction extends StreamableResponseActionType<ValidateQueryResponse> {
public class ValidateQueryAction extends ActionType<ValidateQueryResponse> {
public static final ValidateQueryAction INSTANCE = new ValidateQueryAction();
public static final String NAME = "indices:admin/validate/query";
private ValidateQueryAction() {
super(NAME);
}
@Override
public ValidateQueryResponse newResponse() {
return new ValidateQueryResponse();
super(NAME, ValidateQueryResponse::new);
}
}

@ -55,6 +55,21 @@ public class ValidateQueryRequest extends BroadcastRequest<ValidateQueryRequest>
this(Strings.EMPTY_ARRAY);
}
public ValidateQueryRequest(StreamInput in) throws IOException {
super(in);
query = in.readNamedWriteable(QueryBuilder.class);
int typesSize = in.readVInt();
if (typesSize > 0) {
types = new String[typesSize];
for (int i = 0; i < typesSize; i++) {
types[i] = in.readString();
}
}
explain = in.readBoolean();
rewrite = in.readBoolean();
allShards = in.readBoolean();
}
/**
* Constructs a new validate request against the provided indices. No indices provided means it will
* run against all indices.
@ -150,22 +165,6 @@ public class ValidateQueryRequest extends BroadcastRequest<ValidateQueryRequest>
return allShards;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
query = in.readNamedWriteable(QueryBuilder.class);
int typesSize = in.readVInt();
if (typesSize > 0) {
types = new String[typesSize];
for (int i = 0; i < typesSize; i++) {
types[i] = in.readString();
}
}
explain = in.readBoolean();
rewrite = in.readBoolean();
allShards = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -77,8 +77,16 @@ public class ValidateQueryResponse extends BroadcastResponse {
private List<QueryExplanation> queryExplanations;
ValidateQueryResponse() {
ValidateQueryResponse(StreamInput in) throws IOException {
super(in);
valid = in.readBoolean();
int size = in.readVInt();
if (size > 0) {
queryExplanations = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
queryExplanations.add(readQueryExplanation(in));
}
}
}
ValidateQueryResponse(boolean valid, List<QueryExplanation> queryExplanations, int totalShards, int successfulShards, int failedShards,
@ -108,19 +116,6 @@ public class ValidateQueryResponse extends BroadcastResponse {
return queryExplanations;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
valid = in.readBoolean();
int size = in.readVInt();
if (size > 0) {
queryExplanations = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
queryExplanations.add(readQueryExplanation(in));
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -33,16 +33,13 @@ public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends
protected String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
public BroadcastRequest() {
}
public BroadcastRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
protected BroadcastRequest(String[] indices) {
protected BroadcastRequest(String... indices) {
this.indices = indices;
}
@ -87,9 +84,7 @@ public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable " + getClass().getName());
}
}

@ -66,7 +66,19 @@ public class BroadcastResponse extends ActionResponse implements ToXContentObjec
PARSER.declareObject(constructorArg(), shardsParser, _SHARDS_FIELD);
}
public BroadcastResponse() {
public BroadcastResponse() {}
public BroadcastResponse(StreamInput in) throws IOException {
totalShards = in.readVInt();
successfulShards = in.readVInt();
failedShards = in.readVInt();
int size = in.readVInt();
if (size > 0) {
shardFailures = new DefaultShardOperationFailedException[size];
for (int i = 0; i < size; i++) {
shardFailures[i] = readShardOperationFailed(in);
}
}
}
public BroadcastResponse(int totalShards, int successfulShards, int failedShards,
@ -121,18 +133,8 @@ public class BroadcastResponse extends ActionResponse implements ToXContentObjec
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
totalShards = in.readVInt();
successfulShards = in.readVInt();
failedShards = in.readVInt();
int size = in.readVInt();
if (size > 0) {
shardFailures = new DefaultShardOperationFailedException[size];
for (int i = 0; i < size; i++) {
shardFailures[i] = readShardOperationFailed(in);
}
}
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
@ -66,9 +67,9 @@ public abstract class TransportBroadcastAction<
protected TransportBroadcastAction(String actionName, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
Supplier<ShardRequest> shardRequest, String shardExecutor) {
super(actionName, transportService, request, actionFilters);
super(actionName, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeShouldNotConnectException;
@ -61,7 +62,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
/**
* Abstraction for transporting aggregated shard-level operations in a single request (NodeRequest) per-node
@ -90,7 +90,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request,
Writeable.Reader<Request> request,
String executor) {
this(actionName, clusterService, transportService, actionFilters, indexNameExpressionResolver, request, executor, true);
}
@ -101,10 +101,10 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request,
Writeable.Reader<Request> request,
String executor,
boolean canTripCircuitBreaker) {
super(actionName, canTripCircuitBreaker, transportService, request, actionFilters);
super(actionName, canTripCircuitBreaker, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
@ -314,9 +314,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
NodeResponse nodeResponse = new NodeResponse();
nodeResponse.readFrom(in);
return nodeResponse;
return new NodeResponse(in);
}
@Override
@ -505,7 +503,16 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
protected List<BroadcastShardOperationFailedException> exceptions;
protected List<ShardOperationResult> results;
NodeResponse() {
NodeResponse(StreamInput in) throws IOException {
super(in);
nodeId = in.readString();
totalShards = in.readVInt();
results = in.readList((stream) -> stream.readBoolean() ? readShardResult(stream) : null);
if (in.readBoolean()) {
exceptions = in.readList(BroadcastShardOperationFailedException::new);
} else {
exceptions = null;
}
}
NodeResponse(String nodeId,
@ -536,15 +543,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
totalShards = in.readVInt();
results = in.readList((stream) -> stream.readBoolean() ? readShardResult(stream) : null);
if (in.readBoolean()) {
exceptions = in.readList(BroadcastShardOperationFailedException::new);
} else {
exceptions = null;
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

@ -46,7 +46,7 @@ import static org.hamcrest.object.HasToString.hasToString;
public class IndicesStatsResponseTests extends ESTestCase {
public void testInvalidLevel() {
final IndicesStatsResponse response = new IndicesStatsResponse();
final IndicesStatsResponse response = new IndicesStatsResponse(null, 0, 0, 0, null);
final String level = randomAlphaOfLength(16);
final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level));
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class,

@ -48,6 +48,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
@ -74,7 +75,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -96,16 +96,19 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
private TestTransportBroadcastByNodeAction action;
public static class Request extends BroadcastRequest<Request> {
public Request() {
public Request(StreamInput in) throws IOException {
super(in);
}
public Request(String[] indices) {
public Request(String... indices) {
super(indices);
}
}
public static class Response extends BroadcastResponse {
public Response() {
public Response(StreamInput in) throws IOException {
super(in);
}
public Response(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
@ -118,7 +121,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
private final Map<ShardRouting, Object> shards = new HashMap<>();
TestTransportBroadcastByNodeAction(TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
String executor) {
super("indices:admin/test", TransportBroadcastByNodeActionTests.this.clusterService, transportService,
actionFilters, indexNameExpressionResolver, request, executor);
@ -138,9 +141,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
@Override
protected Request readRequestFrom(StreamInput in) throws IOException {
final Request request = new Request();
request.readFrom(in);
return request;
return new Request(in);
}
@Override

@ -6,7 +6,9 @@
package org.elasticsearch.xpack.core.action;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
@ -22,6 +24,10 @@ public class ReloadAnalyzersRequest extends BroadcastRequest<ReloadAnalyzersRequ
super(indices);
}
public ReloadAnalyzersRequest(StreamInput in) throws IOException {
super(in);
}
@Override
public boolean equals(Object o) {
if (this == o) {

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.action;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -41,6 +42,12 @@ public class ReloadAnalyzersResponse extends BroadcastResponse {
reloadDetails = Collections.emptyMap();
}
public ReloadAnalyzersResponse(StreamInput in) throws IOException {
super(in);
reloadDetails = null;
// TODO: this needs to deserialize reloadDetails, see https://github.com/elastic/elasticsearch/issues/44383
}
public ReloadAnalyzersResponse(int totalShards, int successfulShards, int failedShards,
List<DefaultShardOperationFailedException> shardFailures, Map<String, ReloadDetails> reloadedIndicesNodes) {
super(totalShards, successfulShards, failedShards, shardFailures);

@ -84,9 +84,7 @@ public class TransportReloadAnalyzersAction
@Override
protected ReloadAnalyzersRequest readRequestFrom(StreamInput in) throws IOException {
final ReloadAnalyzersRequest request = new ReloadAnalyzersRequest();
request.readFrom(in);
return request;
return new ReloadAnalyzersRequest(in);
}
@Override

@ -7,7 +7,7 @@
package org.elasticsearch.xpack.core.ccr.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.ParseField;
@ -19,18 +19,13 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
public class ForgetFollowerAction extends StreamableResponseActionType<BroadcastResponse> {
public class ForgetFollowerAction extends ActionType<BroadcastResponse> {
public static final String NAME = "indices:admin/xpack/ccr/forget_follower";
public static final ForgetFollowerAction INSTANCE = new ForgetFollowerAction();
private ForgetFollowerAction() {
super(NAME);
}
@Override
public BroadcastResponse newResponse() {
return new BroadcastResponse();
super(NAME, BroadcastResponse::new);
}
/**
@ -115,8 +110,13 @@ public class ForgetFollowerAction extends StreamableResponseActionType<Broadcast
return leaderIndex;
}
public Request() {
public Request(StreamInput in) throws IOException {
super(in);
followerCluster = in.readString();
leaderIndex = in.readString();
leaderRemoteCluster = in.readString();
followerIndex = in.readString();
followerIndexUUID = in.readString();
}
/**
@ -142,15 +142,6 @@ public class ForgetFollowerAction extends StreamableResponseActionType<Broadcast
this.followerIndexUUID = Objects.requireNonNull(followerIndexUUID);
}
public Request(final StreamInput in) throws IOException {
super.readFrom(in);
followerCluster = in.readString();
leaderIndex = in.readString();
leaderRemoteCluster = in.readString();
followerIndex = in.readString();
followerIndexUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

@ -635,7 +635,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
long intervalMillis = randomNonNegativeLong();
MonitoringDoc.Node sourceNode = MonitoringTestUtils.randomMonitoringNode(random());
return new IndexRecoveryMonitoringDoc(clusterUUID, timestamp, intervalMillis, sourceNode, new RecoveryResponse());
return new IndexRecoveryMonitoringDoc(clusterUUID, timestamp, intervalMillis, sourceNode,
new RecoveryResponse(0, 0, 0, null, null));
}
private List<MonitoringDoc> newRandomMonitoringDocs(int nb) {