Add Failure Details to every NodesResponse

Most of the current implementations of BaseNodesResponse (plural Nodes) ignore FailedNodeExceptions.

- This adds a helper function to TransportNodesAction that performs the grouping of node responses and failures
- Requires a non-null list of FailedNodeExceptions in the BaseNodesResponse constructor
- Reads/writes the failures to the stream (StreamInput/StreamOutput)
- Also adds StreamInput and StreamOutput methods for generically reading and writing arrays
This commit is contained in:
Chris Earle 2016-04-25 15:55:25 -04:00
parent 0eaa831f48
commit 5be79ed02c
60 changed files with 726 additions and 821 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -47,7 +48,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_UUID_NA_VAL
/**
* A base class for all elasticsearch exceptions.
*/
public class ElasticsearchException extends RuntimeException implements ToXContent {
public class ElasticsearchException extends RuntimeException implements ToXContent, Writeable {
public static final String REST_EXCEPTION_SKIP_CAUSE = "rest.exception.cause.skip";
public static final String REST_EXCEPTION_SKIP_STACK_TRACE = "rest.exception.stacktrace.skip";
@ -235,6 +236,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(this.getMessage());
out.writeThrowable(this.getCause());

View File

@ -19,12 +19,14 @@
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List;
/**
*/
@ -33,26 +35,18 @@ public class NodesHotThreadsResponse extends BaseNodesResponse<NodeHotThreads> {
NodesHotThreadsResponse() {
}
public NodesHotThreadsResponse(ClusterName clusterName, NodeHotThreads[] nodes) {
super(clusterName, nodes);
public NodesHotThreadsResponse(ClusterName clusterName, List<NodeHotThreads> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeHotThreads[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeHotThreads.readNodeHotThreads(in);
}
protected List<NodeHotThreads> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeHotThreads::readNodeHotThreads);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeHotThreads node : nodes) {
node.writeTo(out);
}
protected void writeNodesTo(StreamOutput out, List<NodeHotThreads> nodes) throws IOException {
out.writeStreamableList(nodes);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
@ -35,33 +36,28 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
*/
public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHotThreadsRequest, NodesHotThreadsResponse, TransportNodesHotThreadsAction.NodeRequest, NodeHotThreads> {
public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHotThreadsRequest,
NodesHotThreadsResponse,
TransportNodesHotThreadsAction.NodeRequest,
NodeHotThreads> {
@Inject
public TransportNodesHotThreadsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NodesHotThreadsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesHotThreadsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC);
indexNameExpressionResolver, NodesHotThreadsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeHotThreads.class);
}
@Override
protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request, AtomicReferenceArray responses) {
final List<NodeHotThreads> nodes = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeHotThreads) {
nodes.add((NodeHotThreads) resp);
}
}
return new NodesHotThreadsResponse(clusterName, nodes.toArray(new NodeHotThreads[nodes.size()]));
protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request,
List<NodeHotThreads> responses, List<FailedNodeException> failures) {
return new NodesHotThreadsResponse(clusterName, responses, failures);
}
@Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.node.info;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -30,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -40,34 +42,24 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
public NodesInfoResponse() {
}
public NodesInfoResponse(ClusterName clusterName, NodeInfo[] nodes) {
super(clusterName, nodes);
public NodesInfoResponse(ClusterName clusterName, List<NodeInfo> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeInfo[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeInfo.readNodeInfo(in);
}
protected List<NodeInfo> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeInfo::readNodeInfo);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeInfo node : nodes) {
node.writeTo(out);
}
protected void writeNodesTo(StreamOutput out, List<NodeInfo> nodes) throws IOException {
out.writeStreamableList(nodes);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("cluster_name", getClusterName().value());
builder.startObject("nodes");
for (NodeInfo nodeInfo : this) {
for (NodeInfo nodeInfo : getNodes()) {
builder.startObject(nodeInfo.getNode().getId());
builder.field("name", nodeInfo.getNode().getName());

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.node.info;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
@ -34,36 +35,32 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
*/
public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequest, NodesInfoResponse, TransportNodesInfoAction.NodeInfoRequest, NodeInfo> {
public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequest,
NodesInfoResponse,
TransportNodesInfoAction.NodeInfoRequest,
NodeInfo> {
private final NodeService nodeService;
@Inject
public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
NodeService nodeService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NodesInfoAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesInfoRequest::new, NodeInfoRequest::new, ThreadPool.Names.MANAGEMENT);
indexNameExpressionResolver, NodesInfoRequest::new, NodeInfoRequest::new, ThreadPool.Names.MANAGEMENT, NodeInfo.class);
this.nodeService = nodeService;
}
@Override
protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest, AtomicReferenceArray responses) {
final List<NodeInfo> nodesInfos = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeInfo) {
nodesInfos.add((NodeInfo) resp);
}
}
return new NodesInfoResponse(clusterName, nodesInfos.toArray(new NodeInfo[nodesInfos.size()]));
protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest,
List<NodeInfo> responses, List<FailedNodeException> failures) {
return new NodesInfoResponse(clusterName, responses, failures);
}
@Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.node.stats;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
@ -28,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.List;
/**
*
@ -37,34 +39,24 @@ public class NodesStatsResponse extends BaseNodesResponse<NodeStats> implements
NodesStatsResponse() {
}
public NodesStatsResponse(ClusterName clusterName, NodeStats[] nodes) {
super(clusterName, nodes);
public NodesStatsResponse(ClusterName clusterName, List<NodeStats> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeStats[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeStats.readNodeStats(in);
}
protected List<NodeStats> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeStats::readNodeStats);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeStats node : nodes) {
node.writeTo(out);
}
protected void writeNodesTo(StreamOutput out, List<NodeStats> nodes) throws IOException {
out.writeStreamableList(nodes);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("cluster_name", getClusterName().value());
builder.startObject("nodes");
for (NodeStats nodeStats : this) {
for (NodeStats nodeStats : getNodes()) {
builder.startObject(nodeStats.getNode().getId());
builder.field("timestamp", nodeStats.getTimestamp());
nodeStats.toXContent(builder, params);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.node.stats;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
@ -34,36 +35,31 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
*/
public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRequest, NodesStatsResponse, TransportNodesStatsAction.NodeStatsRequest, NodeStats> {
public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRequest,
NodesStatsResponse,
TransportNodesStatsAction.NodeStatsRequest,
NodeStats> {
private final NodeService nodeService;
@Inject
public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
NodesStatsRequest::new, NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT);
NodeService nodeService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesStatsRequest::new, NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT, NodeStats.class);
this.nodeService = nodeService;
}
@Override
protected NodesStatsResponse newResponse(NodesStatsRequest nodesInfoRequest, AtomicReferenceArray responses) {
final List<NodeStats> nodeStats = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeStats) {
nodeStats.add((NodeStats) resp);
}
}
return new NodesStatsResponse(clusterName, nodeStats.toArray(new NodeStats[nodeStats.size()]));
protected NodesStatsResponse newResponse(NodesStatsRequest request, List<NodeStats> responses, List<FailedNodeException> failures) {
return new NodesStatsResponse(clusterName, responses, failures);
}
@Override

View File

@ -43,18 +43,20 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static java.util.Collections.unmodifiableMap;
/**
* Transport client that collects snapshot shard statuses from data nodes
*/
public class TransportNodesSnapshotsStatus extends TransportNodesAction<TransportNodesSnapshotsStatus.Request, TransportNodesSnapshotsStatus.NodesSnapshotStatus, TransportNodesSnapshotsStatus.NodeRequest, TransportNodesSnapshotsStatus.NodeSnapshotStatus> {
public class TransportNodesSnapshotsStatus extends TransportNodesAction<TransportNodesSnapshotsStatus.Request,
TransportNodesSnapshotsStatus.NodesSnapshotStatus,
TransportNodesSnapshotsStatus.NodeRequest,
TransportNodesSnapshotsStatus.NodeSnapshotStatus> {
public static final String ACTION_NAME = SnapshotsStatusAction.NAME + "[nodes]";
@ -66,7 +68,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
SnapshotShardsService snapshotShardsService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, NodeRequest::new, ThreadPool.Names.GENERIC);
Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeSnapshotStatus.class);
this.snapshotShardsService = snapshotShardsService;
}
@ -86,21 +88,8 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
}
@Override
protected NodesSnapshotStatus newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeSnapshotStatus> nodesList = new ArrayList<>();
final List<FailedNodeException> failures = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeSnapshotStatus) { // will also filter out null response for unallocated ones
nodesList.add((NodeSnapshotStatus) resp);
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
} else {
logger.warn("unknown response type [{}], expected NodeSnapshotStatus or FailedNodeException", resp);
}
}
return new NodesSnapshotStatus(clusterName, nodesList.toArray(new NodeSnapshotStatus[nodesList.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
protected NodesSnapshotStatus newResponse(Request request, List<NodeSnapshotStatus> responses, List<FailedNodeException> failures) {
return new NodesSnapshotStatus(clusterName, responses, failures);
}
@Override
@ -169,75 +158,47 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public static class NodesSnapshotStatus extends BaseNodesResponse<NodeSnapshotStatus> {
private FailedNodeException[] failures;
NodesSnapshotStatus() {
}
public NodesSnapshotStatus(ClusterName clusterName, NodeSnapshotStatus[] nodes, FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
public NodesSnapshotStatus(ClusterName clusterName, List<NodeSnapshotStatus> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public FailedNodeException[] failures() {
return failures;
protected List<NodeSnapshotStatus> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeSnapshotStatus::new);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeSnapshotStatus[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new NodeSnapshotStatus();
nodes[i].readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeSnapshotStatus response : nodes) {
response.writeTo(out);
}
protected void writeNodesTo(StreamOutput out, List<NodeSnapshotStatus> nodes) throws IOException {
out.writeStreamableList(nodes);
}
}
public static class NodeRequest extends BaseNodeRequest {
private SnapshotId[] snapshotIds;
private List<SnapshotId> snapshotIds;
public NodeRequest() {
}
NodeRequest(String nodeId, TransportNodesSnapshotsStatus.Request request) {
super(nodeId);
snapshotIds = request.snapshotIds;
snapshotIds = Arrays.asList(request.snapshotIds);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int n = in.readVInt();
snapshotIds = new SnapshotId[n];
for (int i = 0; i < n; i++) {
snapshotIds[i] = SnapshotId.readSnapshotId(in);
}
snapshotIds = in.readList(SnapshotId::readSnapshotId);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (snapshotIds != null) {
out.writeVInt(snapshotIds.length);
for (int i = 0; i < snapshotIds.length; i++) {
snapshotIds[i].writeTo(out);
}
} else {
out.writeVInt(0);
}
out.writeStreamableList(snapshotIds);
}
}

View File

@ -22,9 +22,6 @@ package org.elasticsearch.action.admin.cluster.stats;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.cache.query.QueryCacheStats;
@ -36,8 +33,9 @@ import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import java.io.IOException;
import java.util.List;
public class ClusterStatsIndices implements ToXContent, Streamable {
public class ClusterStatsIndices implements ToXContent {
private int indexCount;
private ShardStats shards;
@ -49,10 +47,7 @@ public class ClusterStatsIndices implements ToXContent, Streamable {
private SegmentsStats segments;
private PercolatorQueryCacheStats percolatorCache;
private ClusterStatsIndices() {
}
public ClusterStatsIndices(ClusterStatsNodeResponse[] nodeResponses) {
public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses) {
ObjectObjectHashMap<String, ShardStats> countsPerIndex = new ObjectObjectHashMap<>();
this.docs = new DocsStats();
@ -131,38 +126,6 @@ public class ClusterStatsIndices implements ToXContent, Streamable {
return percolatorCache;
}
@Override
public void readFrom(StreamInput in) throws IOException {
indexCount = in.readVInt();
shards = ShardStats.readShardStats(in);
docs = DocsStats.readDocStats(in);
store = StoreStats.readStoreStats(in);
fieldData = FieldDataStats.readFieldDataStats(in);
queryCache = QueryCacheStats.readQueryCacheStats(in);
completion = CompletionStats.readCompletionStats(in);
segments = SegmentsStats.readSegmentsStats(in);
percolatorCache = PercolatorQueryCacheStats.readPercolateStats(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexCount);
shards.writeTo(out);
docs.writeTo(out);
store.writeTo(out);
fieldData.writeTo(out);
queryCache.writeTo(out);
completion.writeTo(out);
segments.writeTo(out);
percolatorCache.writeTo(out);
}
public static ClusterStatsIndices readIndicesStats(StreamInput in) throws IOException {
ClusterStatsIndices indicesStats = new ClusterStatsIndices();
indicesStats.readFrom(in);
return indicesStats;
}
static final class Fields {
static final String COUNT = "count";
}
@ -181,7 +144,7 @@ public class ClusterStatsIndices implements ToXContent, Streamable {
return builder;
}
public static class ShardStats implements ToXContent, Streamable {
public static class ShardStats implements ToXContent {
int indices;
int total;
@ -326,40 +289,6 @@ public class ClusterStatsIndices implements ToXContent, Streamable {
}
}
public static ShardStats readShardStats(StreamInput in) throws IOException {
ShardStats c = new ShardStats();
c.readFrom(in);
return c;
}
@Override
public void readFrom(StreamInput in) throws IOException {
indices = in.readVInt();
total = in.readVInt();
primaries = in.readVInt();
minIndexShards = in.readVInt();
maxIndexShards = in.readVInt();
minIndexPrimaryShards = in.readVInt();
maxIndexPrimaryShards = in.readVInt();
minIndexReplication = in.readDouble();
totalIndexReplication = in.readDouble();
maxIndexReplication = in.readDouble();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indices);
out.writeVInt(total);
out.writeVInt(primaries);
out.writeVInt(minIndexShards);
out.writeVInt(maxIndexShards);
out.writeVInt(minIndexPrimaryShards);
out.writeVInt(maxIndexPrimaryShards);
out.writeDouble(minIndexReplication);
out.writeDouble(totalIndexReplication);
out.writeDouble(maxIndexReplication);
}
static final class Fields {
static final String SHARDS = "shards";
static final String TOTAL = "total";

View File

@ -26,9 +26,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -48,7 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
public class ClusterStatsNodes implements ToXContent, Writeable {
public class ClusterStatsNodes implements ToXContent {
private final Counts counts;
private final Set<Version> versions;
@ -58,33 +55,12 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
private final FsInfo.Path fs;
private final Set<PluginInfo> plugins;
ClusterStatsNodes(StreamInput in) throws IOException {
this.counts = new Counts(in);
int size = in.readVInt();
this.versions = new HashSet<>(size);
for (int i = 0; i < size; i++) {
this.versions.add(Version.readVersion(in));
}
this.os = new OsStats(in);
this.process = new ProcessStats(in);
this.jvm = new JvmStats(in);
this.fs = new FsInfo.Path(in);
size = in.readVInt();
this.plugins = new HashSet<>(size);
for (int i = 0; i < size; i++) {
this.plugins.add(PluginInfo.readFromStream(in));
}
}
ClusterStatsNodes(ClusterStatsNodeResponse[] nodeResponses) {
ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
this.fs = new FsInfo.Path();
this.plugins = new HashSet<>();
Set<InetAddress> seenAddresses = new HashSet<>(nodeResponses.length);
Set<InetAddress> seenAddresses = new HashSet<>(nodeResponses.size());
List<NodeInfo> nodeInfos = new ArrayList<>();
List<NodeStats> nodeStats = new ArrayList<>();
for (ClusterStatsNodeResponse nodeResponse : nodeResponses) {
@ -140,21 +116,6 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
return plugins;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
counts.writeTo(out);
out.writeVInt(versions.size());
for (Version v : versions) Version.writeVersion(v, out);
os.writeTo(out);
process.writeTo(out);
jvm.writeTo(out);
fs.writeTo(out);
out.writeVInt(plugins.size());
for (PluginInfo p : plugins) {
p.writeTo(out);
}
}
static final class Fields {
static final String COUNT = "count";
static final String VERSIONS = "versions";
@ -200,18 +161,12 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
return builder;
}
public static class Counts implements Writeable, ToXContent {
public static class Counts implements ToXContent {
static final String COORDINATING_ONLY = "coordinating_only";
private final int total;
private final Map<String, Integer> roles;
@SuppressWarnings("unchecked")
private Counts(StreamInput in) throws IOException {
this.total = in.readVInt();
this.roles = (Map<String, Integer>)in.readGenericValue();
}
private Counts(List<NodeInfo> nodeInfos) {
this.roles = new HashMap<>();
for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) {
@ -243,12 +198,6 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
return roles;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total);
out.writeGenericValue(roles);
}
static final class Fields {
static final String TOTAL = "total";
}
@ -263,7 +212,7 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
}
}
public static class OsStats implements ToXContent, Writeable {
public static class OsStats implements ToXContent {
final int availableProcessors;
final int allocatedProcessors;
final ObjectIntHashMap<String> names;
@ -287,30 +236,6 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
this.allocatedProcessors = allocatedProcessors;
}
/**
* Read from a stream.
*/
private OsStats(StreamInput in) throws IOException {
this.availableProcessors = in.readVInt();
this.allocatedProcessors = in.readVInt();
int size = in.readVInt();
this.names = new ObjectIntHashMap<>();
for (int i = 0; i < size; i++) {
names.addTo(in.readString(), in.readVInt());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(availableProcessors);
out.writeVInt(allocatedProcessors);
out.writeVInt(names.size());
for (ObjectIntCursor<String> name : names) {
out.writeString(name.key);
out.writeVInt(name.value);
}
}
public int getAvailableProcessors() {
return availableProcessors;
}
@ -343,7 +268,7 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
}
}
public static class ProcessStats implements ToXContent, Writeable {
public static class ProcessStats implements ToXContent {
final int count;
final int cpuPercent;
@ -384,27 +309,6 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
this.maxOpenFileDescriptors = maxOpenFileDescriptors;
}
/**
* Read from a stream.
*/
private ProcessStats(StreamInput in) throws IOException {
this.count = in.readVInt();
this.cpuPercent = in.readVInt();
this.totalOpenFileDescriptors = in.readVLong();
this.minOpenFileDescriptors = in.readLong();
this.maxOpenFileDescriptors = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(count);
out.writeVInt(cpuPercent);
out.writeVLong(totalOpenFileDescriptors);
out.writeLong(minOpenFileDescriptors);
out.writeLong(maxOpenFileDescriptors);
}
/**
* Cpu usage in percentages - 100 is 1 core.
*/
@ -456,7 +360,7 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
}
}
public static class JvmStats implements Writeable, ToXContent {
public static class JvmStats implements ToXContent {
private final ObjectIntHashMap<JvmVersion> versions;
private final long threads;
@ -497,34 +401,6 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
this.heapMax = heapMax;
}
/**
* Read from a stream.
*/
private JvmStats(StreamInput in) throws IOException {
int size = in.readVInt();
this.versions = new ObjectIntHashMap<>(size);
for (int i = 0; i < size; i++) {
this.versions.addTo(new JvmVersion(in), in.readVInt());
}
this.threads = in.readVLong();
this.maxUptime = in.readVLong();
this.heapUsed = in.readVLong();
this.heapMax = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(versions.size());
for (ObjectIntCursor<JvmVersion> v : versions) {
v.key.writeTo(out);
out.writeVInt(v.value);
}
out.writeVLong(threads);
out.writeVLong(maxUptime);
out.writeVLong(heapUsed);
out.writeVLong(heapMax);
}
public ObjectIntHashMap<JvmVersion> getVersions() {
return versions;
}
@ -598,7 +474,7 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
}
}
public static class JvmVersion implements Writeable {
public static class JvmVersion {
String version;
String vmName;
String vmVersion;
@ -611,27 +487,6 @@ public class ClusterStatsNodes implements ToXContent, Writeable {
vmVendor = jvmInfo.getVmVendor();
}
/**
* Read from a stream.
*/
JvmVersion(StreamInput in) throws IOException {
version = in.readString();
vmName = in.readString();
vmVersion = in.readString();
vmVendor = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(version);
out.writeString(vmName);
out.writeString(vmVersion);
out.writeString(vmVendor);
}
JvmVersion() {
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.stats;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -29,9 +30,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
*
@ -48,8 +48,9 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
ClusterStatsResponse() {
}
public ClusterStatsResponse(long timestamp, ClusterName clusterName, String clusterUUID, ClusterStatsNodeResponse[] nodes) {
super(clusterName, null);
public ClusterStatsResponse(long timestamp, ClusterName clusterName, String clusterUUID,
List<ClusterStatsNodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
this.timestamp = timestamp;
this.clusterUUID = clusterUUID;
nodesStats = new ClusterStatsNodes(nodes);
@ -79,77 +80,53 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
return indicesStats;
}
@Override
public ClusterStatsNodeResponse[] getNodes() {
throw new UnsupportedOperationException();
}
@Override
public Map<String, ClusterStatsNodeResponse> getNodesMap() {
throw new UnsupportedOperationException();
}
@Override
public ClusterStatsNodeResponse getAt(int position) {
throw new UnsupportedOperationException();
}
@Override
public Iterator<ClusterStatsNodeResponse> iterator() {
throw new UnsupportedOperationException();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
timestamp = in.readVLong();
status = null;
if (in.readBoolean()) {
// it may be that the master switched on us while doing the operation. In this case the status may be null.
status = ClusterHealthStatus.fromValue(in.readByte());
}
clusterUUID = in.readString();
nodesStats = new ClusterStatsNodes(in);
indicesStats = ClusterStatsIndices.readIndicesStats(in);
// it may be that the master switched on us while doing the operation. In this case the status may be null.
status = in.readOptionalWriteable(ClusterHealthStatus::readFrom);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(timestamp);
if (status == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(status.value());
}
out.writeString(clusterUUID);
nodesStats.writeTo(out);
indicesStats.writeTo(out);
out.writeOptionalWriteable(status);
}
static final class Fields {
static final String NODES = "nodes";
static final String INDICES = "indices";
static final String UUID = "uuid";
static final String CLUSTER_NAME = "cluster_name";
static final String STATUS = "status";
@Override
protected List<ClusterStatsNodeResponse> readNodesFrom(StreamInput in) throws IOException {
List<ClusterStatsNodeResponse> nodes = in.readList(ClusterStatsNodeResponse::readNodeResponse);
// built from nodes rather than from the stream directly
nodesStats = new ClusterStatsNodes(nodes);
indicesStats = new ClusterStatsIndices(nodes);
return nodes;
}
@Override
protected void writeNodesTo(StreamOutput out, List<ClusterStatsNodeResponse> nodes) throws IOException {
// nodeStats and indicesStats are rebuilt from nodes
out.writeStreamableList(nodes);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("timestamp", getTimestamp());
builder.field(Fields.CLUSTER_NAME, getClusterName().value());
if (params.paramAsBoolean("output_uuid", false)) {
builder.field(Fields.UUID, clusterUUID);
builder.field("uuid", clusterUUID);
}
if (status != null) {
builder.field(Fields.STATUS, status.name().toLowerCase(Locale.ROOT));
builder.field("status", status.name().toLowerCase(Locale.ROOT));
}
builder.startObject(Fields.INDICES);
builder.startObject("indices");
indicesStats.toXContent(builder, params);
builder.endObject();
builder.startObject(Fields.NODES);
builder.startObject("nodes");
nodesStats.toXContent(builder, params);
builder.endObject();
return builder;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.stats;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
@ -46,7 +47,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
@ -68,22 +68,17 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
NodeService nodeService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ClusterStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT);
indexNameExpressionResolver, ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT,
ClusterStatsNodeResponse.class);
this.nodeService = nodeService;
this.indicesService = indicesService;
}
@Override
protected ClusterStatsResponse newResponse(ClusterStatsRequest clusterStatsRequest, AtomicReferenceArray responses) {
final List<ClusterStatsNodeResponse> nodeStats = new ArrayList<>(responses.length());
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof ClusterStatsNodeResponse) {
nodeStats.add((ClusterStatsNodeResponse) resp);
}
}
return new ClusterStatsResponse(System.currentTimeMillis(), clusterName,
clusterService.state().metaData().clusterUUID(), nodeStats.toArray(new ClusterStatsNodeResponse[nodeStats.size()]));
protected ClusterStatsResponse newResponse(ClusterStatsRequest request,
List<ClusterStatsNodeResponse> responses, List<FailedNodeException> failures) {
return new ClusterStatsResponse(System.currentTimeMillis(), clusterName, clusterService.state().metaData().clusterUUID(),
responses, failures);
}
@Override

View File

@ -53,6 +53,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -152,7 +153,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
}
@Override
protected synchronized void processAsyncFetch(ShardId shardId, NodeGatewayStartedShards[] responses, FailedNodeException[] failures) {
protected synchronized void processAsyncFetch(ShardId shardId, List<NodeGatewayStartedShards> responses, List<FailedNodeException> failures) {
fetchResponses.add(new Response(shardId, responses, failures));
if (expectedOps.countDown()) {
finish();
@ -220,10 +221,10 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
public class Response {
private final ShardId shardId;
private final NodeGatewayStartedShards[] responses;
private final FailedNodeException[] failures;
private final List<NodeGatewayStartedShards> responses;
private final List<FailedNodeException> failures;
public Response(ShardId shardId, NodeGatewayStartedShards[] responses, FailedNodeException[] failures) {
public Response(ShardId shardId, List<NodeGatewayStartedShards> responses, List<FailedNodeException> failures) {
this.shardId = shardId;
this.responses = responses;
this.failures = failures;

View File

@ -80,7 +80,7 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
public void onResponse(NodesInfoResponse nodeInfos) {
try {
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
for (NodeInfo nodeInfo : nodeInfos) {
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
}
pipelineStore.put(clusterService, ingestInfos, request, listener);

View File

@ -22,61 +22,77 @@ package org.elasticsearch.action.support.nodes;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
*
*/
public abstract class BaseNodesResponse<TNodeResponse extends BaseNodeResponse> extends ActionResponse implements Iterable<TNodeResponse> {
public abstract class BaseNodesResponse<TNodeResponse extends BaseNodeResponse> extends ActionResponse {
private ClusterName clusterName;
protected TNodeResponse[] nodes;
private List<FailedNodeException> failures;
private List<TNodeResponse> nodes;
private Map<String, TNodeResponse> nodesMap;
protected BaseNodesResponse() {
}
protected BaseNodesResponse(ClusterName clusterName, TNodeResponse[] nodes) {
this.clusterName = clusterName;
this.nodes = nodes;
protected BaseNodesResponse(ClusterName clusterName, List<TNodeResponse> nodes, List<FailedNodeException> failures) {
this.clusterName = Objects.requireNonNull(clusterName);
this.failures = Objects.requireNonNull(failures);
this.nodes = Objects.requireNonNull(nodes);
}
/**
* The failed nodes, if set to be captured.
* Get the {@link ClusterName} associated with all of the nodes.
*
* @return Never {@code null}.
*/
@Nullable
public FailedNodeException[] failures() {
return null;
}
public ClusterName getClusterName() {
return this.clusterName;
return clusterName;
}
public String getClusterNameAsString() {
return this.clusterName.value();
/**
* Get the failed node exceptions.
*
* @return Never {@code null}. Can be empty.
*/
public List<FailedNodeException> failures() {
return failures;
}
public TNodeResponse[] getNodes() {
/**
* Determine if there are any node failures in {@link #failures}.
*
* @return {@code true} if {@link #failures} contains at least 1 {@link FailedNodeException}.
*/
public boolean hasFailures() {
return failures.isEmpty() == false;
}
/**
* Get the <em>successful</em> node responses.
*
* @return Never {@code null}. Can be empty.
* @see #hasFailures()
*/
public List<TNodeResponse> getNodes() {
return nodes;
}
public TNodeResponse getAt(int position) {
return nodes[position];
}
@Override
public Iterator<TNodeResponse> iterator() {
return getNodesMap().values().iterator();
}
/**
* Lazily build and get a map of Node ID to node response.
*
* @return Never {@code null}. Can be empty.
* @see #getNodes()
*/
public Map<String, TNodeResponse> getNodesMap() {
if (nodesMap == null) {
nodesMap = new HashMap<>();
@ -91,11 +107,28 @@ public abstract class BaseNodesResponse<TNodeResponse extends BaseNodeResponse>
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
nodes = readNodesFrom(in);
failures = in.readList(FailedNodeException::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
writeNodesTo(out, nodes);
out.writeList(failures);
}
/**
* Read the {@link #nodes} from the stream.
*
* @return Never {@code null}.
*/
protected abstract List<TNodeResponse> readNodesFrom(StreamInput in) throws IOException;
/**
* Write the {@link #nodes} to the stream.
*/
protected abstract void writeNodesTo(StreamOutput out, List<TNodeResponse> nodes) throws IOException;
}

View File

@ -43,6 +43,9 @@ import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
@ -50,22 +53,30 @@ import java.util.function.Supplier;
/**
*
*/
public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest<NodesRequest>, NodesResponse extends BaseNodesResponse, NodeRequest extends BaseNodeRequest, NodeResponse extends BaseNodeResponse> extends HandledTransportAction<NodesRequest, NodesResponse> {
public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest<NodesRequest>,
NodesResponse extends BaseNodesResponse,
NodeRequest extends BaseNodeRequest,
NodeResponse extends BaseNodeResponse>
extends HandledTransportAction<NodesRequest, NodesResponse> {
protected final ClusterName clusterName;
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Class<NodeResponse> nodeResponseClass;
final String transportNodeAction;
protected TransportNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<NodesRequest> request, Supplier<NodeRequest> nodeRequest,
String nodeExecutor) {
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<NodesRequest> request, Supplier<NodeRequest> nodeRequest,
String nodeExecutor,
Class<NodeResponse> nodeResponseClass) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.clusterName = Objects.requireNonNull(clusterName);
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);
this.transportNodeAction = actionName + "[n]";
@ -87,7 +98,46 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
return false;
}
protected abstract NodesResponse newResponse(NodesRequest request, AtomicReferenceArray nodesResponses);
/**
* Map the responses into {@code nodeResponseClass} responses and {@link FailedNodeException}s.
*
* @param request The associated request.
* @param nodesResponses All node-level responses
* @return Never {@code null}.
* @throws NullPointerException if {@code nodesResponses} is {@code null}
* @see #newResponse(BaseNodesRequest, List, List)
*/
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray nodesResponses) {
final List<NodeResponse> responses = new ArrayList<>();
final List<FailedNodeException> failures = new ArrayList<>();
for (int i = 0; i < nodesResponses.length(); ++i) {
Object response = nodesResponses.get(i);
if (nodeResponseClass.isInstance(response)) {
responses.add(nodeResponseClass.cast(response));
} else if (response instanceof FailedNodeException) {
failures.add((FailedNodeException)response);
} else {
logger.warn("ignoring unexpected response [{}] of type [{}], expected [{}] or [{}]",
response, response != null ? response.getClass().getName() : null,
nodeResponseClass.getSimpleName(), FailedNodeException.class.getSimpleName());
}
}
return newResponse(request, responses, failures);
}
/**
* Create a new {@link NodesResponse} (multi-node response).
*
* @param request The associated request.
* @param responses All successful node-level responses.
* @param failures All node-level failures.
* @return Never {@code null}.
* @throws NullPointerException if any parameter is {@code null}.
*/
protected abstract NodesResponse newResponse(NodesRequest request, List<NodeResponse> responses, List<FailedNodeException> failures);
protected abstract NodeRequest newNodeRequest(String nodeId, NodesRequest request);
@ -165,7 +215,8 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
taskManager.registerChildTask(task, node.getId());
}
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeResponse>() {
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
new BaseTransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse newInstance() {
return newNodeResponse();
@ -238,4 +289,5 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
}
}
}

View File

@ -404,7 +404,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
}
}
static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray,
static void fillDiskUsagePerNode(ESLogger logger, List<NodeStats> nodeStatsArray,
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages,
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages) {
for (NodeStats nodeStats : nodeStatsArray) {

View File

@ -20,10 +20,16 @@
package org.elasticsearch.cluster.health;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
*
*/
public enum ClusterHealthStatus {
public enum ClusterHealthStatus implements Writeable {
GREEN((byte) 0),
YELLOW((byte) 1),
RED((byte) 2);
@ -38,7 +44,21 @@ public enum ClusterHealthStatus {
return value;
}
public static ClusterHealthStatus fromValue(byte value) {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(value);
}
/**
* Read from a stream.
*
* @throws IllegalArgumentException if the value is unrecognized
*/
public static ClusterHealthStatus readFrom(StreamInput in) throws IOException {
return fromValue(in.readByte());
}
public static ClusterHealthStatus fromValue(byte value) throws IOException {
switch (value) {
case 0:
return GREEN;

View File

@ -736,6 +736,29 @@ public abstract class StreamInput extends InputStream {
return null;
}
/**
* Read a {@link List} of {@link Streamable} objects, using the {@code constructor} to instantiate each instance.
* <p>
* This is expected to take the form:
* <code>
* List&lt;MyStreamableClass&gt; list = in.readStreamList(MyStreamableClass::new);
* </code>
*
* @param constructor Streamable instance creator
* @return Never {@code null}.
* @throws IOException if any step fails
*/
public <T extends Streamable> List<T> readStreamableList(Supplier<T> constructor) throws IOException {
int count = readVInt();
List<T> builder = new ArrayList<>(count);
for (int i=0; i<count; i++) {
T instance = constructor.get();
instance.readFrom(this);
builder.add(instance);
}
return builder;
}
/**
* Reads a list of objects
*/
@ -762,4 +785,5 @@ public abstract class StreamInput extends InputStream {
public static StreamInput wrap(byte[] bytes, int offset, int length) {
return new InputStreamStreamInput(new ByteArrayInputStream(bytes, offset, length));
}
}

View File

@ -727,12 +727,22 @@ public abstract class StreamOutput extends OutputStream {
}
}
/**
* Writes a list of {@link Streamable} objects
*/
public void writeStreamableList(List<? extends Streamable> list) throws IOException {
writeVInt(list.size());
for (Streamable obj: list) {
obj.writeTo(this);
}
}
/**
* Writes a list of {@link Writeable} objects
*/
public <T extends Writeable> void writeList(List<T> list) throws IOException {
public void writeList(List<? extends Writeable> list) throws IOException {
writeVInt(list.size());
for (T obj: list) {
for (Writeable obj: list) {
obj.writeTo(this);
}
}

View File

@ -35,9 +35,11 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -57,24 +59,24 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
/**
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface List<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(ShardId shardId, String[] nodesIds, ActionListener<NodesResponse> listener);
}
protected final ESLogger logger;
protected final String type;
private final ShardId shardId;
private final List<BaseNodesResponse<T>, T> action;
private final Lister<BaseNodesResponse<T>, T> action;
private final Map<String, NodeEntry<T>> cache = new HashMap<>();
private final Set<String> nodesToIgnore = new HashSet<>();
private boolean closed;
@SuppressWarnings("unchecked")
protected AsyncShardFetch(ESLogger logger, String type, ShardId shardId, List<? extends BaseNodesResponse<T>, T> action) {
protected AsyncShardFetch(ESLogger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
this.logger = logger;
this.type = type;
this.shardId = shardId;
this.action = (List<BaseNodesResponse<T>, T>) action;
this.action = (Lister<BaseNodesResponse<T>, T>) action;
}
@Override
@ -167,7 +169,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* the shard (response + failures), issuing a reroute at the end of it to make sure there will be another round
* of allocations taking this new data into account.
*/
protected synchronized void processAsyncFetch(ShardId shardId, T[] responses, FailedNodeException[] failures) {
protected synchronized void processAsyncFetch(ShardId shardId, List<T> responses, List<FailedNodeException> failures) {
if (closed) {
// we are closed, no need to process this async fetch at all
logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type);
@ -276,9 +278,9 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
@Override
public void onFailure(Throwable e) {
FailedNodeException[] failures = new FailedNodeException[nodesIds.length];
for (int i = 0; i < failures.length; i++) {
failures[i] = new FailedNodeException(nodesIds[i], "total failure in fetching", e);
List<FailedNodeException> failures = new ArrayList<>(nodesIds.length);
for (String nodeId : nodesIds) {
failures.add(new FailedNodeException(nodeId, "total failure in fetching", e));
}
processAsyncFetch(shardId, null, failures);
}

View File

@ -82,7 +82,7 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());
if (nodesState.failures().length > 0) {
if (nodesState.hasFailures()) {
for (FailedNodeException failedNodeException : nodesState.failures()) {
logger.warn("failed to fetch state from node", failedNodeException);
}
@ -91,7 +91,7 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
ObjectFloatHashMap<Index> indices = new ObjectFloatHashMap<>();
MetaData electedGlobalState = null;
int found = 0;
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
if (nodeState.metaData() == null) {
continue;
}
@ -119,7 +119,7 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
Index index = (Index) keys[i];
IndexMetaData electedIndexMetaData = null;
int indexMetaDataCount = 0;
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
if (nodeState.metaData() == null) {
continue;
}

View File

@ -125,7 +125,7 @@ public class GatewayAllocator extends AbstractComponent {
class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {
public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends BaseNodesResponse<T>, T> action) {
public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
super(logger, type, shardId, action);
}

View File

@ -43,14 +43,15 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
*/
public class TransportNodesListGatewayMetaState extends TransportNodesAction<TransportNodesListGatewayMetaState.Request, TransportNodesListGatewayMetaState.NodesGatewayMetaState, TransportNodesListGatewayMetaState.NodeRequest, TransportNodesListGatewayMetaState.NodeGatewayMetaState> {
public class TransportNodesListGatewayMetaState extends TransportNodesAction<TransportNodesListGatewayMetaState.Request,
TransportNodesListGatewayMetaState.NodesGatewayMetaState,
TransportNodesListGatewayMetaState.NodeRequest,
TransportNodesListGatewayMetaState.NodeGatewayMetaState> {
public static final String ACTION_NAME = "internal:gateway/local/meta_state";
@ -61,7 +62,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC);
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
}
TransportNodesListGatewayMetaState init(GatewayMetaState metaState) {
@ -80,7 +81,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);
return new NodeRequest(nodeId);
}
@Override
@ -89,21 +90,8 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
}
@Override
protected NodesGatewayMetaState newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeGatewayMetaState> nodesList = new ArrayList<>();
final List<FailedNodeException> failures = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeGatewayMetaState) { // will also filter out null response for unallocated ones
nodesList.add((NodeGatewayMetaState) resp);
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
} else {
logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp);
}
}
return new NodesGatewayMetaState(clusterName, nodesList.toArray(new NodeGatewayMetaState[nodesList.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
protected NodesGatewayMetaState newResponse(Request request, List<NodeGatewayMetaState> responses, List<FailedNodeException> failures) {
return new NodesGatewayMetaState(clusterName, responses, failures);
}
@Override
@ -142,47 +130,30 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
public static class NodesGatewayMetaState extends BaseNodesResponse<NodeGatewayMetaState> {
private FailedNodeException[] failures;
NodesGatewayMetaState() {
}
public NodesGatewayMetaState(ClusterName clusterName, NodeGatewayMetaState[] nodes, FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
}
public FailedNodeException[] failures() {
return failures;
public NodesGatewayMetaState(ClusterName clusterName, List<NodeGatewayMetaState> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeGatewayMetaState[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new NodeGatewayMetaState();
nodes[i].readFrom(in);
}
protected List<NodeGatewayMetaState> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeGatewayMetaState::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeGatewayMetaState response : nodes) {
response.writeTo(out);
}
protected void writeNodesTo(StreamOutput out, List<NodeGatewayMetaState> nodes) throws IOException {
out.writeStreamableList(nodes);
}
}
public static class NodeRequest extends BaseNodeRequest {
public NodeRequest() {
}
NodeRequest(String nodeId, TransportNodesListGatewayMetaState.Request request) {
NodeRequest(String nodeId) {
super(nodeId);
}

View File

@ -48,9 +48,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* This transport action is used to fetch the shard version from each node during primary allocation in {@link GatewayAllocator}.
@ -63,7 +61,7 @@ public class TransportNodesListGatewayStartedShards extends
TransportNodesListGatewayStartedShards.NodeRequest,
TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
implements
AsyncShardFetch.List<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
AsyncShardFetch.Lister<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
public static final String ACTION_NAME = "internal:gateway/local/started_shards";
@ -77,7 +75,8 @@ public class TransportNodesListGatewayStartedShards extends
IndexNameExpressionResolver indexNameExpressionResolver,
NodeEnvironment env) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED);
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED,
NodeGatewayStartedShards.class);
this.nodeEnv = env;
}
@ -110,23 +109,9 @@ public class TransportNodesListGatewayStartedShards extends
}
@Override
protected NodesGatewayStartedShards newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeGatewayStartedShards> nodesList = new ArrayList<>();
final List<FailedNodeException> failures = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeGatewayStartedShards) { // will also filter out null response for unallocated ones
nodesList.add((NodeGatewayStartedShards) resp);
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
} else {
logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException",
resp);
}
}
return new NodesGatewayStartedShards(clusterName,
nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
protected NodesGatewayStartedShards newResponse(Request request,
List<NodeGatewayStartedShards> responses, List<FailedNodeException> failures) {
return new NodesGatewayStartedShards(clusterName, responses, failures);
}
@Override
@ -217,36 +202,19 @@ public class TransportNodesListGatewayStartedShards extends
public static class NodesGatewayStartedShards extends BaseNodesResponse<NodeGatewayStartedShards> {
private FailedNodeException[] failures;
public NodesGatewayStartedShards(ClusterName clusterName, NodeGatewayStartedShards[] nodes,
FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
public NodesGatewayStartedShards(ClusterName clusterName, List<NodeGatewayStartedShards> nodes,
List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public FailedNodeException[] failures() {
return failures;
protected List<NodeGatewayStartedShards> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeGatewayStartedShards::new);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeGatewayStartedShards[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new NodeGatewayStartedShards();
nodes[i].readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeGatewayStartedShards response : nodes) {
response.writeTo(out);
}
protected void writeNodesTo(StreamOutput out, List<NodeGatewayStartedShards> nodes) throws IOException {
out.writeStreamableList(nodes);
}
}
@ -258,7 +226,7 @@ public class TransportNodesListGatewayStartedShards extends
public NodeRequest() {
}
NodeRequest(String nodeId, Request request) {
public NodeRequest(String nodeId, Request request) {
super(nodeId);
this.shardId = request.shardId();
}

View File

@ -54,18 +54,20 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
*/
public class TransportNodesListShardStoreMetaData extends TransportNodesAction<TransportNodesListShardStoreMetaData.Request, TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData, TransportNodesListShardStoreMetaData.NodeRequest, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>
implements AsyncShardFetch.List<TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> {
public class TransportNodesListShardStoreMetaData extends TransportNodesAction<TransportNodesListShardStoreMetaData.Request,
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData,
TransportNodesListShardStoreMetaData.NodeRequest,
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>
implements AsyncShardFetch.Lister<TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData,
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> {
public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store";
@ -74,11 +76,12 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
private final NodeEnvironment nodeEnv;
@Inject
public TransportNodesListShardStoreMetaData(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
public TransportNodesListShardStoreMetaData(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
IndicesService indicesService, NodeEnvironment nodeEnv, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STORE);
Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STORE, NodeStoreFilesMetaData.class);
this.indicesService = indicesService;
this.nodeEnv = nodeEnv;
}
@ -106,21 +109,9 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
}
@Override
protected NodesStoreFilesMetaData newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeStoreFilesMetaData> nodeStoreFilesMetaDatas = new ArrayList<>();
final List<FailedNodeException> failures = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeStoreFilesMetaData) { // will also filter out null response for unallocated ones
nodeStoreFilesMetaDatas.add((NodeStoreFilesMetaData) resp);
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
} else {
logger.warn("unknown response type [{}], expected NodeStoreFilesMetaData or FailedNodeException", resp);
}
}
return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
protected NodesStoreFilesMetaData newResponse(Request request,
List<NodeStoreFilesMetaData> responses, List<FailedNodeException> failures) {
return new NodesStoreFilesMetaData(clusterName, responses, failures);
}
@Override
@ -293,37 +284,21 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
public static class NodesStoreFilesMetaData extends BaseNodesResponse<NodeStoreFilesMetaData> {
private FailedNodeException[] failures;
NodesStoreFilesMetaData() {
}
public NodesStoreFilesMetaData(ClusterName clusterName, NodeStoreFilesMetaData[] nodes, FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
public NodesStoreFilesMetaData(ClusterName clusterName, List<NodeStoreFilesMetaData> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public FailedNodeException[] failures() {
return failures;
protected List<NodeStoreFilesMetaData> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeStoreFilesMetaData::readListShardStoreNodeOperationResponse);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeStoreFilesMetaData[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeStoreFilesMetaData.readListShardStoreNodeOperationResponse(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeStoreFilesMetaData response : nodes) {
response.writeTo(out);
}
protected void writeNodesTo(StreamOutput out, List<NodeStoreFilesMetaData> nodes) throws IOException {
out.writeStreamableList(nodes);
}
}

View File

@ -69,7 +69,7 @@ public class RestNodesHotThreadsAction extends BaseRestHandler {
@Override
public RestResponse buildResponse(NodesHotThreadsResponse response) throws Exception {
StringBuilder sb = new StringBuilder();
for (NodeHotThreads node : response) {
for (NodeHotThreads node : response.getNodes()) {
sb.append("::: ").append(node.getNode().toString()).append("\n");
Strings.spaceify(3, node.getHotThreads(), sb);
sb.append('\n');

View File

@ -20,22 +20,17 @@
package org.elasticsearch.rest.action.admin.cluster.node.info;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import org.elasticsearch.rest.action.support.RestActions.NodesResponseRestListener;
import java.util.Set;
@ -106,15 +101,6 @@ public class RestNodesInfoAction extends BaseRestHandler {
settingsFilter.addFilterSettingParams(request);
client.admin().cluster().nodesInfo(nodesInfoRequest, new RestBuilderListener<NodesInfoResponse>(channel) {
@Override
public RestResponse buildResponse(NodesInfoResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
});
client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.rest.action.admin.cluster.node.stats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.client.Client;
@ -31,7 +30,7 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import org.elasticsearch.rest.action.support.RestActions.NodesResponseRestListener;
import java.util.Set;
@ -114,6 +113,6 @@ public class RestNodesStatsAction extends BaseRestHandler {
nodesStatsRequest.indices().includeSegmentFileSizes(true);
}
client.admin().cluster().nodesStats(nodesStatsRequest, new RestToXContentListener<>(channel));
client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
}

View File

@ -27,8 +27,7 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import org.elasticsearch.rest.action.support.RestActions.NodesResponseRestListener;
/**
*
@ -46,6 +45,6 @@ public class RestClusterStatsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
client.admin().cluster().clusterStats(clusterStatsRequest, new RestToXContentListener<>(channel));
client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}
}

View File

@ -21,13 +21,17 @@ package org.elasticsearch.rest.action.support;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@ -38,9 +42,14 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.List;
/**
*
@ -63,25 +72,21 @@ public class RestActions {
return (version == Versions.MATCH_ANY) ? defaultVersion : version;
}
static final class Fields {
static final String _SHARDS = "_shards";
static final String TOTAL = "total";
static final String SUCCESSFUL = "successful";
static final String FAILED = "failed";
static final String FAILURES = "failures";
public static void buildBroadcastShardsHeader(XContentBuilder builder, Params params, BroadcastResponse response) throws IOException {
buildBroadcastShardsHeader(builder, params,
response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(),
response.getShardFailures());
}
public static void buildBroadcastShardsHeader(XContentBuilder builder, ToXContent.Params params, BroadcastResponse response) throws IOException {
buildBroadcastShardsHeader(builder, params, response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures());
}
public static void buildBroadcastShardsHeader(XContentBuilder builder, ToXContent.Params params, int total, int successful, int failed, ShardOperationFailedException[] shardFailures) throws IOException {
builder.startObject(Fields._SHARDS);
builder.field(Fields.TOTAL, total);
builder.field(Fields.SUCCESSFUL, successful);
builder.field(Fields.FAILED, failed);
public static void buildBroadcastShardsHeader(XContentBuilder builder, Params params,
int total, int successful, int failed,
ShardOperationFailedException[] shardFailures) throws IOException {
builder.startObject("_shards");
builder.field("total", total);
builder.field("successful", successful);
builder.field("failed", failed);
if (shardFailures != null && shardFailures.length > 0) {
builder.startArray(Fields.FAILURES);
builder.startArray("failures");
final boolean group = params.paramAsBoolean("group_shard_failures", true); // we group by default
for (ShardOperationFailedException shardFailure : group ? ExceptionsHelper.groupBy(shardFailures) : shardFailures) {
builder.startObject();
@ -92,6 +97,94 @@ public class RestActions {
}
builder.endObject();
}
/**
 * Write the standard {@code "_nodes"} XContent header for any {@link BaseNodesResponse},
 * summarizing its per-node successes and failures.
 *
 * @param builder XContent builder to write into.
 * @param params XContent parameters.
 * @param response The response containing individual, node-level responses.
 * @see #buildNodesHeader(XContentBuilder, Params, int, int, int, List)
 */
public static <NodeResponse extends BaseNodeResponse> void buildNodesHeader(final XContentBuilder builder, final Params params,
                                                                            final BaseNodesResponse<NodeResponse> response)
        throws IOException {
    final List<FailedNodeException> nodeFailures = response.failures();
    final int successCount = response.getNodes().size();
    final int failureCount = nodeFailures.size();

    // total is everything we heard back from: successes plus failures
    buildNodesHeader(builder, params, successCount + failureCount, successCount, failureCount, nodeFailures);
}
/**
 * Write the XContent header for any {@link BaseNodesResponse}. This looks like:
 * <code>
 * "_nodes" : {
 *   "total" : 3,
 *   "successful" : 1,
 *   "failed" : 2,
 *   "failures" : [ { ... }, { ... } ]
 * }
 * </code>
 * Prefer the overload that derives the counts from the response to calling this directly.
 *
 * @param builder XContent builder.
 * @param params XContent parameters.
 * @param total The total number of nodes touched.
 * @param successful The successful number of responses received.
 * @param failed The number of failures (effectively {@code total - successful}).
 * @param failures The failure exceptions related to {@code failed}.
 * @see #buildNodesHeader(XContentBuilder, Params, BaseNodesResponse)
 */
public static void buildNodesHeader(final XContentBuilder builder, final Params params,
                                    final int total, final int successful, final int failed,
                                    final List<FailedNodeException> failures) throws IOException {
    builder.startObject("_nodes");
    builder.field("total", total);
    builder.field("successful", successful);
    builder.field("failed", failed);

    // only emit the "failures" array when there is something to report
    if (failures.isEmpty() == false) {
        builder.startArray("failures");

        for (final FailedNodeException exception : failures) {
            builder.startObject();
            exception.toXContent(builder, params);
            builder.endObject();
        }

        builder.endArray();
    }

    builder.endObject();
}
/**
 * Automatically transform the {@link ToXContent}-compatible, nodes-level {@code response} into a {@link BytesRestResponse}.
 * <p>
 * This looks like:
 * <code>
 * {
 *   "_nodes" : { ... },
 *   "cluster_name" : "...",
 *   ...
 * }
 * </code>
 *
 * @param builder XContent builder.
 * @param params XContent parameters.
 * @param response The nodes-level (plural) response.
 * @return Never {@code null}.
 * @throws IOException if building the response causes an issue
 */
public static <NodesResponse extends BaseNodesResponse & ToXContent> BytesRestResponse nodesResponse(final XContentBuilder builder,
                                                                                                     final Params params,
                                                                                                     final NodesResponse response)
        throws IOException {
    builder.startObject();
    // "_nodes" header first, then the cluster name, then the response body itself
    buildNodesHeader(builder, params, response);
    builder.field("cluster_name", response.getClusterName().value());
    response.toXContent(builder, params);
    builder.endObject();

    return new BytesRestResponse(RestStatus.OK, builder);
}
public static QueryBuilder urlParamsToQueryBuilder(RestRequest request) {
String queryString = request.param("q");
@ -130,7 +223,8 @@ public class RestActions {
return content;
}
public static QueryBuilder getQueryContent(BytesReference source, IndicesQueriesRegistry indicesQueriesRegistry, ParseFieldMatcher parseFieldMatcher) {
public static QueryBuilder getQueryContent(BytesReference source, IndicesQueriesRegistry indicesQueriesRegistry,
ParseFieldMatcher parseFieldMatcher) {
try (XContentParser requestParser = XContentFactory.xContent(source).createParser(source)) {
QueryParseContext context = new QueryParseContext(indicesQueriesRegistry, requestParser, parseFieldMatcher);
return context.parseTopLevelQueryBuilder();
@ -158,4 +252,32 @@ public class RestActions {
public static boolean hasBodyContent(final RestRequest request) {
return request.hasContent() || request.hasParam("source");
}
/**
 * {@code NodesResponseRestListener} automatically translates any {@link BaseNodesResponse} (multi-node) response that is
 * {@link ToXContent}-compatible into a {@link RestResponse} with the necessary header info (e.g., "cluster_name").
 * <p>
 * This is meant to avoid a slew of anonymous classes doing (or worse):
 * <code>
 * client.admin().cluster().request(nodesRequest, new RestBuilderListener&lt;NodesResponse&gt;(channel) {
 * public RestResponse buildResponse(NodesResponse response, XContentBuilder builder) throws Exception {
 * return RestActions.nodesResponse(builder, ToXContent.EMPTY_PARAMS, response);
 * }
 * });
 * </code>
 */
public static class NodesResponseRestListener<NodesResponse extends BaseNodesResponse & ToXContent>
extends RestBuilderListener<NodesResponse> {
// Writes the response to the given channel once the nodes-level action completes.
public NodesResponseRestListener(RestChannel channel) {
super(channel);
}
// Delegates to RestActions.nodesResponse so the "_nodes" header and "cluster_name" are always included.
@Override
public RestResponse buildResponse(NodesResponse response, XContentBuilder builder) throws Exception {
return RestActions.nodesResponse(builder, ToXContent.EMPTY_PARAMS, response);
}
}
}

View File

@ -86,7 +86,7 @@ public class HotThreadsIT extends ESIntegTestCase {
assertThat(nodeHotThreads, notNullValue());
Map<String, NodeHotThreads> nodesMap = nodeHotThreads.getNodesMap();
assertThat(nodesMap.size(), equalTo(cluster().size()));
for (NodeHotThreads ht : nodeHotThreads) {
for (NodeHotThreads ht : nodeHotThreads.getNodes()) {
assertNotNull(ht.getHotThreads());
//logger.info(ht.getHotThreads());
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.HashMap;
import java.util.List;
@ -37,7 +36,6 @@ import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* Tests for the cluster allocation explanation
@ -54,7 +52,7 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
@Override
public void run() {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().length, equalTo(3));
assertThat(resp.getNodes().size(), equalTo(3));
}
});

View File

@ -255,12 +255,12 @@ public class CancellableTasksTests extends TaskManagerTestCase {
// Make sure that the request was successful
assertNull(throwableReference.get());
assertNotNull(responseReference.get());
assertEquals(nodesCount, responseReference.get().getNodes().length);
assertEquals(nodesCount, responseReference.get().getNodes().size());
assertEquals(0, responseReference.get().failureCount());
} else {
// We canceled the request, in this case it should have fail, but we should get partial response
assertNull(throwableReference.get());
assertEquals(nodesCount, responseReference.get().failureCount() + responseReference.get().getNodes().length);
assertEquals(nodesCount, responseReference.get().failureCount() + responseReference.get().getNodes().size());
// and we should have at least as many failures as the number of blocked operations
// (we might have cancelled some non-blocked operations before they even started and that's ok)
assertThat(responseReference.get().failureCount(), greaterThanOrEqualTo(blockedNodesCount));

View File

@ -50,11 +50,9 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
@ -88,7 +86,6 @@ public abstract class TaskManagerTestCase extends ESTestCase {
testNodes = new TestNode[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
testNodes[i] = new TestNode("node" + i, threadPool, settings);
;
}
}
@ -113,27 +110,22 @@ public abstract class TaskManagerTestCase extends ESTestCase {
static class NodesResponse extends BaseNodesResponse<NodeResponse> {
private int failureCount;
protected NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) {
super(clusterName, nodes);
this.failureCount = failureCount;
protected NodesResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureCount = in.readVInt();
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeResponse::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(failureCount);
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes);
}
public int failureCount() {
return failureCount;
return failures().size();
}
}
@ -148,24 +140,12 @@ public abstract class TaskManagerTestCase extends ESTestCase {
Supplier<NodeRequest> nodeRequest) {
super(settings, actionName, clusterName, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
request, nodeRequest, ThreadPool.Names.GENERIC);
request, nodeRequest, ThreadPool.Names.GENERIC, NodeResponse.class);
}
@Override
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) {
final List<NodeResponse> nodesList = new ArrayList<>();
int failureCount = 0;
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones
nodesList.add((NodeResponse) resp);
} else if (resp instanceof FailedNodeException) {
failureCount++;
} else {
logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp);
}
}
return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount);
protected NodesResponse newResponse(NodesRequest request, List<NodeResponse> responses, List<FailedNodeException> failures) {
return new NodesResponse(clusterName, responses, failures);
}
@Override

View File

@ -56,7 +56,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
@ -111,31 +110,26 @@ public class TestTaskPlugin extends Plugin {
public static class NodesResponse extends BaseNodesResponse<NodeResponse> {
private int failureCount;
NodesResponse() {
}
public NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) {
super(clusterName, nodes);
this.failureCount = failureCount;
public NodesResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureCount = in.readVInt();
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(NodeResponse::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(failureCount);
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes);
}
public int failureCount() {
return failureCount;
return failures().size();
}
}
@ -219,25 +213,13 @@ public class TestTaskPlugin extends Plugin {
public TransportTestTaskAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService) {
super(settings, TestTaskAction.NAME, clusterName, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC);
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeResponse.class);
}
@Override
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) {
final List<NodeResponse> nodesList = new ArrayList<>();
int failureCount = 0;
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones
nodesList.add((NodeResponse) resp);
} else if (resp instanceof FailedNodeException) {
failureCount++;
} else {
logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp);
}
}
return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount);
protected NodesResponse newResponse(NodesRequest request, List<NodeResponse> responses, List<FailedNodeException> failures) {
return new NodesResponse(clusterName, responses, failures);
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.support.nodes;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeActionTests;
@ -28,6 +29,8 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ESTestCase;
@ -39,6 +42,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -53,6 +57,7 @@ import java.util.function.Supplier;
import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.cluster.service.ClusterServiceUtils.setState;
import static org.mockito.Mockito.mock;
public class TransportNodesActionTests extends ESTestCase {
@ -92,6 +97,42 @@ public class TransportNodesActionTests extends ESTestCase {
assertEquals(clusterService.state().nodes().resolveNodesIds(finalNodesIds).length, capturedRequests.size());
}
/**
 * Passing a {@code null} responses array to {@code newResponse} must fail fast with a {@link NullPointerException}.
 */
public void testNewResponseNullArray() {
    final TestNodesRequest request = new TestNodesRequest();

    expectThrows(NullPointerException.class, () -> action.newResponse(request, null));
}
/**
 * Verify that {@code newResponse(request, responses)} partitions the raw response array into the typed node responses
 * and the node failures, while silently ignoring responses of unexpected types.
 */
public void testNewResponse() {
    final TestNodesRequest request = new TestNodesRequest();
    final List<TestNodeResponse> expectedNodeResponses = mockList(TestNodeResponse.class, randomIntBetween(0, 2));
    expectedNodeResponses.add(new TestNodeResponse());

    final List<BaseNodeResponse> nodeResponses = new ArrayList<>(expectedNodeResponses);
    // This should be ignored:
    nodeResponses.add(new OtherNodeResponse());

    final List<FailedNodeException> failures = mockList(FailedNodeException.class, randomIntBetween(0, 2));

    // build from nodeResponses (not expectedNodeResponses) so the unexpected OtherNodeResponse actually reaches
    // newResponse and the filtering behavior is exercised
    final List<Object> allResponses = new ArrayList<>(nodeResponses);
    allResponses.addAll(failures);
    Collections.shuffle(allResponses, random());

    final AtomicReferenceArray<?> atomicArray = new AtomicReferenceArray<>(allResponses.toArray());

    final TestNodesResponse response = action.newResponse(request, atomicArray);

    assertSame(request, response.request);
    // note: the overall list was shuffled, so ordering cannot be guaranteed; containsAll also proves the
    // OtherNodeResponse was filtered out (it is not in expectedNodeResponses)
    assertTrue(expectedNodeResponses.containsAll(response.getNodes()));
    assertTrue(failures.containsAll(response.failures()));
}
/**
 * Build a list containing {@code size} mocks of the given class.
 *
 * @param clazz Class to mock.
 * @param size Number of mocks to create (may be zero).
 * @return Never {@code null}.
 */
private <T> List<T> mockList(Class<T> clazz, int size) {
    final List<T> mocks = new ArrayList<>(size);

    for (int i = 0; i < size; ++i) {
        mocks.add(mock(clazz));
    }

    return mocks;
}
private enum NodeSelector {
LOCAL("_local"), ELECTED_MASTER("_master"), MASTER_ELIGIBLE("master:true"), DATA("data:true"), CUSTOM_ATTRIBUTE("attr:value");
@ -165,26 +206,20 @@ public class TransportNodesActionTests extends ESTestCase {
return new DiscoveryNode(node, node, DummyTransportAddress.INSTANCE, attributes, roles, Version.CURRENT);
}
private static class TestTransportNodesAction extends TransportNodesAction<TestNodesRequest, TestNodesResponse, TestNodeRequest,
TestNodeResponse> {
private static class TestTransportNodesAction
extends TransportNodesAction<TestNodesRequest, TestNodesResponse, TestNodeRequest, TestNodeResponse> {
TestTransportNodesAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService
transportService, ActionFilters actionFilters, Supplier<TestNodesRequest> request,
Supplier<TestNodeRequest> nodeRequest, String nodeExecutor) {
super(settings, "indices:admin/test", CLUSTER_NAME, threadPool, clusterService, transportService, actionFilters,
null, request, nodeRequest, nodeExecutor);
null, request, nodeRequest, nodeExecutor, TestNodeResponse.class);
}
@Override
protected TestNodesResponse newResponse(TestNodesRequest request, AtomicReferenceArray nodesResponses) {
final List<TestNodeResponse> nodeResponses = new ArrayList<>();
for (int i = 0; i < nodesResponses.length(); i++) {
Object resp = nodesResponses.get(i);
if (resp instanceof TestNodeResponse) {
nodeResponses.add((TestNodeResponse) resp);
}
}
return new TestNodesResponse(nodeResponses);
protected TestNodesResponse newResponse(TestNodesRequest request,
List<TestNodeResponse> responses, List<FailedNodeException> failures) {
return new TestNodesResponse(request, responses, failures);
}
@Override
@ -216,16 +251,28 @@ public class TransportNodesActionTests extends ESTestCase {
private static class TestNodesResponse extends BaseNodesResponse<TestNodeResponse> {
private final List<TestNodeResponse> nodeResponses;
private final TestNodesRequest request;
TestNodesResponse(List<TestNodeResponse> nodeResponses) {
this.nodeResponses = nodeResponses;
TestNodesResponse(TestNodesRequest request, List<TestNodeResponse> nodeResponses, List<FailedNodeException> failures) {
super(CLUSTER_NAME, nodeResponses, failures);
this.request = request;
}
@Override
protected List<TestNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readStreamableList(TestNodeResponse::new);
}
@Override
protected void writeNodesTo(StreamOutput out, List<TestNodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes);
}
}
private static class TestNodeRequest extends BaseNodeRequest {
}
private static class TestNodeRequest extends BaseNodeRequest { }
private static class TestNodeResponse extends BaseNodeResponse { }
private static class OtherNodeResponse extends BaseNodeResponse { }
private static class TestNodeResponse extends BaseNodeResponse {
}
}

View File

@ -44,7 +44,7 @@ public class ClusterStateBackwardsCompatIT extends ESBackcompatTestCase {
createIndex("test");
// connect to each node with a custom TransportClient, issue a ClusterStateRequest to test serialization
for (NodeInfo n : clusterNodes()) {
for (NodeInfo n : clusterNodes().getNodes()) {
try (TransportClient tc = newTransportClient()) {
tc.addTransportAddress(n.getNode().getAddress());
ClusterStateResponse response = tc.admin().cluster().prepareState().execute().actionGet();
@ -68,7 +68,7 @@ public class ClusterStateBackwardsCompatIT extends ESBackcompatTestCase {
try {
enableIndexBlock("test-blocks", block.getKey());
for (NodeInfo n : clusterNodes()) {
for (NodeInfo n : clusterNodes().getNodes()) {
try (TransportClient tc = newTransportClient()) {
tc.addTransportAddress(n.getNode().getAddress());

View File

@ -41,6 +41,8 @@ import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.test.ESTestCase;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -196,14 +198,14 @@ public class DiskUsageTests extends ESTestCase {
new FsInfo.Path("/least", "/dev/sda", 100, 90, 70),
new FsInfo.Path("/most", "/dev/sda", 100, 90, 80),
};
NodeStats[] nodeStats = new NodeStats[] {
List<NodeStats> nodeStats = Arrays.asList(
new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null),
new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null),
new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null)
};
);
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
DiskUsage mostNode_1 = newMostAvaiableUsages.get("node_1");
@ -237,14 +239,14 @@ public class DiskUsageTests extends ESTestCase {
new FsInfo.Path("/most", "/dev/sda", 100, 90, 70),
new FsInfo.Path("/least", "/dev/sda", 10, -8, 0),
};
NodeStats[] nodeStats = new NodeStats[] {
List<NodeStats> nodeStats = Arrays.asList(
new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null),
new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null),
new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null)
};
);
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages);
DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1");
DiskUsage mostNode_1 = newMostAvailableUsages.get("node_1");

View File

@ -81,7 +81,7 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase {
NodesInfoResponse nodesInfo = client().admin().cluster().prepareNodesInfo().get();
String excludedNodeId = null;
for (NodeInfo nodeInfo : nodesInfo) {
for (NodeInfo nodeInfo : nodesInfo.getNodes()) {
if (nodeInfo.getNode().isDataNode()) {
excludedNodeId = nodeInfo.getNode().getId();
break;
@ -124,7 +124,7 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase {
NodesInfoResponse nodesInfo = client().admin().cluster().prepareNodesInfo().get();
String excludedNodeId = null;
for (NodeInfo nodeInfo : nodesInfo) {
for (NodeInfo nodeInfo : nodesInfo.getNodes()) {
if (nodeInfo.getNode().isDataNode()) {
excludedNodeId = nodeInfo.getNode().getId();
break;

View File

@ -59,7 +59,7 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
@Override
public void run() {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().length, equalTo(3));
assertThat(resp.getNodes().size(), equalTo(3));
}
});

View File

@ -22,20 +22,20 @@ package org.elasticsearch.common.io.stream;
import org.apache.lucene.util.Constants;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
@ -431,6 +431,33 @@ public class BytesStreamsTests extends ESTestCase {
endsWith(" claims to have a different name [intentionally-broken] than it was read from [test-named-writeable]."));
}
/**
 * Round-trips a randomly sized list of {@link Streamable} objects through
 * {@code StreamOutput.writeStreamableList} / {@code StreamInput.readStreamableList}
 * and verifies the deserialized list matches element-for-element.
 */
public void testWriteStreamableList() throws IOException {
// list may be empty (size 0) to exercise the zero-length encoding
final int size = randomIntBetween(0, 5);
final List<TestStreamable> expected = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
expected.add(new TestStreamable(randomBoolean()));
}
// serialize the whole list in one call
final BytesStreamOutput out = new BytesStreamOutput();
out.writeStreamableList(expected);
// deserialize from the written bytes using the no-arg constructor as supplier
final StreamInput in = StreamInput.wrap(out.bytes().toBytes());
List<TestStreamable> loaded = in.readStreamableList(TestStreamable::new);
assertThat(loaded, hasSize(expected.size()));
for (int i = 0; i < expected.size(); ++i) {
assertEquals(expected.get(i).value, loaded.get(i).value);
}
// every byte written must have been consumed by the read
assertEquals(0, in.available());
in.close();
out.close();
}
// Marker base type used by the named-writeable registry tests in this class.
private static abstract class BaseNamedWriteable implements NamedWriteable {
}
@ -544,4 +571,25 @@ public class BytesStreamsTests extends ESTestCase {
assertEquals(point, geoPoint);
}
}
/**
 * Minimal {@link Streamable} carrying a single boolean, used by
 * {@code testWriteStreamableList} to exercise list (de)serialization.
 */
private static class TestStreamable implements Streamable {
// the single serialized field; written/read as one boolean
private boolean value;
// no-arg constructor required so the reader can instantiate before readFrom
public TestStreamable() { }
public TestStreamable(boolean value) {
this.value = value;
}
@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(value);
}
}
}

View File

@ -349,9 +349,9 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
logger.info("--> request node discovery stats");
NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setDiscovery(true).get();
assertThat(statsResponse.getNodes().length, equalTo(1));
assertThat(statsResponse.getNodes().size(), equalTo(1));
DiscoveryStats stats = statsResponse.getNodes()[0].getDiscoveryStats();
DiscoveryStats stats = statsResponse.getNodes().get(0).getDiscoveryStats();
assertThat(stats.getQueueStats(), notNullValue());
assertThat(stats.getQueueStats().getTotal(), equalTo(0));
assertThat(stats.getQueueStats().getCommitted(), equalTo(0));

View File

@ -284,10 +284,11 @@ public class AsyncShardFetchTests extends ESTestCase {
assert entry != null;
entry.executeLatch.await();
if (entry.failure != null) {
processAsyncFetch(shardId, null, new FailedNodeException[]{new FailedNodeException(nodeId,
"unexpected", entry.failure)});
processAsyncFetch(shardId, null, Collections.singletonList(new FailedNodeException(nodeId,
"unexpected",
entry.failure)));
} else {
processAsyncFetch(shardId, new Response[]{entry.response}, null);
processAsyncFetch(shardId, Collections.singletonList(entry.response), null);
}
} catch (Throwable e) {
logger.error("unexpected failure", e);

View File

@ -63,9 +63,9 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -568,12 +568,12 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
.execute(new TransportNodesListGatewayStartedShards.Request(shardId, new String[]{node.getId()}))
.get();
assertThat(response.getNodes(), arrayWithSize(1));
assertThat(response.getNodes()[0].allocationId(), notNullValue());
assertThat(response.getNodes(), hasSize(1));
assertThat(response.getNodes().get(0).allocationId(), notNullValue());
if (corrupt) {
assertThat(response.getNodes()[0].storeException(), notNullValue());
assertThat(response.getNodes().get(0).storeException(), notNullValue());
} else {
assertThat(response.getNodes()[0].storeException(), nullValue());
assertThat(response.getNodes().get(0).storeException(), nullValue());
}
// start another node so cluster consistency checks won't time out due to the lack of state

View File

@ -578,7 +578,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
assertTrue(shardRouting.assignedToNode());
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(shardRouting.currentNodeId()).setFs(true).get();
NodeStats nodeStats = nodeStatses.getNodes()[0];
NodeStats nodeStats = nodeStatses.getNodes().get(0);
List<Path> files = new ArrayList<>();
filesToNodes.put(nodeStats.getNode().getName(), files);
for (FsInfo.Path info : nodeStats.getFs()) {
@ -615,7 +615,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
String nodeId = shardRouting.currentNodeId();
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
Set<Path> files = new TreeSet<>(); // treeset makes sure iteration order is deterministic
for (FsInfo.Path info : nodeStatses.getNodes()[0].getFs()) {
for (FsInfo.Path info : nodeStatses.getNodes().get(0).getFs()) {
String path = info.getPath();
Path file = PathUtils.get(path).resolve("indices").resolve(test.getUUID()).resolve(Integer.toString(shardRouting.getId())).resolve("index");
if (Files.exists(file)) { // multi data path might only have one path in use
@ -678,9 +678,9 @@ public class CorruptedFileIT extends ESIntegTestCase {
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(routing.currentNodeId()).setFs(true).get();
ClusterState state = client().admin().cluster().prepareState().get().getState();
final Index test = state.metaData().index("test").getIndex();
assertThat(routing.toString(), nodeStatses.getNodes().length, equalTo(1));
assertThat(routing.toString(), nodeStatses.getNodes().size(), equalTo(1));
List<Path> files = new ArrayList<>();
for (FsInfo.Path info : nodeStatses.getNodes()[0].getFs()) {
for (FsInfo.Path info : nodeStatses.getNodes().get(0).getFs()) {
String path = info.getPath();
Path file = PathUtils.get(path).resolve("indices/" + test.getUUID() + "/" + Integer.toString(routing.getId()) + "/index");
if (Files.exists(file)) { // multi data path might only have one path in use

View File

@ -121,7 +121,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
String nodeId = shardRouting.currentNodeId();
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
Set<Path> files = new TreeSet<>(); // treeset makes sure iteration order is deterministic
for (FsInfo.Path fsPath : nodeStatses.getNodes()[0].getFs()) {
for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) {
String path = fsPath.getPath();
final String relativeDataLocationPath = "indices/"+ test.getUUID() +"/" + Integer.toString(shardRouting.getId()) + "/translog";
Path file = PathUtils.get(path).resolve(relativeDataLocationPath);

View File

@ -119,10 +119,9 @@ public class SuggestStatsIT extends ESIntegTestCase {
assertThat(suggest.getSuggestTimeInMillis(), lessThanOrEqualTo(totalShards * (endTime - startTime)));
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
NodeStats[] nodes = nodeStats.getNodes();
Set<String> nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2");
int num = 0;
for (NodeStats stat : nodes) {
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats suggestStats = stat.getIndices().getSearch().getTotal();
logger.info("evaluating {}", stat.getNode());
if (nodeIdsWithIndex.contains(stat.getNode().getId())) {

View File

@ -101,7 +101,7 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
/** Returns true if any of the nodes used a noop breaker */
private boolean noopBreakerUsed() {
NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().setBreaker(true).get();
for (NodeStats nodeStats : stats) {
for (NodeStats nodeStats : stats.getNodes()) {
if (nodeStats.getBreaker().getStats(CircuitBreaker.REQUEST).getLimit() == NoopCircuitBreaker.LIMIT) {
return true;
}
@ -230,7 +230,7 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
// We need the request limit beforehand, just from a single node because the limit should always be the same
long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get()
.getNodes()[0].getBreaker().getStats(CircuitBreaker.REQUEST).getLimit();
.getNodes().get(0).getBreaker().getStats(CircuitBreaker.REQUEST).getLimit();
Settings resetSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "10b")

View File

@ -69,10 +69,10 @@ import java.util.concurrent.ExecutionException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -315,7 +315,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
@Override
public void run() {
NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
assertThat(statsResponse.getNodes(), arrayWithSize(2));
assertThat(statsResponse.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
@ -344,7 +344,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
validateIndexRecoveryState(recoveryStates.get(0).getIndex());
statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
assertThat(statsResponse.getNodes(), arrayWithSize(2));
assertThat(statsResponse.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));
@ -363,7 +363,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
ensureGreen();
statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
assertThat(statsResponse.getNodes(), arrayWithSize(2));
assertThat(statsResponse.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));

View File

@ -104,7 +104,7 @@ public class IndexStatsIT extends ESIntegTestCase {
client().admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L));
@ -113,7 +113,7 @@ public class IndexStatsIT extends ESIntegTestCase {
client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
@ -123,9 +123,9 @@ public class IndexStatsIT extends ESIntegTestCase {
// now check the per field stats
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), greaterThan(0L));
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes()));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes()));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).setFieldDataFields("*").execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
@ -134,7 +134,7 @@ public class IndexStatsIT extends ESIntegTestCase {
client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L));
@ -152,8 +152,8 @@ public class IndexStatsIT extends ESIntegTestCase {
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes()[0].getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setQueryCache(true)
@ -173,8 +173,8 @@ public class IndexStatsIT extends ESIntegTestCase {
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes()[0].getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getQueryCache().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), greaterThan(0L));
indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setQueryCache(true)
@ -186,8 +186,8 @@ public class IndexStatsIT extends ESIntegTestCase {
Thread.sleep(100); // Make sure the filter cache entries have been removed...
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes()[0].getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));
indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setQueryCache(true)

View File

@ -54,29 +54,29 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
assertThat(response.getNodes().length, is(2));
assertThat(response.getNodes().size(), is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
response = client().admin().cluster().nodesInfo(nodesInfoRequest()).actionGet();
assertThat(response.getNodes().length, is(2));
assertThat(response.getNodes().size(), is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodes().size(), is(1));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodes().size(), is(1));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodes().size(), is(1));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodes().size(), is(1));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
}
@ -99,7 +99,7 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
assertThat(response.getNodes().length, is(2));
assertThat(response.getNodes().size(), is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());

View File

@ -168,8 +168,8 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
// Check that node info is correct
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet();
for (int i = 0; i < 2; i++) {
NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i];
assertEquals(2, nodesInfoResponse.getNodes().size());
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.getName().equals(Names.SEARCH)) {

View File

@ -67,7 +67,7 @@ public class NettyTransportPublishAddressIT extends ESIntegTestCase {
logger.info("--> checking if boundAddress matching publishAddress has same port");
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get();
for (NodeInfo nodeInfo : nodesInfoResponse) {
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
BoundTransportAddress boundTransportAddress = nodeInfo.getTransport().getAddress();
if (nodeInfo.getNode().getName().equals(ipv4OnlyNode)) {
assertThat(boundTransportAddress.boundAddresses().length, equalTo(1));

View File

@ -130,10 +130,10 @@ public class SearchStatsTests extends ESIntegTestCase {
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getFetchCount(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getFetchTimeInMillis(), greaterThan(0L));
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
NodeStats[] nodes = nodeStats.getNodes();
Set<String> nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2");
int num = 0;
for (NodeStats stat : nodes) {
for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (nodeIdsWithIndex.contains(stat.getNode().getId())) {
assertThat(total.getQueryCount(), greaterThan(0L));

View File

@ -65,6 +65,6 @@ public abstract class AbstractAzureComputeServiceTestCase extends ESIntegTestCas
NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().execute().actionGet();
assertNotNull(nodeInfos);
assertNotNull(nodeInfos.getNodes());
assertEquals(expected, nodeInfos.getNodes().length);
assertEquals(expected, nodeInfos.getNodes().size());
}
}

View File

@ -38,6 +38,8 @@ import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import static java.util.Collections.emptyMap;
@ -107,7 +109,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
@Override
public CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
NodesStatsResponse response = new NodesStatsResponse(clusterName, stats);
NodesStatsResponse response = new NodesStatsResponse(clusterName, Arrays.asList(stats), Collections.emptyList());
listener.onResponse(response);
return new CountDownLatch(0);
}

View File

@ -2046,8 +2046,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
protected HttpRequestBuilder httpClient(CloseableHttpClient httpClient) {
final NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().get();
final NodeInfo[] nodes = nodeInfos.getNodes();
assertTrue(nodes.length > 0);
final List<NodeInfo> nodes = nodeInfos.getNodes();
assertFalse(nodeInfos.hasFailures());
TransportAddress publishAddress = randomFrom(nodes).getHttp().address().publishAddress();
assertEquals(1, publishAddress.uniqueAddressTypeId());
InetSocketAddress address = ((InetSocketTransportAddress) publishAddress).address();

View File

@ -154,8 +154,7 @@ final class ExternalNode implements Closeable {
static boolean waitForNode(final Client client, final String name) throws InterruptedException {
return ESTestCase.awaitBusy(() -> {
final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get();
final NodeInfo[] nodes = nodeInfos.getNodes();
for (NodeInfo info : nodes) {
for (NodeInfo info : nodeInfos.getNodes()) {
if (name.equals(info.getNode().getName())) {
return true;
}
@ -166,8 +165,7 @@ final class ExternalNode implements Closeable {
static NodeInfo nodeInfo(final Client client, final String nodeName) {
final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get();
final NodeInfo[] nodes = nodeInfos.getNodes();
for (NodeInfo info : nodes) {
for (NodeInfo info : nodeInfos.getNodes()) {
if (nodeName.equals(info.getNode().getName())) {
return info;
}

View File

@ -87,12 +87,12 @@ public final class ExternalTestCluster extends TestCluster {
try {
client.addTransportAddresses(transportAddresses);
NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().clear().setSettings(true).setHttp(true).get();
httpAddresses = new InetSocketAddress[nodeInfos.getNodes().length];
httpAddresses = new InetSocketAddress[nodeInfos.getNodes().size()];
this.clusterName = nodeInfos.getClusterName().value();
int dataNodes = 0;
int masterAndDataNodes = 0;
for (int i = 0; i < nodeInfos.getNodes().length; i++) {
NodeInfo nodeInfo = nodeInfos.getNodes()[i];
for (int i = 0; i < nodeInfos.getNodes().size(); i++) {
NodeInfo nodeInfo = nodeInfos.getNodes().get(i);
httpAddresses[i] = ((InetSocketTransportAddress) nodeInfo.getHttp().address().publishAddress()).address();
if (DiscoveryNode.isDataNode(nodeInfo.getSettings())) {
dataNodes++;