Convert BaseNode(s) Request/Response classes to Writeable (#44301) (#44358)

This commit converts all BaseNodeResponse and BaseNodesResponse
subclasses to implement Writeable.Reader instead of Streamable.

relates #34389
This commit is contained in:
Ryan Ernst 2019-07-15 18:07:52 -07:00 committed by GitHub
parent 7e06888bae
commit e0b82e92f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
67 changed files with 543 additions and 754 deletions

View File

@ -30,7 +30,9 @@ public class NodeHotThreads extends BaseNodeResponse {
private String hotThreads;
NodeHotThreads() {
NodeHotThreads(StreamInput in) throws IOException {
super(in);
hotThreads = in.readString();
}
public NodeHotThreads(DiscoveryNode node, String hotThreads) {
@ -42,18 +44,6 @@ public class NodeHotThreads extends BaseNodeResponse {
return this.hotThreads;
}
public static NodeHotThreads readNodeHotThreads(StreamInput in) throws IOException {
NodeHotThreads node = new NodeHotThreads();
node.readFrom(in);
return node;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
hotThreads = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class NodesHotThreadsAction extends StreamableResponseActionType<NodesHotThreadsResponse> {
public class NodesHotThreadsAction extends ActionType<NodesHotThreadsResponse> {
public static final NodesHotThreadsAction INSTANCE = new NodesHotThreadsAction();
public static final String NAME = "cluster:monitor/nodes/hot_threads";
private NodesHotThreadsAction() {
super(NAME);
}
@Override
public NodesHotThreadsResponse newResponse() {
return new NodesHotThreadsResponse();
super(NAME, NodesHotThreadsResponse::new);
}
}

View File

@ -36,8 +36,13 @@ public class NodesHotThreadsRequest extends BaseNodesRequest<NodesHotThreadsRequ
boolean ignoreIdleThreads = true;
// for serialization
public NodesHotThreadsRequest() {
public NodesHotThreadsRequest(StreamInput in) throws IOException {
super(in);
threads = in.readInt();
ignoreIdleThreads = in.readBoolean();
type = in.readString();
interval = in.readTimeValue();
snapshots = in.readInt();
}
/**
@ -93,16 +98,6 @@ public class NodesHotThreadsRequest extends BaseNodesRequest<NodesHotThreadsRequ
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
threads = in.readInt();
ignoreIdleThreads = in.readBoolean();
type = in.readString();
interval = in.readTimeValue();
snapshots = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -30,7 +30,8 @@ import java.util.List;
public class NodesHotThreadsResponse extends BaseNodesResponse<NodeHotThreads> {
NodesHotThreadsResponse() {
public NodesHotThreadsResponse(StreamInput in) throws IOException {
super(in);
}
public NodesHotThreadsResponse(ClusterName clusterName, List<NodeHotThreads> nodes, List<FailedNodeException> failures) {
@ -39,7 +40,7 @@ public class NodesHotThreadsResponse extends BaseNodesResponse<NodeHotThreads> {
@Override
protected List<NodeHotThreads> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeHotThreads::readNodeHotThreads);
return in.readList(NodeHotThreads::new);
}
@Override

View File

@ -59,8 +59,8 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHo
}
@Override
protected NodeHotThreads newNodeResponse() {
return new NodeHotThreads();
protected NodeHotThreads newNodeResponse(StreamInput in) throws IOException {
return new NodeHotThreads(in);
}
@Override
@ -82,20 +82,15 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHo
NodesHotThreadsRequest request;
public NodeRequest() {
public NodeRequest(StreamInput in) throws IOException {
super(in);
request = new NodesHotThreadsRequest(in);
}
NodeRequest(NodesHotThreadsRequest request) {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new NodesHotThreadsRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -76,7 +76,26 @@ public class NodeInfo extends BaseNodeResponse {
@Nullable
private ByteSizeValue totalIndexingBuffer;
public NodeInfo() {
public NodeInfo(StreamInput in) throws IOException {
super(in);
version = Version.readVersion(in);
build = Build.readBuild(in);
if (in.readBoolean()) {
totalIndexingBuffer = new ByteSizeValue(in.readLong());
} else {
totalIndexingBuffer = null;
}
if (in.readBoolean()) {
settings = Settings.readSettingsFromStream(in);
}
os = in.readOptionalWriteable(OsInfo::new);
process = in.readOptionalWriteable(ProcessInfo::new);
jvm = in.readOptionalWriteable(JvmInfo::new);
threadPool = in.readOptionalWriteable(ThreadPoolInfo::new);
transport = in.readOptionalWriteable(TransportInfo::new);
http = in.readOptionalWriteable(HttpInfo::new);
plugins = in.readOptionalWriteable(PluginsAndModules::new);
ingest = in.readOptionalWriteable(IngestInfo::new);
}
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings,
@ -182,35 +201,6 @@ public class NodeInfo extends BaseNodeResponse {
return totalIndexingBuffer;
}
public static NodeInfo readNodeInfo(StreamInput in) throws IOException {
NodeInfo nodeInfo = new NodeInfo();
nodeInfo.readFrom(in);
return nodeInfo;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
version = Version.readVersion(in);
build = Build.readBuild(in);
if (in.readBoolean()) {
totalIndexingBuffer = new ByteSizeValue(in.readLong());
} else {
totalIndexingBuffer = null;
}
if (in.readBoolean()) {
settings = Settings.readSettingsFromStream(in);
}
os = in.readOptionalWriteable(OsInfo::new);
process = in.readOptionalWriteable(ProcessInfo::new);
jvm = in.readOptionalWriteable(JvmInfo::new);
threadPool = in.readOptionalWriteable(ThreadPoolInfo::new);
transport = in.readOptionalWriteable(TransportInfo::new);
http = in.readOptionalWriteable(HttpInfo::new);
plugins = in.readOptionalWriteable(PluginsAndModules::new);
ingest = in.readOptionalWriteable(IngestInfo::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.cluster.node.info;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class NodesInfoAction extends StreamableResponseActionType<NodesInfoResponse> {
public class NodesInfoAction extends ActionType<NodesInfoResponse> {
public static final NodesInfoAction INSTANCE = new NodesInfoAction();
public static final String NAME = "cluster:monitor/nodes/info";
private NodesInfoAction() {
super(NAME);
}
@Override
public NodesInfoResponse newResponse() {
return new NodesInfoResponse();
super(NAME, NodesInfoResponse::new);
}
}

View File

@ -41,7 +41,18 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
private boolean ingest = true;
private boolean indices = true;
public NodesInfoRequest() {
public NodesInfoRequest(StreamInput in) throws IOException {
super(in);
settings = in.readBoolean();
os = in.readBoolean();
process = in.readBoolean();
jvm = in.readBoolean();
threadPool = in.readBoolean();
transport = in.readBoolean();
http = in.readBoolean();
plugins = in.readBoolean();
ingest = in.readBoolean();
indices = in.readBoolean();
}
/**
@ -240,21 +251,6 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
return indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
settings = in.readBoolean();
os = in.readBoolean();
process = in.readBoolean();
jvm = in.readBoolean();
threadPool = in.readBoolean();
transport = in.readBoolean();
http = in.readBoolean();
plugins = in.readBoolean();
ingest = in.readBoolean();
indices = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -37,7 +37,8 @@ import java.util.Map;
public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements ToXContentFragment {
public NodesInfoResponse() {
public NodesInfoResponse(StreamInput in) throws IOException {
super(in);
}
public NodesInfoResponse(ClusterName clusterName, List<NodeInfo> nodes, List<FailedNodeException> failures) {
@ -46,7 +47,7 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
@Override
protected List<NodeInfo> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeInfo::readNodeInfo);
return in.readList(NodeInfo::new);
}
@Override

View File

@ -61,8 +61,8 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
}
@Override
protected NodeInfo newNodeResponse() {
return new NodeInfo();
protected NodeInfo newNodeResponse(StreamInput in) throws IOException {
return new NodeInfo(in);
}
@Override
@ -76,20 +76,15 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
NodesInfoRequest request;
public NodeInfoRequest() {
public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
request = new NodesInfoRequest(in);
}
public NodeInfoRequest(NodesInfoRequest request) {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new NodesInfoRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -19,21 +19,14 @@
package org.elasticsearch.action.admin.cluster.node.reload;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class NodesReloadSecureSettingsAction
extends StreamableResponseActionType<NodesReloadSecureSettingsResponse> {
public class NodesReloadSecureSettingsAction extends ActionType<NodesReloadSecureSettingsResponse> {
public static final NodesReloadSecureSettingsAction INSTANCE = new NodesReloadSecureSettingsAction();
public static final String NAME = "cluster:admin/nodes/reload_secure_settings";
private NodesReloadSecureSettingsAction() {
super(NAME);
super(NAME, NodesReloadSecureSettingsResponse::new);
}
@Override
public NodesReloadSecureSettingsResponse newResponse() {
return new NodesReloadSecureSettingsResponse();
}
}

View File

@ -20,6 +20,9 @@
package org.elasticsearch.action.admin.cluster.node.reload;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/**
* Request for a reload secure settings action.
@ -27,6 +30,11 @@ import org.elasticsearch.action.support.nodes.BaseNodesRequest;
public class NodesReloadSecureSettingsRequest extends BaseNodesRequest<NodesReloadSecureSettingsRequest> {
public NodesReloadSecureSettingsRequest() {
super((String[]) null);
}
public NodesReloadSecureSettingsRequest(StreamInput in) throws IOException {
super(in);
}
/**

View File

@ -40,7 +40,8 @@ import java.util.List;
public class NodesReloadSecureSettingsResponse extends BaseNodesResponse<NodesReloadSecureSettingsResponse.NodeResponse>
implements ToXContentFragment {
public NodesReloadSecureSettingsResponse() {
public NodesReloadSecureSettingsResponse(StreamInput in) throws IOException {
super(in);
}
public NodesReloadSecureSettingsResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
@ -49,7 +50,7 @@ public class NodesReloadSecureSettingsResponse extends BaseNodesResponse<NodesRe
@Override
protected List<NodesReloadSecureSettingsResponse.NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeResponse::readNodeResponse);
return in.readList(NodeResponse::new);
}
@Override
@ -92,7 +93,11 @@ public class NodesReloadSecureSettingsResponse extends BaseNodesResponse<NodesRe
private Exception reloadException = null;
public NodeResponse() {
public NodeResponse(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
reloadException = in.readException();
}
}
public NodeResponse(DiscoveryNode node, Exception reloadException) {
@ -104,14 +109,6 @@ public class NodesReloadSecureSettingsResponse extends BaseNodesResponse<NodesRe
return this.reloadException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
reloadException = in.readException();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -139,11 +136,5 @@ public class NodesReloadSecureSettingsResponse extends BaseNodesResponse<NodesRe
public int hashCode() {
return reloadException != null ? reloadException.hashCode() : 0;
}
public static NodeResponse readNodeResponse(StreamInput in) throws IOException {
final NodeResponse node = new NodeResponse();
node.readFrom(in);
return node;
}
}
}

View File

@ -73,8 +73,8 @@ public class TransportNodesReloadSecureSettingsAction extends TransportNodesActi
}
@Override
protected NodesReloadSecureSettingsResponse.NodeResponse newNodeResponse() {
return new NodesReloadSecureSettingsResponse.NodeResponse();
protected NodesReloadSecureSettingsResponse.NodeResponse newNodeResponse(StreamInput in) throws IOException {
return new NodesReloadSecureSettingsResponse.NodeResponse(in);
}
@Override
@ -113,20 +113,15 @@ public class TransportNodesReloadSecureSettingsAction extends TransportNodesActi
NodesReloadSecureSettingsRequest request;
public NodeRequest() {
public NodeRequest(StreamInput in) throws IOException {
super(in);
request = new NodesReloadSecureSettingsRequest(in);
}
NodeRequest(NodesReloadSecureSettingsRequest request) {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new NodesReloadSecureSettingsRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -91,7 +91,28 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private AdaptiveSelectionStats adaptiveSelectionStats;
NodeStats() {
public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
if (in.readBoolean()) {
indices = NodeIndicesStats.readIndicesStats(in);
}
os = in.readOptionalWriteable(OsStats::new);
process = in.readOptionalWriteable(ProcessStats::new);
jvm = in.readOptionalWriteable(JvmStats::new);
threadPool = in.readOptionalWriteable(ThreadPoolStats::new);
fs = in.readOptionalWriteable(FsInfo::new);
transport = in.readOptionalWriteable(TransportStats::new);
http = in.readOptionalWriteable(HttpStats::new);
breaker = in.readOptionalWriteable(AllCircuitBreakerStats::new);
scriptStats = in.readOptionalWriteable(ScriptStats::new);
discoveryStats = in.readOptionalWriteable(DiscoveryStats::new);
ingestStats = in.readOptionalWriteable(IngestStats::new);
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
} else {
adaptiveSelectionStats = null;
}
}
public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
@ -211,37 +232,6 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
return adaptiveSelectionStats;
}
public static NodeStats readNodeStats(StreamInput in) throws IOException {
NodeStats nodeInfo = new NodeStats();
nodeInfo.readFrom(in);
return nodeInfo;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
timestamp = in.readVLong();
if (in.readBoolean()) {
indices = NodeIndicesStats.readIndicesStats(in);
}
os = in.readOptionalWriteable(OsStats::new);
process = in.readOptionalWriteable(ProcessStats::new);
jvm = in.readOptionalWriteable(JvmStats::new);
threadPool = in.readOptionalWriteable(ThreadPoolStats::new);
fs = in.readOptionalWriteable(FsInfo::new);
transport = in.readOptionalWriteable(TransportStats::new);
http = in.readOptionalWriteable(HttpStats::new);
breaker = in.readOptionalWriteable(AllCircuitBreakerStats::new);
scriptStats = in.readOptionalWriteable(ScriptStats::new);
discoveryStats = in.readOptionalWriteable(DiscoveryStats::new);
ingestStats = in.readOptionalWriteable(IngestStats::new);
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
} else {
adaptiveSelectionStats = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.cluster.node.stats;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class NodesStatsAction extends StreamableResponseActionType<NodesStatsResponse> {
public class NodesStatsAction extends ActionType<NodesStatsResponse> {
public static final NodesStatsAction INSTANCE = new NodesStatsAction();
public static final String NAME = "cluster:monitor/nodes/stats";
private NodesStatsAction() {
super(NAME);
}
@Override
public NodesStatsResponse newResponse() {
return new NodesStatsResponse();
super(NAME, NodesStatsResponse::new);
}
}

View File

@ -47,6 +47,28 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
private boolean adaptiveSelection;
public NodesStatsRequest() {
super((String[]) null);
}
public NodesStatsRequest(StreamInput in) throws IOException {
super(in);
indices = new CommonStatsFlags(in);
os = in.readBoolean();
process = in.readBoolean();
jvm = in.readBoolean();
threadPool = in.readBoolean();
fs = in.readBoolean();
transport = in.readBoolean();
http = in.readBoolean();
breaker = in.readBoolean();
script = in.readBoolean();
discovery = in.readBoolean();
ingest = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
adaptiveSelection = in.readBoolean();
} else {
adaptiveSelection = false;
}
}
/**
@ -281,28 +303,6 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = new CommonStatsFlags(in);
os = in.readBoolean();
process = in.readBoolean();
jvm = in.readBoolean();
threadPool = in.readBoolean();
fs = in.readBoolean();
transport = in.readBoolean();
http = in.readBoolean();
breaker = in.readBoolean();
script = in.readBoolean();
discovery = in.readBoolean();
ingest = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
adaptiveSelection = in.readBoolean();
} else {
adaptiveSelection = false;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -34,7 +34,8 @@ import java.util.List;
public class NodesStatsResponse extends BaseNodesResponse<NodeStats> implements ToXContentFragment {
NodesStatsResponse() {
public NodesStatsResponse(StreamInput in) throws IOException {
super(in);
}
public NodesStatsResponse(ClusterName clusterName, List<NodeStats> nodes, List<FailedNodeException> failures) {
@ -43,7 +44,7 @@ public class NodesStatsResponse extends BaseNodesResponse<NodeStats> implements
@Override
protected List<NodeStats> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeStats::readNodeStats);
return in.readList(NodeStats::new);
}
@Override

View File

@ -60,8 +60,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
}
@Override
protected NodeStats newNodeResponse() {
return new NodeStats();
protected NodeStats newNodeResponse(StreamInput in) throws IOException {
return new NodeStats(in);
}
@Override
@ -76,20 +76,15 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
NodesStatsRequest request;
public NodeStatsRequest() {
public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
request = new NodesStatsRequest(in);
}
NodeStatsRequest(NodesStatsRequest request) {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new NodesStatsRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -36,13 +35,11 @@ public class NodeUsage extends BaseNodeResponse implements ToXContentFragment {
private long sinceTime;
private Map<String, Long> restUsage;
NodeUsage() {
}
public static NodeUsage readNodeStats(StreamInput in) throws IOException {
NodeUsage nodeInfo = new NodeUsage();
nodeInfo.readFrom(in);
return nodeInfo;
public NodeUsage(StreamInput in) throws IOException {
super(in);
timestamp = in.readLong();
sinceTime = in.readLong();
restUsage = (Map<String, Long>) in.readGenericValue();
}
/**
@ -96,15 +93,6 @@ public class NodeUsage extends BaseNodeResponse implements ToXContentFragment {
return builder;
}
@SuppressWarnings("unchecked")
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
timestamp = in.readLong();
sinceTime = in.readLong();
restUsage = (Map<String, Long>) in.readGenericValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -19,20 +19,14 @@
package org.elasticsearch.action.admin.cluster.node.usage;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class NodesUsageAction extends StreamableResponseActionType<NodesUsageResponse> {
public class NodesUsageAction extends ActionType<NodesUsageResponse> {
public static final NodesUsageAction INSTANCE = new NodesUsageAction();
public static final String NAME = "cluster:monitor/nodes/usage";
protected NodesUsageAction() {
super(NAME);
super(NAME, NodesUsageResponse::new);
}
@Override
public NodesUsageResponse newResponse() {
return new NodesUsageResponse();
}
}

View File

@ -29,8 +29,9 @@ public class NodesUsageRequest extends BaseNodesRequest<NodesUsageRequest> {
private boolean restActions;
public NodesUsageRequest() {
super();
public NodesUsageRequest(StreamInput in) throws IOException {
super(in);
this.restActions = in.readBoolean();
}
/**
@ -72,12 +73,6 @@ public class NodesUsageRequest extends BaseNodesRequest<NodesUsageRequest> {
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.restActions = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -38,7 +38,8 @@ import java.util.List;
*/
public class NodesUsageResponse extends BaseNodesResponse<NodeUsage> implements ToXContentFragment {
NodesUsageResponse() {
public NodesUsageResponse(StreamInput in) throws IOException {
super(in);
}
public NodesUsageResponse(ClusterName clusterName, List<NodeUsage> nodes, List<FailedNodeException> failures) {
@ -47,7 +48,7 @@ public class NodesUsageResponse extends BaseNodesResponse<NodeUsage> implements
@Override
protected List<NodeUsage> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeUsage::readNodeStats);
return in.readList(NodeUsage::new);
}
@Override

View File

@ -58,8 +58,8 @@ public class TransportNodesUsageAction
}
@Override
protected NodeUsage newNodeResponse() {
return new NodeUsage();
protected NodeUsage newNodeResponse(StreamInput in) throws IOException {
return new NodeUsage(in);
}
@Override
@ -72,20 +72,15 @@ public class TransportNodesUsageAction
NodesUsageRequest request;
public NodeUsageRequest() {
public NodeUsageRequest(StreamInput in) throws IOException {
super(in);
request = new NodesUsageRequest(in);
}
NodeUsageRequest(NodesUsageRequest request) {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new NodesUsageRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.snapshots.status;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
@ -56,6 +57,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
TransportNodesSnapshotsStatus.NodeSnapshotStatus> {
public static final String ACTION_NAME = SnapshotsStatusAction.NAME + "[nodes]";
public static final ActionType<NodesSnapshotStatus> TYPE = new ActionType<>(ACTION_NAME, NodesSnapshotStatus::new);
private final SnapshotShardsService snapshotShardsService;
@ -73,8 +75,8 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
}
@Override
protected NodeSnapshotStatus newNodeResponse() {
return new NodeSnapshotStatus();
protected NodeSnapshotStatus newNodeResponse(StreamInput in) throws IOException {
return new NodeSnapshotStatus(in);
}
@Override
@ -118,7 +120,10 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
private Snapshot[] snapshots;
public Request() {
public Request(StreamInput in) throws IOException {
super(in);
// This operation is never executed remotely
throw new UnsupportedOperationException("shouldn't be here");
}
public Request(String[] nodesIds) {
@ -130,12 +135,6 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
// This operation is never executed remotely
throw new UnsupportedOperationException("shouldn't be here");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
// This operation is never executed remotely
@ -145,13 +144,17 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public static class NodesSnapshotStatus extends BaseNodesResponse<NodeSnapshotStatus> {
public NodesSnapshotStatus(StreamInput in) throws IOException {
super(in);
}
public NodesSnapshotStatus(ClusterName clusterName, List<NodeSnapshotStatus> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
protected List<NodeSnapshotStatus> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeSnapshotStatus::new);
return in.readList(NodeSnapshotStatus::new);
}
@Override
@ -165,19 +168,15 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
private List<Snapshot> snapshots;
public NodeRequest() {
public NodeRequest(StreamInput in) throws IOException {
super(in);
snapshots = in.readList(Snapshot::new);
}
NodeRequest(TransportNodesSnapshotsStatus.Request request) {
snapshots = Arrays.asList(request.snapshots);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
snapshots = in.readList(Snapshot::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -189,21 +188,8 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
private Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status;
NodeSnapshotStatus() {
}
public NodeSnapshotStatus(DiscoveryNode node, Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status) {
super(node);
this.status = status;
}
public Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status() {
return status;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public NodeSnapshotStatus(StreamInput in) throws IOException {
super(in);
int numberOfSnapshots = in.readVInt();
Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> snapshotMapBuilder = new HashMap<>(numberOfSnapshots);
for (int i = 0; i < numberOfSnapshots; i++) {
@ -220,6 +206,15 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
status = unmodifiableMap(snapshotMapBuilder);
}
public NodeSnapshotStatus(DiscoveryNode node, Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status) {
super(node);
this.status = status;
}
public Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status() {
return status;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.cluster.stats;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class ClusterStatsAction extends StreamableResponseActionType<ClusterStatsResponse> {
public class ClusterStatsAction extends ActionType<ClusterStatsResponse> {
public static final ClusterStatsAction INSTANCE = new ClusterStatsAction();
public static final String NAME = "cluster:monitor/stats";
private ClusterStatsAction() {
super(NAME);
}
@Override
public ClusterStatsResponse newResponse() {
return new ClusterStatsResponse();
super(NAME, ClusterStatsResponse::new);
}
}

View File

@ -38,7 +38,19 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
private ShardStats[] shardsStats;
private ClusterHealthStatus clusterStatus;
ClusterStatsNodeResponse() {
public ClusterStatsNodeResponse(StreamInput in) throws IOException {
super(in);
clusterStatus = null;
if (in.readBoolean()) {
clusterStatus = ClusterHealthStatus.fromValue(in.readByte());
}
this.nodeInfo = new NodeInfo(in);
this.nodeStats = new NodeStats(in);
int size = in.readVInt();
shardsStats = new ShardStats[size];
for (int i = 0; i < size; i++) {
shardsStats[i] = ShardStats.readShardStats(in);
}
}
public ClusterStatsNodeResponse(DiscoveryNode node, @Nullable ClusterHealthStatus clusterStatus,
@ -71,25 +83,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
}
public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException {
ClusterStatsNodeResponse nodeResponse = new ClusterStatsNodeResponse();
nodeResponse.readFrom(in);
return nodeResponse;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterStatus = null;
if (in.readBoolean()) {
clusterStatus = ClusterHealthStatus.fromValue(in.readByte());
}
this.nodeInfo = NodeInfo.readNodeInfo(in);
this.nodeStats = NodeStats.readNodeStats(in);
int size = in.readVInt();
shardsStats = new ShardStats[size];
for (int i = 0; i < size; i++) {
shardsStats[i] = ShardStats.readShardStats(in);
}
return new ClusterStatsNodeResponse(in);
}
@Override

View File

@ -30,7 +30,8 @@ import java.io.IOException;
*/
public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {
public ClusterStatsRequest() {
public ClusterStatsRequest(StreamInput in) throws IOException {
super(in);
}
/**
@ -41,11 +42,6 @@ public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {
super(nodesIds);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -42,7 +42,11 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
long timestamp;
String clusterUUID;
ClusterStatsResponse() {
public ClusterStatsResponse(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
// it may be that the master switched on us while doing the operation. In this case the status may be null.
status = in.readOptionalWriteable(ClusterHealthStatus::readFrom);
}
public ClusterStatsResponse(long timestamp,
@ -84,14 +88,6 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
return indicesStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
timestamp = in.readVLong();
// it may be that the master switched on us while doing the operation. In this case the status may be null.
status = in.readOptionalWriteable(ClusterHealthStatus::readFrom);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -86,8 +86,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
}
@Override
protected ClusterStatsNodeResponse newNodeResponse() {
return new ClusterStatsNodeResponse();
protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOException {
return new ClusterStatsNodeResponse(in);
}
@Override
@ -139,20 +139,15 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
ClusterStatsRequest request;
public ClusterStatsNodeRequest() {
public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
request = new ClusterStatsRequest(in);
}
ClusterStatsNodeRequest(ClusterStatsRequest request) {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new ClusterStatsRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -31,14 +31,18 @@ public abstract class BaseNodeRequest extends TransportRequest {
public BaseNodeRequest() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public BaseNodeRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().before(Version.V_7_3_0)) {
in.readString(); // previously nodeId
}
}
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -33,7 +33,9 @@ public abstract class BaseNodeResponse extends TransportResponse {
private DiscoveryNode node;
protected BaseNodeResponse() {
protected BaseNodeResponse(StreamInput in) throws IOException {
super(in);
node = new DiscoveryNode(in);
}
protected BaseNodeResponse(DiscoveryNode node) {
@ -49,9 +51,8 @@ public abstract class BaseNodeResponse extends TransportResponse {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
node = new DiscoveryNode(in);
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -51,8 +51,11 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
private TimeValue timeout;
protected BaseNodesRequest() {
protected BaseNodesRequest(StreamInput in) throws IOException {
super(in);
nodesIds = in.readStringArray();
concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new);
timeout = in.readOptionalTimeValue();
}
protected BaseNodesRequest(String... nodesIds) {
@ -103,11 +106,8 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodesIds = in.readStringArray();
concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new);
timeout = in.readOptionalTimeValue();
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -38,7 +38,11 @@ public abstract class BaseNodesResponse<TNodeResponse extends BaseNodeResponse>
private List<TNodeResponse> nodes;
private Map<String, TNodeResponse> nodesMap;
protected BaseNodesResponse() {
protected BaseNodesResponse(StreamInput in) throws IOException {
super(in);
clusterName = new ClusterName(in);
nodes = readNodesFrom(in);
failures = in.readList(FailedNodeException::new);
}
protected BaseNodesResponse(ClusterName clusterName, List<TNodeResponse> nodes, List<FailedNodeException> failures) {
@ -101,11 +105,8 @@ public abstract class BaseNodesResponse<TNodeResponse extends BaseNodeResponse>
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = new ClusterName(in);
nodes = readNodesFrom(in);
failures = in.readList(FailedNodeException::new);
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeShouldNotConnectException;
@ -46,7 +47,6 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest<NodesRequest>,
NodesResponse extends BaseNodesResponse,
@ -63,9 +63,9 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
protected TransportNodesAction(String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
Supplier<NodesRequest> request, Supplier<NodeRequest> nodeRequest, String nodeExecutor,
Writeable.Reader<NodesRequest> request, Writeable.Reader<NodeRequest> nodeRequest, String nodeExecutor,
Class<NodeResponse> nodeResponseClass) {
super(actionName, transportService, request, actionFilters);
super(actionName, transportService, actionFilters, request);
this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
@ -74,7 +74,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
this.transportNodeAction = actionName + "[n]";
transportService.registerRequestHandler(
transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler());
transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}
@Override
@ -121,7 +121,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
protected abstract NodeRequest newNodeRequest(NodesRequest request);
protected abstract NodeResponse newNodeResponse();
protected abstract NodeResponse newNodeResponse(StreamInput in) throws IOException;
protected abstract NodeResponse nodeOperation(NodeRequest request);
@ -183,9 +183,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
NodeResponse nodeResponse = newNodeResponse();
nodeResponse.readFrom(in);
return nodeResponse;
return newNodeResponse(in);
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.gateway;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
@ -49,6 +50,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
TransportNodesListGatewayMetaState.NodeGatewayMetaState> {
public static final String ACTION_NAME = "internal:gateway/local/meta_state";
public static final ActionType<NodesGatewayMetaState> TYPE = new ActionType<>(ACTION_NAME, NodesGatewayMetaState::new);
private final GatewayMetaState metaState;
@ -72,8 +74,8 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
}
@Override
protected NodeGatewayMetaState newNodeResponse() {
return new NodeGatewayMetaState();
protected NodeGatewayMetaState newNodeResponse(StreamInput in) throws IOException {
return new NodeGatewayMetaState(in);
}
@Override
@ -88,7 +90,8 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
public static class Request extends BaseNodesRequest<Request> {
public Request() {
public Request(StreamInput in) throws IOException {
super(in);
}
public Request(String... nodesIds) {
@ -98,13 +101,17 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
public static class NodesGatewayMetaState extends BaseNodesResponse<NodeGatewayMetaState> {
public NodesGatewayMetaState(StreamInput in) throws IOException {
super(in);
}
public NodesGatewayMetaState(ClusterName clusterName, List<NodeGatewayMetaState> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
protected List<NodeGatewayMetaState> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeGatewayMetaState::new);
return in.readList(NodeGatewayMetaState::new);
}
@Override
@ -114,13 +121,21 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
}
public static class NodeRequest extends BaseNodeRequest {
NodeRequest() {}
NodeRequest(StreamInput in) throws IOException {
super(in);
}
}
public static class NodeGatewayMetaState extends BaseNodeResponse {
private MetaData metaData;
NodeGatewayMetaState() {
public NodeGatewayMetaState(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
metaData = MetaData.readFrom(in);
}
}
public NodeGatewayMetaState(DiscoveryNode node, MetaData metaData) {
@ -132,14 +147,6 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
return metaData;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
metaData = MetaData.readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
@ -68,6 +69,8 @@ public class TransportNodesListGatewayStartedShards extends
TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
public static final String ACTION_NAME = "internal:gateway/local/started_shards";
public static final ActionType<NodesGatewayStartedShards> TYPE = new ActionType<>(ACTION_NAME, NodesGatewayStartedShards::new);
private final Settings settings;
private final NodeEnvironment nodeEnv;
private final IndicesService indicesService;
@ -98,8 +101,8 @@ public class TransportNodesListGatewayStartedShards extends
}
@Override
protected NodeGatewayStartedShards newNodeResponse() {
return new NodeGatewayStartedShards();
protected NodeGatewayStartedShards newNodeResponse(StreamInput in) throws IOException {
return new NodeGatewayStartedShards(in);
}
@Override
@ -171,7 +174,9 @@ public class TransportNodesListGatewayStartedShards extends
private ShardId shardId;
public Request() {
public Request(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
}
public Request(ShardId shardId, DiscoveryNode[] nodes) {
@ -184,12 +189,6 @@ public class TransportNodesListGatewayStartedShards extends
return this.shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -199,6 +198,10 @@ public class TransportNodesListGatewayStartedShards extends
public static class NodesGatewayStartedShards extends BaseNodesResponse<NodeGatewayStartedShards> {
public NodesGatewayStartedShards(StreamInput in) throws IOException {
super(in);
}
public NodesGatewayStartedShards(ClusterName clusterName, List<NodeGatewayStartedShards> nodes,
List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
@ -206,7 +209,7 @@ public class TransportNodesListGatewayStartedShards extends
@Override
protected List<NodeGatewayStartedShards> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeGatewayStartedShards::new);
return in.readList(NodeGatewayStartedShards::new);
}
@Override
@ -220,19 +223,15 @@ public class TransportNodesListGatewayStartedShards extends
private ShardId shardId;
public NodeRequest() {
public NodeRequest(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
}
public NodeRequest(Request request) {
this.shardId = request.shardId();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -250,7 +249,17 @@ public class TransportNodesListGatewayStartedShards extends
private boolean primary = false;
private Exception storeException = null;
public NodeGatewayStartedShards() {
public NodeGatewayStartedShards(StreamInput in) throws IOException {
super(in);
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
// legacy version
in.readLong();
}
allocationId = in.readOptionalString();
primary = in.readBoolean();
if (in.readBoolean()) {
storeException = in.readException();
}
}
public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary) {
@ -276,20 +285,6 @@ public class TransportNodesListGatewayStartedShards extends
return this.storeException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
// legacy version
in.readLong();
}
allocationId = in.readOptionalString();
primary = in.readBoolean();
if (in.readBoolean()) {
storeException = in.readException();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.indices.store;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
@ -66,6 +67,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> {
public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store";
public static final ActionType<NodesStoreFilesMetaData> TYPE = new ActionType<>(ACTION_NAME, NodesStoreFilesMetaData::new);
private final Settings settings;
private final IndicesService indicesService;
@ -96,8 +98,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
}
@Override
protected NodeStoreFilesMetaData newNodeResponse() {
return new NodeStoreFilesMetaData();
protected NodeStoreFilesMetaData newNodeResponse(StreamInput in) throws IOException {
return new NodeStoreFilesMetaData(in);
}
@Override
@ -244,7 +246,9 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
private ShardId shardId;
public Request() {
public Request(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
}
public Request(ShardId shardId, DiscoveryNode[] nodes) {
@ -252,12 +256,6 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
this.shardId = shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -267,6 +265,10 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
public static class NodesStoreFilesMetaData extends BaseNodesResponse<NodeStoreFilesMetaData> {
public NodesStoreFilesMetaData(StreamInput in) throws IOException {
super(in);
}
public NodesStoreFilesMetaData(ClusterName clusterName, List<NodeStoreFilesMetaData> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@ -287,19 +289,15 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
private ShardId shardId;
public NodeRequest() {
public NodeRequest(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
}
NodeRequest(TransportNodesListShardStoreMetaData.Request request) {
this.shardId = request.shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -311,7 +309,9 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
private StoreFilesMetaData storeFilesMetaData;
NodeStoreFilesMetaData() {
public NodeStoreFilesMetaData(StreamInput in) throws IOException {
super(in);
storeFilesMetaData = StoreFilesMetaData.readStoreFilesMetaData(in);
}
public NodeStoreFilesMetaData(DiscoveryNode node, StoreFilesMetaData storeFilesMetaData) {
@ -324,15 +324,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
}
public static NodeStoreFilesMetaData readListShardStoreNodeOperationResponse(StreamInput in) throws IOException {
NodeStoreFilesMetaData resp = new NodeStoreFilesMetaData();
resp.readFrom(in);
return resp;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
storeFilesMetaData = StoreFilesMetaData.readStoreFilesMetaData(in);
return new NodeStoreFilesMetaData(in);
}
@Override

View File

@ -58,7 +58,7 @@ public class NodeStatsTests extends ESTestCase {
try (BytesStreamOutput out = new BytesStreamOutput()) {
nodeStats.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
NodeStats deserializedNodeStats = NodeStats.readNodeStats(in);
NodeStats deserializedNodeStats = new NodeStats(in);
assertEquals(nodeStats.getNode(), deserializedNodeStats.getNode());
assertEquals(nodeStats.getTimestamp(), deserializedNodeStats.getTimestamp());
if (nodeStats.getOs() == null) {

View File

@ -65,14 +65,13 @@ public class CancellableTasksTests extends TaskManagerTestCase {
super();
}
public CancellableNodeRequest(CancellableNodesRequest request) {
requestName = request.requestName;
public CancellableNodeRequest(StreamInput in) throws IOException {
super(in);
requestName = in.readString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
public CancellableNodeRequest(CancellableNodesRequest request) {
requestName = request.requestName;
}
@Override
@ -100,8 +99,9 @@ public class CancellableTasksTests extends TaskManagerTestCase {
public static class CancellableNodesRequest extends BaseNodesRequest<CancellableNodesRequest> {
private String requestName;
private CancellableNodesRequest() {
super();
private CancellableNodesRequest(StreamInput in) throws IOException {
super(in);
requestName = in.readString();
}
public CancellableNodesRequest(String requestName, String... nodesIds) {
@ -109,12 +109,6 @@ public class CancellableTasksTests extends TaskManagerTestCase {
this.requestName = requestName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
@ -60,7 +61,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -101,8 +101,8 @@ public abstract class TaskManagerTestCase extends ESTestCase {
static class NodeResponse extends BaseNodeResponse {
protected NodeResponse() {
super();
protected NodeResponse(StreamInput in) throws IOException {
super(in);
}
protected NodeResponse(DiscoveryNode node) {
@ -118,7 +118,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
@Override
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeResponse::new);
return in.readList(NodeResponse::new);
}
@Override
@ -138,8 +138,8 @@ public abstract class TaskManagerTestCase extends ESTestCase {
extends TransportNodesAction<NodesRequest, NodesResponse, NodeRequest, NodeResponse> {
AbstractTestNodesAction(String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, Supplier<NodesRequest> request,
Supplier<NodeRequest> nodeRequest) {
ClusterService clusterService, TransportService transportService, Writeable.Reader<NodesRequest> request,
Writeable.Reader<NodeRequest> nodeRequest) {
super(actionName, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<>()),
request, nodeRequest, ThreadPool.Names.GENERIC, NodeResponse.class);
@ -151,8 +151,8 @@ public abstract class TaskManagerTestCase extends ESTestCase {
}
@Override
protected NodeResponse newNodeResponse() {
return new NodeResponse();
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
return new NodeResponse(in);
}
@Override

View File

@ -18,14 +18,13 @@
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
@ -128,8 +127,8 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
public static class NodeResponse extends BaseNodeResponse {
protected NodeResponse() {
super();
public NodeResponse(StreamInput in) throws IOException {
super(in);
}
public NodeResponse(DiscoveryNode node) {
@ -139,8 +138,8 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
public static class NodesResponse extends BaseNodesResponse<NodeResponse> implements ToXContentFragment {
NodesResponse() {
public NodesResponse(StreamInput in) throws IOException {
super(in);
}
public NodesResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
@ -149,7 +148,7 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
@Override
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeResponse::new);
return in.readList(NodeResponse::new);
}
@Override
@ -172,8 +171,10 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
protected String requestName;
protected boolean shouldBlock;
public NodeRequest() {
super();
public NodeRequest(StreamInput in) throws IOException {
super(in);
requestName = in.readString();
shouldBlock = in.readBoolean();
}
public NodeRequest(NodesRequest request, boolean shouldBlock) {
@ -181,13 +182,6 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
this.shouldBlock = shouldBlock;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
shouldBlock = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -212,8 +206,12 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
private boolean shouldBlock = true;
private boolean shouldFail = false;
NodesRequest() {
super();
NodesRequest(StreamInput in) throws IOException {
super(in);
requestName = in.readString();
shouldStoreResult = in.readBoolean();
shouldBlock = in.readBoolean();
shouldFail = in.readBoolean();
}
public NodesRequest(String requestName, String... nodesIds) {
@ -246,15 +244,6 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
return shouldFail;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
shouldStoreResult = in.readBoolean();
shouldBlock = in.readBoolean();
shouldFail = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -302,8 +291,8 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
}
@Override
protected NodeResponse newNodeResponse() {
return new NodeResponse();
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
return new NodeResponse(in);
}
@Override
@ -337,18 +326,13 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
}
public static class TestTaskAction extends StreamableResponseActionType<NodesResponse> {
public static class TestTaskAction extends ActionType<NodesResponse> {
public static final TestTaskAction INSTANCE = new TestTaskAction();
public static final String NAME = "cluster:admin/tasks/test";
private TestTaskAction() {
super(NAME);
}
@Override
public NodesResponse newResponse() {
return new NodesResponse();
super(NAME, NodesResponse::new);
}
}

View File

@ -80,20 +80,15 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
public NodeRequest() {
super();
public NodeRequest(StreamInput in) throws IOException {
super(in);
requestName = in.readString();
}
public NodeRequest(NodesRequest request) {
requestName = request.requestName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -114,8 +109,9 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
private String requestName;
NodesRequest() {
super();
NodesRequest(StreamInput in) throws IOException {
super(in);
requestName = in.readString();
}
public NodesRequest(String requestName, String... nodesIds) {
@ -123,12 +119,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
this.requestName = requestName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -161,8 +151,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
}
@Override
protected NodeResponse newNodeResponse() {
return new NodeResponse();
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
return new NodeResponse(in);
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
@ -56,6 +57,7 @@ import java.util.function.Supplier;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.mockito.Mockito.mock;
public class TransportNodesActionTests extends ESTestCase {
@ -244,8 +246,8 @@ public class TransportNodesActionTests extends ESTestCase {
extends TransportNodesAction<TestNodesRequest, TestNodesResponse, TestNodeRequest, TestNodeResponse> {
TestTransportNodesAction(ThreadPool threadPool, ClusterService clusterService, TransportService
transportService, ActionFilters actionFilters, Supplier<TestNodesRequest> request,
Supplier<TestNodeRequest> nodeRequest, String nodeExecutor) {
transportService, ActionFilters actionFilters, Writeable.Reader<TestNodesRequest> request,
Writeable.Reader<TestNodeRequest> nodeRequest, String nodeExecutor) {
super("indices:admin/test", threadPool, clusterService, transportService, actionFilters,
request, nodeRequest, nodeExecutor, TestNodeResponse.class);
}
@ -262,8 +264,8 @@ public class TransportNodesActionTests extends ESTestCase {
}
@Override
protected TestNodeResponse newNodeResponse() {
return new TestNodeResponse();
protected TestNodeResponse newNodeResponse(StreamInput in) throws IOException {
return new TestNodeResponse(in);
}
@Override
@ -277,8 +279,8 @@ public class TransportNodesActionTests extends ESTestCase {
extends TestTransportNodesAction {
DataNodesOnlyTransportNodesAction(ThreadPool threadPool, ClusterService clusterService, TransportService
transportService, ActionFilters actionFilters, Supplier<TestNodesRequest> request,
Supplier<TestNodeRequest> nodeRequest, String nodeExecutor) {
transportService, ActionFilters actionFilters, Writeable.Reader<TestNodesRequest> request,
Writeable.Reader<TestNodeRequest> nodeRequest, String nodeExecutor) {
super(threadPool, clusterService, transportService, actionFilters, request, nodeRequest, nodeExecutor);
}
@ -289,6 +291,9 @@ public class TransportNodesActionTests extends ESTestCase {
}
private static class TestNodesRequest extends BaseNodesRequest<TestNodesRequest> {
TestNodesRequest(StreamInput in) throws IOException {
super(in);
}
TestNodesRequest(String... nodesIds) {
super(nodesIds);
}
@ -306,7 +311,7 @@ public class TransportNodesActionTests extends ESTestCase {
@Override
protected List<TestNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(TestNodeResponse::new);
return in.readList(TestNodeResponse::new);
}
@Override
@ -315,10 +320,30 @@ public class TransportNodesActionTests extends ESTestCase {
}
}
private static class TestNodeRequest extends BaseNodeRequest { }
private static class TestNodeRequest extends BaseNodeRequest {
TestNodeRequest() {}
TestNodeRequest(StreamInput in) throws IOException {
super(in);
}
}
private static class TestNodeResponse extends BaseNodeResponse { }
private static class TestNodeResponse extends BaseNodeResponse {
TestNodeResponse() {
super(mock(DiscoveryNode.class));
}
protected TestNodeResponse(StreamInput in) throws IOException {
super(in);
}
}
private static class OtherNodeResponse extends BaseNodeResponse { }
private static class OtherNodeResponse extends BaseNodeResponse {
OtherNodeResponse() {
super(mock(DiscoveryNode.class));
}
protected OtherNodeResponse(StreamInput in) throws IOException {
super(in);
}
}
}

View File

@ -64,7 +64,7 @@ public class NodeInfoStreamingTests extends ESTestCase {
try (BytesStreamOutput out = new BytesStreamOutput()) {
nodeInfo.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
NodeInfo readNodeInfo = NodeInfo.readNodeInfo(in);
NodeInfo readNodeInfo = new NodeInfo(in);
assertExpectedUnchanged(nodeInfo, readNodeInfo);
}
}

View File

@ -7,7 +7,6 @@
package org.elasticsearch.xpack.core.deprecation;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
@ -24,35 +23,26 @@ import java.util.Objects;
* Runs deprecation checks on each node. Deprecation checks are performed locally so that filtered settings
* can be accessed in the deprecation checks.
*/
public class NodesDeprecationCheckAction extends StreamableResponseActionType<NodesDeprecationCheckResponse> {
public class NodesDeprecationCheckAction extends ActionType<NodesDeprecationCheckResponse> {
public static final NodesDeprecationCheckAction INSTANCE = new NodesDeprecationCheckAction();
public static final String NAME = "cluster:admin/xpack/deprecation/nodes/info";
private NodesDeprecationCheckAction() {
super(NAME);
}
@Override
public NodesDeprecationCheckResponse newResponse() {
return new NodesDeprecationCheckResponse();
super(NAME, NodesDeprecationCheckResponse::new);
}
public static class NodeRequest extends BaseNodeRequest {
NodesDeprecationCheckRequest request;
public NodeRequest() {}
public NodeRequest(StreamInput in) throws IOException {
super(in);
request = new NodesDeprecationCheckRequest(in);
}
public NodeRequest(NodesDeprecationCheckRequest request) {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new NodesDeprecationCheckRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -63,8 +53,9 @@ public class NodesDeprecationCheckAction extends StreamableResponseActionType<No
public static class NodeResponse extends BaseNodeResponse {
private List<DeprecationIssue> deprecationIssues;
public NodeResponse() {
super();
public NodeResponse(StreamInput in) throws IOException {
super(in);
deprecationIssues = in.readList(DeprecationIssue::new);
}
public NodeResponse(DiscoveryNode node, List<DeprecationIssue> deprecationIssues) {
@ -72,24 +63,12 @@ public class NodesDeprecationCheckAction extends StreamableResponseActionType<No
this.deprecationIssues = deprecationIssues;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
deprecationIssues = in.readList(DeprecationIssue::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(this.deprecationIssues);
}
public static NodeResponse readNodeResponse(StreamInput in) throws IOException {
NodeResponse nodeResponse = new NodeResponse();
nodeResponse.readFrom(in);
return nodeResponse;
}
public List<DeprecationIssue> getDeprecationIssues() {
return deprecationIssues;
}

View File

@ -15,17 +15,14 @@ import java.util.Arrays;
import java.util.Objects;
public class NodesDeprecationCheckRequest extends BaseNodesRequest<NodesDeprecationCheckRequest> {
public NodesDeprecationCheckRequest() {}
public NodesDeprecationCheckRequest(StreamInput in) throws IOException {
super(in);
}
public NodesDeprecationCheckRequest(String... nodesIds) {
super(nodesIds);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -18,7 +18,9 @@ import java.util.Objects;
public class NodesDeprecationCheckResponse extends BaseNodesResponse<NodesDeprecationCheckAction.NodeResponse> {
public NodesDeprecationCheckResponse() {}
public NodesDeprecationCheckResponse(StreamInput in) throws IOException {
super(in);
}
public NodesDeprecationCheckResponse(ClusterName clusterName,
List<NodesDeprecationCheckAction.NodeResponse> nodes,
@ -28,7 +30,7 @@ public class NodesDeprecationCheckResponse extends BaseNodesResponse<NodesDeprec
@Override
protected List<NodesDeprecationCheckAction.NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodesDeprecationCheckAction.NodeResponse::readNodeResponse);
return in.readList(NodesDeprecationCheckAction.NodeResponse::new);
}
@Override

View File

@ -5,19 +5,14 @@
*/
package org.elasticsearch.xpack.core.security.action.realm;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class ClearRealmCacheAction extends StreamableResponseActionType<ClearRealmCacheResponse> {
public class ClearRealmCacheAction extends ActionType<ClearRealmCacheResponse> {
public static final ClearRealmCacheAction INSTANCE = new ClearRealmCacheAction();
public static final String NAME = "cluster:admin/xpack/security/realm/cache/clear";
protected ClearRealmCacheAction() {
super(NAME);
}
@Override
public ClearRealmCacheResponse newResponse() {
return new ClearRealmCacheResponse();
super(NAME, ClearRealmCacheResponse::new);
}
}

View File

@ -17,6 +17,17 @@ public class ClearRealmCacheRequest extends BaseNodesRequest<ClearRealmCacheRequ
String[] realms;
String[] usernames;
public ClearRealmCacheRequest() {
super((String[]) null);
}
public ClearRealmCacheRequest(StreamInput in) throws IOException {
super(in);
realms = in.readStringArray();
usernames = in.readStringArray();
}
/**
* @return {@code true} if this request targets realms, {@code false} otherwise.
*/
@ -67,13 +78,6 @@ public class ClearRealmCacheRequest extends BaseNodesRequest<ClearRealmCacheRequ
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
realms = in.readStringArray();
usernames = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -86,7 +90,10 @@ public class ClearRealmCacheRequest extends BaseNodesRequest<ClearRealmCacheRequ
private String[] realms;
private String[] usernames;
public Node() {
public Node(StreamInput in) throws IOException {
super(in);
realms = in.readStringArray();
usernames = in.readStringArray();
}
public Node(ClearRealmCacheRequest request) {
@ -96,13 +103,6 @@ public class ClearRealmCacheRequest extends BaseNodesRequest<ClearRealmCacheRequ
public String[] getRealms() { return realms; }
public String[] getUsernames() { return usernames; }
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
realms = in.readStringArray();
usernames = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -22,7 +22,8 @@ import java.util.List;
public class ClearRealmCacheResponse extends BaseNodesResponse<ClearRealmCacheResponse.Node> implements ToXContentFragment {
public ClearRealmCacheResponse() {
public ClearRealmCacheResponse(StreamInput in) throws IOException {
super(in);
}
public ClearRealmCacheResponse(ClusterName clusterName, List<Node> nodes, List<FailedNodeException> failures) {
@ -31,7 +32,7 @@ public class ClearRealmCacheResponse extends BaseNodesResponse<ClearRealmCacheRe
@Override
protected List<ClearRealmCacheResponse.Node> readNodesFrom(StreamInput in) throws IOException {
return in.readList(Node::readNodeResponse);
return in.readList(Node::new);
}
@Override
@ -67,18 +68,13 @@ public class ClearRealmCacheResponse extends BaseNodesResponse<ClearRealmCacheRe
public static class Node extends BaseNodeResponse {
public Node() {
public Node(StreamInput in) throws IOException {
super(in);
}
public Node(DiscoveryNode node) {
super(node);
}
public static Node readNodeResponse(StreamInput in) throws IOException {
Node node = new Node();
node.readFrom(in);
return node;
}
}
}

View File

@ -5,22 +5,17 @@
*/
package org.elasticsearch.xpack.core.security.action.role;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
/**
* The action for clearing the cache used by native roles that are stored in an index.
*/
public class ClearRolesCacheAction extends StreamableResponseActionType<ClearRolesCacheResponse> {
public class ClearRolesCacheAction extends ActionType<ClearRolesCacheResponse> {
public static final ClearRolesCacheAction INSTANCE = new ClearRolesCacheAction();
public static final String NAME = "cluster:admin/xpack/security/roles/cache/clear";
protected ClearRolesCacheAction() {
super(NAME);
}
@Override
public ClearRolesCacheResponse newResponse() {
return new ClearRolesCacheResponse();
super(NAME, ClearRolesCacheResponse::new);
}
}

View File

@ -19,6 +19,14 @@ public class ClearRolesCacheRequest extends BaseNodesRequest<ClearRolesCacheRequ
String[] names;
public ClearRolesCacheRequest() {
super((String[]) null);
}
public ClearRolesCacheRequest(StreamInput in) throws IOException {
super(in);
names = in.readOptionalStringArray();
}
/**
* Sets the roles for which caches will be evicted. When not set all the roles will be evicted from the cache.
*
@ -36,12 +44,6 @@ public class ClearRolesCacheRequest extends BaseNodesRequest<ClearRolesCacheRequ
return names;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
names = in.readOptionalStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -51,7 +53,9 @@ public class ClearRolesCacheRequest extends BaseNodesRequest<ClearRolesCacheRequ
public static class Node extends BaseNodeRequest {
private String[] names;
public Node() {
public Node(StreamInput in) throws IOException {
super(in);
names = in.readOptionalStringArray();
}
public Node(ClearRolesCacheRequest request) {
@ -60,12 +64,6 @@ public class ClearRolesCacheRequest extends BaseNodesRequest<ClearRolesCacheRequ
public String[] getNames() { return names; }
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
names = in.readOptionalStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -25,7 +25,8 @@ import java.util.List;
*/
public class ClearRolesCacheResponse extends BaseNodesResponse<ClearRolesCacheResponse.Node> implements ToXContentFragment {
public ClearRolesCacheResponse() {
public ClearRolesCacheResponse(StreamInput in) throws IOException {
super(in);
}
public ClearRolesCacheResponse(ClusterName clusterName, List<Node> nodes, List<FailedNodeException> failures) {
@ -34,7 +35,7 @@ public class ClearRolesCacheResponse extends BaseNodesResponse<ClearRolesCacheRe
@Override
protected List<ClearRolesCacheResponse.Node> readNodesFrom(StreamInput in) throws IOException {
return in.readList(Node::readNodeResponse);
return in.readList(Node::new);
}
@Override
@ -70,17 +71,12 @@ public class ClearRolesCacheResponse extends BaseNodesResponse<ClearRolesCacheRe
public static class Node extends BaseNodeResponse {
public Node() {
public Node(StreamInput in) throws IOException {
super(in);
}
public Node(DiscoveryNode node) {
super(node);
}
public static Node readNodeResponse(StreamInput in) throws IOException {
Node node = new Node();
node.readFrom(in);
return node;
}
}
}

View File

@ -5,22 +5,17 @@
*/
package org.elasticsearch.xpack.core.watcher.transport.actions.stats;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
/**
* This ActionType gets the stats for the watcher plugin
*/
public class WatcherStatsAction extends StreamableResponseActionType<WatcherStatsResponse> {
public class WatcherStatsAction extends ActionType<WatcherStatsResponse> {
public static final WatcherStatsAction INSTANCE = new WatcherStatsAction();
public static final String NAME = "cluster:monitor/xpack/watcher/stats/dist";
private WatcherStatsAction() {
super(NAME);
}
@Override
public WatcherStatsResponse newResponse() {
return new WatcherStatsResponse();
super(NAME, WatcherStatsResponse::new);
}
}

View File

@ -23,6 +23,14 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
private boolean includeStats;
public WatcherStatsRequest() {
super((String[]) null);
}
public WatcherStatsRequest(StreamInput in) throws IOException {
super(in);
includeCurrentWatches = in.readBoolean();
includeQueuedWatches = in.readBoolean();
includeStats = in.readBoolean();
}
public boolean includeCurrentWatches() {
@ -54,14 +62,6 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeCurrentWatches = in.readBoolean();
includeQueuedWatches = in.readBoolean();
includeStats = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -81,7 +81,12 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
private boolean includeQueuedWatches;
private boolean includeStats;
public Node() {}
public Node(StreamInput in) throws IOException {
super(in);
includeCurrentWatches = in.readBoolean();
includeQueuedWatches = in.readBoolean();
includeStats = in.readBoolean();
}
public Node(WatcherStatsRequest request) {
includeCurrentWatches = request.includeCurrentWatches();
@ -101,14 +106,6 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
return includeStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeCurrentWatches = in.readBoolean();
includeQueuedWatches = in.readBoolean();
includeStats = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -30,7 +30,9 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
private WatcherMetaData watcherMetaData;
public WatcherStatsResponse() {
public WatcherStatsResponse(StreamInput in) throws IOException {
super(in);
watcherMetaData = new WatcherMetaData(in.readBoolean());
}
public WatcherStatsResponse(ClusterName clusterName, WatcherMetaData watcherMetaData,
@ -45,15 +47,9 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
out.writeBoolean(watcherMetaData.manuallyStopped());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
watcherMetaData = new WatcherMetaData(in.readBoolean());
}
@Override
protected List<Node> readNodesFrom(StreamInput in) throws IOException {
return in.readList(Node::readNodeResponse);
return in.readList(Node::new);
}
@Override
@ -96,7 +92,22 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
private List<QueuedWatch> queuedWatches;
private Counters stats;
public Node() {
public Node(StreamInput in) throws IOException {
super(in);
watchesCount = in.readLong();
threadPoolQueueSize = in.readLong();
threadPoolMaxSize = in.readLong();
watcherState = WatcherState.fromId(in.readByte());
if (in.readBoolean()) {
snapshots = in.readStreamableList(WatchExecutionSnapshot::new);
}
if (in.readBoolean()) {
queuedWatches = in.readStreamableList(QueuedWatch::new);
}
if (in.readBoolean()) {
stats = Counters.read(in);
}
}
public Node(DiscoveryNode node) {
@ -173,25 +184,6 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
this.stats = stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
watchesCount = in.readLong();
threadPoolQueueSize = in.readLong();
threadPoolMaxSize = in.readLong();
watcherState = WatcherState.fromId(in.readByte());
if (in.readBoolean()) {
snapshots = in.readStreamableList(WatchExecutionSnapshot::new);
}
if (in.readBoolean()) {
queuedWatches = in.readStreamableList(QueuedWatch::new);
}
if (in.readBoolean()) {
stats = Counters.read(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -247,12 +239,5 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
builder.endObject();
return builder;
}
static WatcherStatsResponse.Node readNodeResponse(StreamInput in)
throws IOException {
WatcherStatsResponse.Node node = new WatcherStatsResponse.Node();
node.readFrom(in);
return node;
}
}
}

View File

@ -6,16 +6,17 @@
package org.elasticsearch.xpack.core.deprecation;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
public class NodesDeprecationCheckRequestTests
extends AbstractStreamableTestCase<NodesDeprecationCheckRequest> {
extends AbstractWireSerializingTestCase<NodesDeprecationCheckRequest> {
@Override
protected NodesDeprecationCheckRequest createBlankInstance() {
return new NodesDeprecationCheckRequest();
protected Writeable.Reader<NodesDeprecationCheckRequest> instanceReader() {
return NodesDeprecationCheckRequest::new;
}
@Override

View File

@ -10,8 +10,9 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.net.InetAddress;
@ -21,11 +22,11 @@ import java.util.Collections;
import java.util.List;
public class NodesDeprecationCheckResponseTests
extends AbstractStreamableTestCase<NodesDeprecationCheckResponse> {
extends AbstractWireSerializingTestCase<NodesDeprecationCheckResponse> {
@Override
protected NodesDeprecationCheckResponse createBlankInstance() {
return new NodesDeprecationCheckResponse();
protected Writeable.Reader<NodesDeprecationCheckResponse> instanceReader() {
return NodesDeprecationCheckResponse::new;
}
@Override

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;
@ -21,6 +22,7 @@ import org.elasticsearch.xpack.core.deprecation.NodesDeprecationCheckAction;
import org.elasticsearch.xpack.core.deprecation.NodesDeprecationCheckRequest;
import org.elasticsearch.xpack.core.deprecation.NodesDeprecationCheckResponse;
import java.io.IOException;
import java.util.List;
public class TransportNodeDeprecationCheckAction extends TransportNodesAction<NodesDeprecationCheckRequest,
@ -57,8 +59,8 @@ public class TransportNodeDeprecationCheckAction extends TransportNodesAction<No
}
@Override
protected NodesDeprecationCheckAction.NodeResponse newNodeResponse() {
return new NodesDeprecationCheckAction.NodeResponse();
protected NodesDeprecationCheckAction.NodeResponse newNodeResponse(StreamInput in) throws IOException {
return new NodesDeprecationCheckAction.NodeResponse(in);
}
@Override

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.security.action.realm.ClearRealmCacheAction;
@ -21,6 +22,7 @@ import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.authc.Realms;
import org.elasticsearch.xpack.security.authc.support.CachingRealm;
import java.io.IOException;
import java.util.List;
public class TransportClearRealmCacheAction extends TransportNodesAction<ClearRealmCacheRequest, ClearRealmCacheResponse,
@ -51,8 +53,8 @@ public class TransportClearRealmCacheAction extends TransportNodesAction<ClearRe
}
@Override
protected ClearRealmCacheResponse.Node newNodeResponse() {
return new ClearRealmCacheResponse.Node();
protected ClearRealmCacheResponse.Node newNodeResponse(StreamInput in) throws IOException {
return new ClearRealmCacheResponse.Node(in);
}
@Override

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.security.action.role.ClearRolesCacheAction;
@ -17,6 +18,7 @@ import org.elasticsearch.xpack.core.security.action.role.ClearRolesCacheRequest;
import org.elasticsearch.xpack.core.security.action.role.ClearRolesCacheResponse;
import org.elasticsearch.xpack.security.authz.store.CompositeRolesStore;
import java.io.IOException;
import java.util.List;
public class TransportClearRolesCacheAction extends TransportNodesAction<ClearRolesCacheRequest, ClearRolesCacheResponse,
@ -44,8 +46,8 @@ public class TransportClearRolesCacheAction extends TransportNodesAction<ClearRo
}
@Override
protected ClearRolesCacheResponse.Node newNodeResponse() {
return new ClearRolesCacheResponse.Node();
protected ClearRolesCacheResponse.Node newNodeResponse(StreamInput in) throws IOException {
return new ClearRolesCacheResponse.Node(in);
}
@Override

View File

@ -6,20 +6,14 @@
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class SqlStatsAction extends StreamableResponseActionType<SqlStatsResponse> {
public class SqlStatsAction extends ActionType<SqlStatsResponse> {
public static final SqlStatsAction INSTANCE = new SqlStatsAction();
public static final String NAME = "cluster:monitor/xpack/sql/stats/dist";
private SqlStatsAction() {
super(NAME);
super(NAME, SqlStatsResponse::new);
}
@Override
public SqlStatsResponse newResponse() {
return new SqlStatsResponse();
}
}

View File

@ -19,8 +19,14 @@ import java.io.IOException;
public class SqlStatsRequest extends BaseNodesRequest<SqlStatsRequest> {
private boolean includeStats;
public SqlStatsRequest() {
super((String[]) null);
}
public SqlStatsRequest(StreamInput in) throws IOException {
super(in);
includeStats = in.readBoolean();
}
public boolean includeStats() {
@ -30,12 +36,6 @@ public class SqlStatsRequest extends BaseNodesRequest<SqlStatsRequest> {
public void includeStats(boolean includeStats) {
this.includeStats = includeStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeStats = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
@ -51,7 +51,10 @@ public class SqlStatsRequest extends BaseNodesRequest<SqlStatsRequest> {
static class NodeStatsRequest extends BaseNodeRequest {
boolean includeStats;
NodeStatsRequest() {}
NodeStatsRequest(StreamInput in) throws IOException {
super(in);
includeStats = in.readBoolean();
}
NodeStatsRequest(SqlStatsRequest request) {
includeStats = request.includeStats();
@ -61,12 +64,6 @@ public class SqlStatsRequest extends BaseNodesRequest<SqlStatsRequest> {
return includeStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeStats = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -22,7 +22,8 @@ import java.util.List;
public class SqlStatsResponse extends BaseNodesResponse<SqlStatsResponse.NodeStatsResponse> implements ToXContentObject {
public SqlStatsResponse() {
public SqlStatsResponse(StreamInput in) throws IOException {
super(in);
}
public SqlStatsResponse(ClusterName clusterName, List<NodeStatsResponse> nodes, List<FailedNodeException> failures) {
@ -54,7 +55,11 @@ public class SqlStatsResponse extends BaseNodesResponse<SqlStatsResponse.NodeSta
private Counters stats;
public NodeStatsResponse() {
public NodeStatsResponse(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
stats = Counters.read(in);
}
}
public NodeStatsResponse(DiscoveryNode node) {
@ -68,14 +73,6 @@ public class SqlStatsResponse extends BaseNodesResponse<SqlStatsResponse.NodeSta
public void setStats(Counters stats) {
this.stats = stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
stats = Counters.read(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
@ -97,9 +94,7 @@ public class SqlStatsResponse extends BaseNodesResponse<SqlStatsResponse.NodeSta
}
static SqlStatsResponse.NodeStatsResponse readNodeResponse(StreamInput in) throws IOException {
SqlStatsResponse.NodeStatsResponse node = new SqlStatsResponse.NodeStatsResponse();
node.readFrom(in);
return node;
return new SqlStatsResponse.NodeStatsResponse(in);
}
}

View File

@ -10,10 +10,12 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import java.io.IOException;
import java.util.List;
/**
@ -46,8 +48,8 @@ public class TransportSqlStatsAction extends TransportNodesAction<SqlStatsReques
}
@Override
protected SqlStatsResponse.NodeStatsResponse newNodeResponse() {
return new SqlStatsResponse.NodeStatsResponse();
protected SqlStatsResponse.NodeStatsResponse newNodeResponse(StreamInput in) throws IOException {
return new SqlStatsResponse.NodeStatsResponse(in);
}
@Override

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
@ -21,6 +22,7 @@ import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -57,8 +59,8 @@ public class TransportWatcherStatsAction extends TransportNodesAction<WatcherSta
}
@Override
protected WatcherStatsResponse.Node newNodeResponse() {
return new WatcherStatsResponse.Node();
protected WatcherStatsResponse.Node newNodeResponse(StreamInput in) throws IOException {
return new WatcherStatsResponse.Node(in);
}
@Override