Expose adaptive replica selection stats in /_nodes/stats API
This exposes the collected metrics we store for ARS in the nodes stats, as well as the computed rank of nodes. Each node exposes its perspective about the cluster. Here's an example output (with `?human`): ```json ... "adaptive_selection" : { "_k6v1-wERxyUd5ke6s-D0g" : { "outgoing_searches" : 0, "avg_queue_size" : 0, "avg_service_time" : "7.8ms", "avg_service_time_ns" : 7896963, "avg_response_time" : "9ms", "avg_response_time_ns" : 9095598, "rank" : "9.1" }, "VJiCUFoiTpySGmO00eWmtQ" : { "outgoing_searches" : 0, "avg_queue_size" : 0, "avg_service_time" : "1.3ms", "avg_service_time_ns" : 1330240, "avg_response_time" : "4.5ms", "avg_response_time_ns" : 4524154, "rank" : "4.5" }, "DHNGTdzyT9iiaCpEUsIAKA" : { "outgoing_searches" : 0, "avg_queue_size" : 0, "avg_service_time" : "2.1ms", "avg_service_time_ns" : 2113164, "avg_response_time" : "6.3ms", "avg_response_time_ns" : 6375810, "rank" : "6.4" } } ... ```
This commit is contained in:
parent
a7fa5d3335
commit
fcfbdf1f37
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.node.stats;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -36,6 +37,7 @@ import org.elasticsearch.monitor.fs.FsInfo;
|
|||
import org.elasticsearch.monitor.jvm.JvmStats;
|
||||
import org.elasticsearch.monitor.os.OsStats;
|
||||
import org.elasticsearch.monitor.process.ProcessStats;
|
||||
import org.elasticsearch.node.AdaptiveSelectionStats;
|
||||
import org.elasticsearch.script.ScriptStats;
|
||||
import org.elasticsearch.threadpool.ThreadPoolStats;
|
||||
import org.elasticsearch.transport.TransportStats;
|
||||
|
@ -86,6 +88,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
@Nullable
|
||||
private IngestStats ingestStats;
|
||||
|
||||
@Nullable
|
||||
private AdaptiveSelectionStats adaptiveSelectionStats;
|
||||
|
||||
NodeStats() {
|
||||
}
|
||||
|
||||
|
@ -95,7 +100,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
@Nullable AllCircuitBreakerStats breaker,
|
||||
@Nullable ScriptStats scriptStats,
|
||||
@Nullable DiscoveryStats discoveryStats,
|
||||
@Nullable IngestStats ingestStats) {
|
||||
@Nullable IngestStats ingestStats,
|
||||
@Nullable AdaptiveSelectionStats adaptiveSelectionStats) {
|
||||
super(node);
|
||||
this.timestamp = timestamp;
|
||||
this.indices = indices;
|
||||
|
@ -110,6 +116,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
this.scriptStats = scriptStats;
|
||||
this.discoveryStats = discoveryStats;
|
||||
this.ingestStats = ingestStats;
|
||||
this.adaptiveSelectionStats = adaptiveSelectionStats;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
|
@ -199,6 +206,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
return ingestStats;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public AdaptiveSelectionStats getAdaptiveSelectionStats() {
|
||||
return adaptiveSelectionStats;
|
||||
}
|
||||
|
||||
public static NodeStats readNodeStats(StreamInput in) throws IOException {
|
||||
NodeStats nodeInfo = new NodeStats();
|
||||
nodeInfo.readFrom(in);
|
||||
|
@ -223,6 +235,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
scriptStats = in.readOptionalWriteable(ScriptStats::new);
|
||||
discoveryStats = in.readOptionalWriteable(DiscoveryStats::new);
|
||||
ingestStats = in.readOptionalWriteable(IngestStats::new);
|
||||
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
|
||||
adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
|
||||
} else {
|
||||
adaptiveSelectionStats = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -246,6 +263,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
out.writeOptionalWriteable(scriptStats);
|
||||
out.writeOptionalWriteable(discoveryStats);
|
||||
out.writeOptionalWriteable(ingestStats);
|
||||
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
|
||||
out.writeOptionalWriteable(adaptiveSelectionStats);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -306,6 +326,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
if (getIngestStats() != null) {
|
||||
getIngestStats().toXContent(builder, params);
|
||||
}
|
||||
if (getAdaptiveSelectionStats() != null) {
|
||||
getAdaptiveSelectionStats().toXContent(builder, params);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.node.stats;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -43,6 +44,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
|
|||
private boolean script;
|
||||
private boolean discovery;
|
||||
private boolean ingest;
|
||||
private boolean adaptiveSelection;
|
||||
|
||||
public NodesStatsRequest() {
|
||||
}
|
||||
|
@ -71,6 +73,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
|
|||
this.script = true;
|
||||
this.discovery = true;
|
||||
this.ingest = true;
|
||||
this.adaptiveSelection = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -90,6 +93,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
|
|||
this.script = false;
|
||||
this.discovery = false;
|
||||
this.ingest = false;
|
||||
this.adaptiveSelection = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -265,6 +269,18 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean adaptiveSelection() {
|
||||
return adaptiveSelection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should adaptive replica selection statistics be returned.
|
||||
*/
|
||||
public NodesStatsRequest adaptiveSelection(boolean adaptiveSelection) {
|
||||
this.adaptiveSelection = adaptiveSelection;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
|
@ -280,6 +296,11 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
|
|||
script = in.readBoolean();
|
||||
discovery = in.readBoolean();
|
||||
ingest = in.readBoolean();
|
||||
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
|
||||
adaptiveSelection = in.readBoolean();
|
||||
} else {
|
||||
adaptiveSelection = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -297,5 +318,8 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
|
|||
out.writeBoolean(script);
|
||||
out.writeBoolean(discovery);
|
||||
out.writeBoolean(ingest);
|
||||
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
|
||||
out.writeBoolean(adaptiveSelection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
|
|||
NodesStatsRequest request = nodeStatsRequest.request;
|
||||
return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.threadPool(),
|
||||
request.fs(), request.transport(), request.http(), request.breaker(), request.script(), request.discovery(),
|
||||
request.ingest());
|
||||
request.ingest(), request.adaptiveSelection());
|
||||
}
|
||||
|
||||
public static class NodeStatsRequest extends BaseNodeRequest {
|
||||
|
|
|
@ -92,7 +92,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
|
|||
@Override
|
||||
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
|
||||
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
|
||||
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, true, true, true, false, true, false, false, false, false, false, false);
|
||||
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
|
||||
true, true, true, false, true, false, false, false, false, false, false, false);
|
||||
List<ShardStats> shardsStats = new ArrayList<>();
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
|
@ -94,6 +95,10 @@ public class SearchTransportService extends AbstractComponent {
|
|||
this.responseWrapper = responseWrapper;
|
||||
}
|
||||
|
||||
public Map<String, Long> getClientConnections() {
|
||||
return Collections.unmodifiableMap(clientConnections);
|
||||
}
|
||||
|
||||
public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
|
||||
transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(originalIndices, contextId),
|
||||
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.node;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.ToXContent.Params;
|
||||
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Class representing statistics about adaptive replica selection. This includes
|
||||
* EWMA of queue size, service time, and response time, as well as outgoing
|
||||
* searches to each node and the "rank" based on the ARS formula.
|
||||
*/
|
||||
/**
 * Class representing statistics about adaptive replica selection. This includes
 * EWMA of queue size, service time, and response time, as well as outgoing
 * searches to each node and the "rank" based on the ARS formula.
 */
public class AdaptiveSelectionStats implements Writeable, ToXContentFragment {

    // Number of outstanding (outgoing) search requests from this coordinating
    // node to each remote node, keyed by node id.
    private final Map<String, Long> clientOutgoingConnections;
    // Per-node computed EWMA statistics (queue size, response time, service time),
    // keyed by node id.
    private final Map<String, ResponseCollectorService.ComputedNodeStats> nodeComputedStats;

    public AdaptiveSelectionStats(Map<String, Long> clientConnections,
                                  Map<String, ResponseCollectorService.ComputedNodeStats> nodeComputedStats) {
        this.clientOutgoingConnections = clientConnections;
        this.nodeComputedStats = nodeComputedStats;
    }

    /**
     * Read from a stream. The read order must mirror {@link #writeTo(StreamOutput)}.
     */
    public AdaptiveSelectionStats(StreamInput in) throws IOException {
        this.clientOutgoingConnections = in.readMap(StreamInput::readString, StreamInput::readLong);
        this.nodeComputedStats = in.readMap(StreamInput::readString, ResponseCollectorService.ComputedNodeStats::new);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Write order must mirror the StreamInput constructor above.
        out.writeMap(this.clientOutgoingConnections, StreamOutput::writeString, StreamOutput::writeLong);
        out.writeMap(this.nodeComputedStats, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject("adaptive_selection");
        // Use the union of node ids so a node present in only one of the two
        // maps still gets an entry in the output.
        Set<String> allNodeIds = Sets.union(clientOutgoingConnections.keySet(), nodeComputedStats.keySet());
        for (String nodeId : allNodeIds) {
            builder.startObject(nodeId);
            ResponseCollectorService.ComputedNodeStats stats = nodeComputedStats.get(nodeId);
            if (stats != null) {
                long outgoingSearches = clientOutgoingConnections.getOrDefault(nodeId, 0L);
                builder.field("outgoing_searches", outgoingSearches);
                builder.field("avg_queue_size", stats.queueSize);
                // Times are stored in nanoseconds; timeValueField additionally
                // emits a human-readable field when "?human" is requested.
                builder.timeValueField("avg_service_time_ns", "avg_service_time", (long) stats.serviceTime, TimeUnit.NANOSECONDS);
                builder.timeValueField("avg_response_time_ns", "avg_response_time", (long) stats.responseTime, TimeUnit.NANOSECONDS);
                // Rank rendered with one decimal place; Locale.ROOT keeps the
                // formatting locale-independent (always a '.' decimal separator).
                builder.field("rank", String.format(Locale.ROOT, "%.1f", stats.rank(outgoingSearches)));
            }
            builder.endObject();
        }
        builder.endObject();
        return builder;
    }

    /**
     * Returns a map of node id to the outgoing search requests to that node
     */
    public Map<String, Long> getOutgoingConnections() {
        return clientOutgoingConnections;
    }

    /**
     * Returns a map of node id to the computed stats
     */
    public Map<String, ResponseCollectorService.ComputedNodeStats> getComputedStats() {
        return nodeComputedStats;
    }

    /**
     * Returns a map of node id to the ranking of the nodes based on the
     * adaptive replica selection formula (missing connection counts default to 0)
     */
    public Map<String, Double> getRanks() {
        return nodeComputedStats.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                        e -> e.getValue().rank(clientOutgoingConnections.getOrDefault(e.getKey(), 0L))));
    }
}
|
|
@ -447,7 +447,8 @@ public class Node implements Closeable {
|
|||
clusterModule.getAllocationService());
|
||||
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
|
||||
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
|
||||
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
|
||||
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
|
||||
searchTransportService);
|
||||
modules.add(b -> {
|
||||
b.bind(Node.class).toInstance(this);
|
||||
b.bind(NodeService.class).toInstance(nodeService);
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.search.SearchTransportService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -36,6 +37,7 @@ import org.elasticsearch.indices.IndicesService;
|
|||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.monitor.MonitorService;
|
||||
import org.elasticsearch.node.ResponseCollectorService;
|
||||
import org.elasticsearch.plugins.PluginsService;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -54,17 +56,19 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
private final CircuitBreakerService circuitBreakerService;
|
||||
private final IngestService ingestService;
|
||||
private final SettingsFilter settingsFilter;
|
||||
private ScriptService scriptService;
|
||||
private final ScriptService scriptService;
|
||||
private final HttpServerTransport httpServerTransport;
|
||||
|
||||
private final ResponseCollectorService responseCollectorService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
|
||||
private final Discovery discovery;
|
||||
|
||||
NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
|
||||
TransportService transportService, IndicesService indicesService, PluginsService pluginService,
|
||||
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
|
||||
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
|
||||
SettingsFilter settingsFilter) {
|
||||
TransportService transportService, IndicesService indicesService, PluginsService pluginService,
|
||||
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
|
||||
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
|
||||
SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
|
||||
SearchTransportService searchTransportService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.monitorService = monitorService;
|
||||
|
@ -77,6 +81,8 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
this.ingestService = ingestService;
|
||||
this.settingsFilter = settingsFilter;
|
||||
this.scriptService = scriptService;
|
||||
this.responseCollectorService = responseCollectorService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
clusterService.addStateApplier(ingestService.getPipelineStore());
|
||||
clusterService.addStateApplier(ingestService.getPipelineExecutionService());
|
||||
}
|
||||
|
@ -99,7 +105,7 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
|
||||
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool,
|
||||
boolean fs, boolean transport, boolean http, boolean circuitBreaker,
|
||||
boolean script, boolean discoveryStats, boolean ingest) {
|
||||
boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection) {
|
||||
// for indices stats we want to include previous allocated shards stats as well (it will
|
||||
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
|
||||
return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(),
|
||||
|
@ -114,7 +120,8 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
circuitBreaker ? circuitBreakerService.stats() : null,
|
||||
script ? scriptService.stats() : null,
|
||||
discoveryStats ? discovery.stats() : null,
|
||||
ingest ? ingestService.getPipelineExecutionService().stats() : null
|
||||
ingest ? ingestService.getPipelineExecutionService().stats() : null,
|
||||
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getClientConnections()) : null
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,9 +26,13 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
|
@ -91,6 +95,10 @@ public final class ResponseCollectorService extends AbstractComponent implements
|
|||
return nodeStats;
|
||||
}
|
||||
|
||||
public AdaptiveSelectionStats getAdaptiveStats(Map<String, Long> clientSearchConnections) {
|
||||
return new AdaptiveSelectionStats(clientSearchConnections, getAllNodeStatistics());
|
||||
}
|
||||
|
||||
/**
|
||||
* Optionally return a {@code NodeStatistics} for the given nodeid, if
|
||||
* response information exists for the given node. Returns an empty
|
||||
|
@ -106,7 +114,7 @@ public final class ResponseCollectorService extends AbstractComponent implements
|
|||
* node's statistics. This includes the EWMA of queue size, response time,
|
||||
* and service time.
|
||||
*/
|
||||
public static class ComputedNodeStats {
|
||||
public static class ComputedNodeStats implements Writeable {
|
||||
// We store timestamps with nanosecond precision, however, the
|
||||
// formula specifies milliseconds, therefore we need to convert
|
||||
// the values so the times don't unduely weight the formula
|
||||
|
@ -120,12 +128,34 @@ public final class ResponseCollectorService extends AbstractComponent implements
|
|||
public final double responseTime;
|
||||
public final double serviceTime;
|
||||
|
||||
ComputedNodeStats(int clientNum, NodeStatistics nodeStats) {
|
||||
public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double responseTime, double serviceTime) {
|
||||
this.nodeId = nodeId;
|
||||
this.clientNum = clientNum;
|
||||
this.nodeId = nodeStats.nodeId;
|
||||
this.queueSize = (int) nodeStats.queueSize.getAverage();
|
||||
this.responseTime = nodeStats.responseTime.getAverage();
|
||||
this.serviceTime = nodeStats.serviceTime;
|
||||
this.queueSize = queueSize;
|
||||
this.responseTime = responseTime;
|
||||
this.serviceTime = serviceTime;
|
||||
}
|
||||
|
||||
ComputedNodeStats(int clientNum, NodeStatistics nodeStats) {
|
||||
this(nodeStats.nodeId, clientNum,
|
||||
(int) nodeStats.queueSize.getAverage(), nodeStats.responseTime.getAverage(), nodeStats.serviceTime);
|
||||
}
|
||||
|
||||
ComputedNodeStats(StreamInput in) throws IOException {
|
||||
this.nodeId = in.readString();
|
||||
this.clientNum = in.readInt();
|
||||
this.queueSize = in.readInt();
|
||||
this.responseTime = in.readDouble();
|
||||
this.serviceTime = in.readDouble();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(this.nodeId);
|
||||
out.writeInt(this.clientNum);
|
||||
out.writeInt(this.queueSize);
|
||||
out.writeDouble(this.responseTime);
|
||||
out.writeDouble(this.serviceTime);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -133,9 +163,9 @@ public final class ResponseCollectorService extends AbstractComponent implements
|
|||
* https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf
|
||||
*/
|
||||
private double innerRank(long outstandingRequests) {
|
||||
// this is a placeholder value, the concurrency compensation is
|
||||
// defined as the number of outstanding requests from the client
|
||||
// to the node times the number of clients in the system
|
||||
// the concurrency compensation is defined as the number of
|
||||
// outstanding requests from the client to the node times the number
|
||||
// of clients in the system
|
||||
double concurrencyCompensation = outstandingRequests * clientNum;
|
||||
|
||||
// Cubic queue adjustment factor. The paper chose 3 though we could
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.elasticsearch.monitor.fs.FsInfo;
|
|||
import org.elasticsearch.monitor.jvm.JvmStats;
|
||||
import org.elasticsearch.monitor.os.OsStats;
|
||||
import org.elasticsearch.monitor.process.ProcessStats;
|
||||
import org.elasticsearch.node.AdaptiveSelectionStats;
|
||||
import org.elasticsearch.node.ResponseCollectorService;
|
||||
import org.elasticsearch.script.ScriptStats;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
@ -46,6 +48,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
|
||||
|
@ -278,6 +281,22 @@ public class NodeStatsTests extends ESTestCase {
|
|||
assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount());
|
||||
}
|
||||
}
|
||||
AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats();
|
||||
AdaptiveSelectionStats deserializedAdaptiveStats = deserializedNodeStats.getAdaptiveSelectionStats();
|
||||
if (adaptiveStats == null) {
|
||||
assertNull(deserializedAdaptiveStats);
|
||||
} else {
|
||||
assertEquals(adaptiveStats.getOutgoingConnections(), deserializedAdaptiveStats.getOutgoingConnections());
|
||||
assertEquals(adaptiveStats.getRanks(), deserializedAdaptiveStats.getRanks());
|
||||
adaptiveStats.getComputedStats().forEach((k, v) -> {
|
||||
ResponseCollectorService.ComputedNodeStats aStats = adaptiveStats.getComputedStats().get(k);
|
||||
ResponseCollectorService.ComputedNodeStats bStats = deserializedAdaptiveStats.getComputedStats().get(k);
|
||||
assertEquals(aStats.nodeId, bStats.nodeId);
|
||||
assertEquals(aStats.queueSize, bStats.queueSize, 0.01);
|
||||
assertEquals(aStats.serviceTime, bStats.serviceTime, 0.01);
|
||||
assertEquals(aStats.responseTime, bStats.responseTime, 0.01);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -407,8 +426,31 @@ public class NodeStatsTests extends ESTestCase {
|
|||
}
|
||||
ingestStats = new IngestStats(totalStats, statsPerPipeline);
|
||||
}
|
||||
AdaptiveSelectionStats adaptiveSelectionStats = null;
|
||||
if (frequently()) {
|
||||
int numNodes = randomIntBetween(0,10);
|
||||
Map<String, Long> nodeConnections = new HashMap<>();
|
||||
Map<String, ResponseCollectorService.ComputedNodeStats> nodeStats = new HashMap<>();
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
String nodeId = randomAlphaOfLengthBetween(3, 10);
|
||||
// add outgoing connection info
|
||||
if (frequently()) {
|
||||
nodeConnections.put(nodeId, randomLongBetween(0, 100));
|
||||
}
|
||||
// add node calculations
|
||||
if (frequently()) {
|
||||
ResponseCollectorService.ComputedNodeStats stats = new ResponseCollectorService.ComputedNodeStats(nodeId,
|
||||
randomIntBetween(1,10), randomIntBetween(0, 2000),
|
||||
randomDoubleBetween(1.0, 10000000.0, true),
|
||||
randomDoubleBetween(1.0, 10000000.0, true));
|
||||
nodeStats.put(nodeId, stats);
|
||||
}
|
||||
}
|
||||
adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats);
|
||||
}
|
||||
//TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
|
||||
return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats, fsInfo,
|
||||
transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, ingestStats);
|
||||
return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats,
|
||||
fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats,
|
||||
ingestStats, adaptiveSelectionStats);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -152,11 +152,11 @@ public class DiskUsageTests extends ESTestCase {
|
|||
};
|
||||
List<NodeStats> nodeStats = Arrays.asList(
|
||||
new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null),
|
||||
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null),
|
||||
new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null),
|
||||
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null),
|
||||
new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null)
|
||||
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null)
|
||||
);
|
||||
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
|
||||
DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
|
||||
|
@ -193,11 +193,11 @@ public class DiskUsageTests extends ESTestCase {
|
|||
};
|
||||
List<NodeStats> nodeStats = Arrays.asList(
|
||||
new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null),
|
||||
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null),
|
||||
new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null),
|
||||
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null),
|
||||
new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null)
|
||||
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null)
|
||||
);
|
||||
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages);
|
||||
DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1");
|
||||
|
|
|
@ -67,7 +67,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
|
|||
null, null, null, null, null,
|
||||
fsInfo,
|
||||
null, null, null,
|
||||
null, null, null);
|
||||
null, null, null, null);
|
||||
}
|
||||
|
||||
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
|
||||
|
|
|
@ -2069,7 +2069,8 @@ public final class InternalTestCluster extends TestCluster {
|
|||
|
||||
NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node);
|
||||
CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments);
|
||||
NodeStats stats = nodeService.stats(flags, false, false, false, false, false, false, false, false, false, false, false);
|
||||
NodeStats stats = nodeService.stats(flags,
|
||||
false, false, false, false, false, false, false, false, false, false, false, false);
|
||||
assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
|
||||
assertThat("Query cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));
|
||||
assertThat("FixedBitSet cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getSegments().getBitsetMemoryInBytes(), equalTo(0L));
|
||||
|
|
Loading…
Reference in New Issue