From fcfbdf1f3737d59284bd75e426751d564216a6e1 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 18 Oct 2017 06:57:30 -0600 Subject: [PATCH] Expose adaptive replica selection stats in /_nodes/stats API This exposes the collected metrics we store for ARS in the nodes stats, as well as the computed rank of nodes. Each node exposes its perspective about the cluster. Here's an example output (with `?human`): ```json ... "adaptive_selection" : { "_k6v1-wERxyUd5ke6s-D0g" : { "outgoing_searches" : 0, "avg_queue_size" : 0, "avg_service_time" : "7.8ms", "avg_service_time_ns" : 7896963, "avg_response_time" : "9ms", "avg_response_time_ns" : 9095598, "rank" : "9.1" }, "VJiCUFoiTpySGmO00eWmtQ" : { "outgoing_searches" : 0, "avg_queue_size" : 0, "avg_service_time" : "1.3ms", "avg_service_time_ns" : 1330240, "avg_response_time" : "4.5ms", "avg_response_time_ns" : 4524154, "rank" : "4.5" }, "DHNGTdzyT9iiaCpEUsIAKA" : { "outgoing_searches" : 0, "avg_queue_size" : 0, "avg_service_time" : "2.1ms", "avg_service_time_ns" : 2113164, "avg_response_time" : "6.3ms", "avg_response_time_ns" : 6375810, "rank" : "6.4" } } ... ``` --- .../admin/cluster/node/stats/NodeStats.java | 25 +++- .../cluster/node/stats/NodesStatsRequest.java | 24 ++++ .../node/stats/TransportNodesStatsAction.java | 2 +- .../stats/TransportClusterStatsAction.java | 3 +- .../action/search/SearchTransportService.java | 5 + .../node/AdaptiveSelectionStats.java | 108 ++++++++++++++++++ .../java/org/elasticsearch/node/Node.java | 3 +- .../org/elasticsearch/node/NodeService.java | 23 ++-- .../node/ResponseCollectorService.java | 48 ++++++-- .../cluster/node/stats/NodeStatsTests.java | 46 +++++++- .../elasticsearch/cluster/DiskUsageTests.java | 12 +- .../MockInternalClusterInfoService.java | 2 +- .../test/InternalTestCluster.java | 3 +- 13 files changed, 273 insertions(+), 31 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/node/AdaptiveSelectionStats.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 7e4ae6d57ab..750cf609dc6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.Version; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; @@ -36,6 +37,7 @@ import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.monitor.process.ProcessStats; +import org.elasticsearch.node.AdaptiveSelectionStats; import org.elasticsearch.script.ScriptStats; import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.transport.TransportStats; @@ -86,6 +88,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private IngestStats ingestStats; + @Nullable + private AdaptiveSelectionStats adaptiveSelectionStats; + NodeStats() { } @@ -95,7 +100,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable AllCircuitBreakerStats breaker, @Nullable ScriptStats scriptStats, @Nullable DiscoveryStats discoveryStats, - @Nullable IngestStats ingestStats) { + @Nullable IngestStats ingestStats, + @Nullable AdaptiveSelectionStats adaptiveSelectionStats) { super(node); this.timestamp = timestamp; this.indices = indices; @@ -110,6 +116,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { this.scriptStats = scriptStats; this.discoveryStats = discoveryStats; this.ingestStats = ingestStats; + this.adaptiveSelectionStats = adaptiveSelectionStats; } public long getTimestamp() { @@ -199,6 +206,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { return ingestStats; } + @Nullable + public AdaptiveSelectionStats getAdaptiveSelectionStats() { + return adaptiveSelectionStats; + } + public static NodeStats readNodeStats(StreamInput in) throws IOException { NodeStats nodeInfo = new NodeStats(); nodeInfo.readFrom(in); @@ -223,6 +235,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { scriptStats = in.readOptionalWriteable(ScriptStats::new); discoveryStats = in.readOptionalWriteable(DiscoveryStats::new); ingestStats = in.readOptionalWriteable(IngestStats::new); + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { + adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new); + } else { + adaptiveSelectionStats = null; + } } @Override @@ -246,6 +263,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { out.writeOptionalWriteable(scriptStats); out.writeOptionalWriteable(discoveryStats); out.writeOptionalWriteable(ingestStats); + if (out.getVersion().onOrAfter(Version.V_6_1_0)) { + out.writeOptionalWriteable(adaptiveSelectionStats); + } } @Override @@ -306,6 +326,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { if (getIngestStats() != null) { getIngestStats().toXContent(builder, params); } + if (getAdaptiveSelectionStats() != null) { + getAdaptiveSelectionStats().toXContent(builder, params); + } return builder; } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index a2098d17361..54ef5b65977 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; @@ -43,6 +44,7 @@ public class NodesStatsRequest extends BaseNodesRequest { private boolean script; private boolean discovery; private boolean ingest; + private boolean adaptiveSelection; public NodesStatsRequest() { } @@ -71,6 +73,7 @@ public class NodesStatsRequest extends BaseNodesRequest { this.script = true; this.discovery = true; this.ingest = true; + this.adaptiveSelection = true; return this; } @@ -90,6 +93,7 @@ public class NodesStatsRequest extends BaseNodesRequest { this.script = false; this.discovery = false; this.ingest = false; + this.adaptiveSelection = false; return this; } @@ -265,6 +269,18 @@ public class NodesStatsRequest extends BaseNodesRequest { return this; } + public boolean adaptiveSelection() { + return adaptiveSelection; + } + + /** + * Should adaptiveSelection statistics be returned. + */ + public NodesStatsRequest adaptiveSelection(boolean adaptiveSelection) { + this.adaptiveSelection = adaptiveSelection; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -280,6 +296,11 @@ public class NodesStatsRequest extends BaseNodesRequest { script = in.readBoolean(); discovery = in.readBoolean(); ingest = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { + adaptiveSelection = in.readBoolean(); + } else { + adaptiveSelection = false; + } } @Override @@ -297,5 +318,8 @@ public class NodesStatsRequest extends BaseNodesRequest { out.writeBoolean(script); out.writeBoolean(discovery); out.writeBoolean(ingest); + if (out.getVersion().onOrAfter(Version.V_6_1_0)) { + out.writeBoolean(adaptiveSelection); + } } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 56c98ed7db0..f9f3b0826f9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -73,7 +73,7 @@ public class TransportNodesStatsAction extends TransportNodesAction shardsStats = new ArrayList<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 8a4c8b0882f..509f89a7542 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -60,6 +60,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; @@ -94,6 +95,10 @@ public class SearchTransportService extends AbstractComponent { this.responseWrapper = responseWrapper; } + public Map getClientConnections() { + return Collections.unmodifiableMap(clientConnections); + } + public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) { transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(originalIndices, contextId), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener() { diff --git a/core/src/main/java/org/elasticsearch/node/AdaptiveSelectionStats.java b/core/src/main/java/org/elasticsearch/node/AdaptiveSelectionStats.java new file mode 100644 index 00000000000..3deb161cc8e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/node/AdaptiveSelectionStats.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.node; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.common.xcontent.ToXContent.Params; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Class representing statistics about adaptive replica selection. This includes + * EWMA of queue size, service time, and response time, as well as outgoing + * searches to each node and the "rank" based on the ARS formula. + */ +public class AdaptiveSelectionStats implements Writeable, ToXContentFragment { + + private final Map clientOutgoingConnections; + private final Map nodeComputedStats; + + public AdaptiveSelectionStats(Map clientConnections, + Map nodeComputedStats) { + this.clientOutgoingConnections = clientConnections; + this.nodeComputedStats = nodeComputedStats; + } + + public AdaptiveSelectionStats(StreamInput in) throws IOException { + this.clientOutgoingConnections = in.readMap(StreamInput::readString, StreamInput::readLong); + this.nodeComputedStats = in.readMap(StreamInput::readString, ResponseCollectorService.ComputedNodeStats::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(this.clientOutgoingConnections, StreamOutput::writeString, StreamOutput::writeLong); + out.writeMap(this.nodeComputedStats, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("adaptive_selection"); + Set allNodeIds = Sets.union(clientOutgoingConnections.keySet(), nodeComputedStats.keySet()); + for (String nodeId : allNodeIds) { + builder.startObject(nodeId); + ResponseCollectorService.ComputedNodeStats stats = nodeComputedStats.get(nodeId); + if (stats != null) { + long outgoingSearches = clientOutgoingConnections.getOrDefault(nodeId, 0L); + builder.field("outgoing_searches", outgoingSearches); + builder.field("avg_queue_size", stats.queueSize); + builder.timeValueField("avg_service_time_ns", "avg_service_time", (long) stats.serviceTime, TimeUnit.NANOSECONDS); + builder.timeValueField("avg_response_time_ns", "avg_response_time", (long) stats.responseTime, TimeUnit.NANOSECONDS); + builder.field("rank", String.format(Locale.ROOT, "%.1f", stats.rank(outgoingSearches))); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + /** + * Returns a map of node id to the outgoing search requests to that node + */ + public Map getOutgoingConnections() { + return clientOutgoingConnections; + } + + /** + * Returns a map of node id to the computed stats + */ + public Map getComputedStats() { + return nodeComputedStats; + } + + /** + * Returns a map of node id to the ranking of the nodes based on the adaptive replica formula + */ + public Map getRanks() { + return nodeComputedStats.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> e.getValue().rank(clientOutgoingConnections.getOrDefault(e.getKey(), 0L)))); + } +} diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index a7669f3ff54..0ddc03de8c0 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -447,7 +447,8 @@ public class Node implements Closeable { clusterModule.getAllocationService()); this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), - httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter()); + httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, + searchTransportService); modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); diff --git a/core/src/main/java/org/elasticsearch/node/NodeService.java b/core/src/main/java/org/elasticsearch/node/NodeService.java index 319df478f95..5fd4599df94 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/NodeService.java @@ -25,6 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; +import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; @@ -36,6 +37,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.MonitorService; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -54,17 +56,19 @@ public class NodeService extends AbstractComponent implements Closeable { private final CircuitBreakerService circuitBreakerService; private final IngestService ingestService; private final SettingsFilter settingsFilter; - private ScriptService scriptService; + private final ScriptService scriptService; private final HttpServerTransport httpServerTransport; - + private final ResponseCollectorService responseCollectorService; + private final SearchTransportService searchTransportService; private final Discovery discovery; NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, - TransportService transportService, IndicesService indicesService, PluginsService pluginService, - CircuitBreakerService circuitBreakerService, ScriptService scriptService, - @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, - SettingsFilter settingsFilter) { + TransportService transportService, IndicesService indicesService, PluginsService pluginService, + CircuitBreakerService circuitBreakerService, ScriptService scriptService, + @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, + SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, + SearchTransportService searchTransportService) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -77,6 +81,8 @@ public class NodeService extends AbstractComponent implements Closeable { this.ingestService = ingestService; this.settingsFilter = settingsFilter; this.scriptService = scriptService; + this.responseCollectorService = responseCollectorService; + this.searchTransportService = searchTransportService; clusterService.addStateApplier(ingestService.getPipelineStore()); clusterService.addStateApplier(ingestService.getPipelineExecutionService()); } @@ -99,7 +105,7 @@ public class NodeService extends AbstractComponent implements Closeable { public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean fs, boolean transport, boolean http, boolean circuitBreaker, - boolean script, boolean discoveryStats, boolean ingest) { + boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(), @@ -114,7 +120,8 @@ public class NodeService extends AbstractComponent implements Closeable { circuitBreaker ? circuitBreakerService.stats() : null, script ? scriptService.stats() : null, discoveryStats ? discovery.stats() : null, - ingest ? ingestService.getPipelineExecutionService().stats() : null + ingest ? ingestService.getPipelineExecutionService().stats() : null, + adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getClientConnections()) : null ); } diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 6fea0e2e1c0..9881d4404af 100644 --- a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -26,9 +26,13 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Locale; @@ -91,6 +95,10 @@ public final class ResponseCollectorService extends AbstractComponent implements return nodeStats; } + public AdaptiveSelectionStats getAdaptiveStats(Map clientSearchConnections) { + return new AdaptiveSelectionStats(clientSearchConnections, getAllNodeStatistics()); + } + /** * Optionally return a {@code NodeStatistics} for the given nodeid, if * response information exists for the given node. Returns an empty @@ -106,7 +114,7 @@ public final class ResponseCollectorService extends AbstractComponent implements * node's statistics. This includes the EWMA of queue size, response time, * and service time. */ - public static class ComputedNodeStats { + public static class ComputedNodeStats implements Writeable { // We store timestamps with nanosecond precision, however, the // formula specifies milliseconds, therefore we need to convert // the values so the times don't unduely weight the formula @@ -120,12 +128,34 @@ public final class ResponseCollectorService extends AbstractComponent implements public final double responseTime; public final double serviceTime; - ComputedNodeStats(int clientNum, NodeStatistics nodeStats) { + public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double responseTime, double serviceTime) { + this.nodeId = nodeId; this.clientNum = clientNum; - this.nodeId = nodeStats.nodeId; - this.queueSize = (int) nodeStats.queueSize.getAverage(); - this.responseTime = nodeStats.responseTime.getAverage(); - this.serviceTime = nodeStats.serviceTime; + this.queueSize = queueSize; + this.responseTime = responseTime; + this.serviceTime = serviceTime; + } + + ComputedNodeStats(int clientNum, NodeStatistics nodeStats) { + this(nodeStats.nodeId, clientNum, + (int) nodeStats.queueSize.getAverage(), nodeStats.responseTime.getAverage(), nodeStats.serviceTime); + } + + ComputedNodeStats(StreamInput in) throws IOException { + this.nodeId = in.readString(); + this.clientNum = in.readInt(); + this.queueSize = in.readInt(); + this.responseTime = in.readDouble(); + this.serviceTime = in.readDouble(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.nodeId); + out.writeInt(this.clientNum); + out.writeInt(this.queueSize); + out.writeDouble(this.responseTime); + out.writeDouble(this.serviceTime); } /** @@ -133,9 +163,9 @@ public final class ResponseCollectorService extends AbstractComponent implements * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf */ private double innerRank(long outstandingRequests) { - // this is a placeholder value, the concurrency compensation is - // defined as the number of outstanding requests from the client - // to the node times the number of clients in the system + // the concurrency compensation is defined as the number of + // outstanding requests from the client to the node times the number + // of clients in the system double concurrencyCompensation = outstandingRequests * clientNum; // Cubic queue adjustment factor. The paper chose 3 though we could diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 338ffe06fb8..9d01b74213e 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,6 +32,8 @@ import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.monitor.process.ProcessStats; +import org.elasticsearch.node.AdaptiveSelectionStats; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.script.ScriptStats; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -46,6 +48,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -278,6 +281,22 @@ public class NodeStatsTests extends ESTestCase { assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount()); } } + AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats(); + AdaptiveSelectionStats deserializedAdaptiveStats = deserializedNodeStats.getAdaptiveSelectionStats(); + if (adaptiveStats == null) { + assertNull(deserializedAdaptiveStats); + } else { + assertEquals(adaptiveStats.getOutgoingConnections(), deserializedAdaptiveStats.getOutgoingConnections()); + assertEquals(adaptiveStats.getRanks(), deserializedAdaptiveStats.getRanks()); + adaptiveStats.getComputedStats().forEach((k, v) -> { + ResponseCollectorService.ComputedNodeStats aStats = adaptiveStats.getComputedStats().get(k); + ResponseCollectorService.ComputedNodeStats bStats = deserializedAdaptiveStats.getComputedStats().get(k); + assertEquals(aStats.nodeId, bStats.nodeId); + assertEquals(aStats.queueSize, bStats.queueSize, 0.01); + assertEquals(aStats.serviceTime, bStats.serviceTime, 0.01); + assertEquals(aStats.responseTime, bStats.responseTime, 0.01); + }); + } } } } @@ -407,8 +426,31 @@ public class NodeStatsTests extends ESTestCase { } ingestStats = new IngestStats(totalStats, statsPerPipeline); } + AdaptiveSelectionStats adaptiveSelectionStats = null; + if (frequently()) { + int numNodes = randomIntBetween(0,10); + Map nodeConnections = new HashMap<>(); + Map nodeStats = new HashMap<>(); + for (int i = 0; i < numNodes; i++) { + String nodeId = randomAlphaOfLengthBetween(3, 10); + // add outgoing connection info + if (frequently()) { + nodeConnections.put(nodeId, randomLongBetween(0, 100)); + } + // add node calculations + if (frequently()) { + ResponseCollectorService.ComputedNodeStats stats = new ResponseCollectorService.ComputedNodeStats(nodeId, + randomIntBetween(1,10), randomIntBetween(0, 2000), + randomDoubleBetween(1.0, 10000000.0, true), + randomDoubleBetween(1.0, 10000000.0, true)); + nodeStats.put(nodeId, stats); + } + } + adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats); + } //TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet - return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats, fsInfo, - transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, ingestStats); + return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats, + fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, + ingestStats, adaptiveSelectionStats); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 942d7a222ec..09ff06919e9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -152,11 +152,11 @@ public class DiskUsageTests extends ESTestCase { }; List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null), + null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null), + null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null) + null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages); DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1"); @@ -193,11 +193,11 @@ public class DiskUsageTests extends ESTestCase { }; List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null), + null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null), + null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null) + null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages); DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1"); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 4f6e8bb1924..dd8f90b9e71 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -67,7 +67,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService { null, null, null, null, null, fsInfo, null, null, null, - null, null, null); + null, null, null, null); } public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client, diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c8eec0cbb54..9ad0afdf6a4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -2069,7 +2069,8 @@ public final class InternalTestCluster extends TestCluster { NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node); CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments); - NodeStats stats = nodeService.stats(flags, false, false, false, false, false, false, false, false, false, false, false); + NodeStats stats = nodeService.stats(flags, + false, false, false, false, false, false, false, false, false, false, false, false); assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); assertThat("Query cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); assertThat("FixedBitSet cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getSegments().getBitsetMemoryInBytes(), equalTo(0L));