Bridging the gap in network overhead measurement in the profiler (#1360)

* Bridging the gap in network overhead measurement in the profiler

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* add tests + remove total n/w time field

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* refactor network time into a class

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* gradle test fix

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
This commit is contained in:
Poojita Raj 2021-12-16 19:50:04 -08:00 committed by GitHub
parent 6cc462b92d
commit 4b97713a2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 265 additions and 27 deletions

View File

@ -0,0 +1,72 @@
/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.search.profile;
import org.apache.lucene.util.English;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
public class ProfilerSingleNodeNetworkTest extends OpenSearchSingleNodeTestCase {
/**
* This test checks to make sure in a single node cluster, the network time
* is 0 as expected in the profiler for inbound an doutbound network time.
*/
public void testProfilerNetworkTime() throws Exception {
createIndex("test");
ensureGreen();
int numDocs = randomIntBetween(100, 150);
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex("test", "type1", String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i);
}
List<String> stringFields = Arrays.asList("field1");
List<String> numericFields = Arrays.asList("field2");
int iters = between(20, 100);
for (int i = 0; i < iters; i++) {
QueryBuilder q = randomQueryBuilder(stringFields, numericFields, numDocs, 3);
logger.info("Query: {}", q);
SearchResponse resp = client().prepareSearch()
.setQuery(q)
.setTrackTotalHits(true)
.setProfile(true)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.get();
assertNotNull("Profile response element should not be null", resp.getProfileResults());
assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));
for (Map.Entry<String, ProfileShardResult> shard : resp.getProfileResults().entrySet()) {
assertThat(
"Profile response inbound network time should be 0 in single node clusters",
shard.getValue().getNetworkTime().getInboundNetworkTime(),
is(0L)
);
assertThat(
"Profile response outbound network time should be 0 in single node clusters",
shard.getValue().getNetworkTime().getOutboundNetworkTime(),
is(0L)
);
}
}
}
}

View File

@ -34,11 +34,7 @@ package org.opensearch.search.profile.query;
import org.apache.lucene.util.English;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.search.*;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
@ -48,18 +44,15 @@ import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
public class QueryProfilerIT extends OpenSearchIntegTestCase {
@ -98,6 +91,8 @@ public class QueryProfilerIT extends OpenSearchIntegTestCase {
assertNotNull("Profile response element should not be null", resp.getProfileResults());
assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));
for (Map.Entry<String, ProfileShardResult> shard : resp.getProfileResults().entrySet()) {
assertThat(shard.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
assertThat(shard.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));
for (QueryProfileShardResult searchProfiles : shard.getValue().getQueryProfileResults()) {
for (ProfileResult result : searchProfiles.getQueryResults()) {
assertNotNull(result.getQueryName());

View File

@ -35,6 +35,7 @@ package org.opensearch.action.search;
import org.opensearch.action.ActionListener;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.fetch.QueryFetchSearchResult;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.transport.Transport;
@ -66,7 +67,24 @@ public final class SearchExecutionStatsCollector implements ActionListener<Searc
@Override
public void onResponse(SearchPhaseResult response) {
if (response instanceof QueryFetchSearchResult) {
response.queryResult().getShardSearchRequest().setOutboundNetworkTime(0);
response.queryResult().getShardSearchRequest().setInboundNetworkTime(0);
}
QuerySearchResult queryResult = response.queryResult();
if (response.getShardSearchRequest() != null) {
if (response.remoteAddress() != null) {
// update outbound network time for request sent over network for shard requests
response.getShardSearchRequest()
.setOutboundNetworkTime(
Math.max(0, System.currentTimeMillis() - response.getShardSearchRequest().getOutboundNetworkTime())
);
} else {
// reset inbound and outbound network time to 0 for local request for shard requests
response.getShardSearchRequest().setOutboundNetworkTime(0);
response.getShardSearchRequest().setInboundNetworkTime(0);
}
}
if (nodeId != null && queryResult != null) {
final long serviceTimeEWMA = queryResult.serviceTimeEWMA();
final int queueSize = queryResult.nodeQueueSize();

View File

@ -121,6 +121,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
final SearchActionListener<SearchPhaseResult> listener
) {
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
// update inbound network time with current time before sending request over n/w to data node
if (request != null) {
request.setInboundNetworkTime(System.currentTimeMillis());
}
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
}

View File

@ -33,6 +33,7 @@
package org.opensearch.search.internal;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchRequest;
@ -90,6 +91,8 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
private final float indexBoost;
private final Boolean requestCache;
private final long nowInMillis;
private long inboundNetworkTime;
private long outboundNetworkTime;
private final boolean allowPartialSearchResults;
private final String[] indexRoutings;
private final String preference;
@ -221,6 +224,8 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
this.preference = preference;
this.scroll = scroll;
this.nowInMillis = nowInMillis;
this.inboundNetworkTime = 0;
this.outboundNetworkTime = 0;
this.clusterAlias = clusterAlias;
this.originalIndices = originalIndices;
this.readerId = readerId;
@ -240,6 +245,10 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
indexBoost = in.readFloat();
nowInMillis = in.readVLong();
requestCache = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
inboundNetworkTime = in.readVLong();
outboundNetworkTime = in.readVLong();
}
clusterAlias = in.readOptionalString();
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) {
allowPartialSearchResults = in.readBoolean();
@ -283,6 +292,8 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
this.aliasFilter = clone.aliasFilter;
this.indexBoost = clone.indexBoost;
this.nowInMillis = clone.nowInMillis;
this.inboundNetworkTime = clone.inboundNetworkTime;
this.outboundNetworkTime = clone.outboundNetworkTime;
this.requestCache = clone.requestCache;
this.clusterAlias = clone.clusterAlias;
this.allowPartialSearchResults = clone.allowPartialSearchResults;
@ -317,6 +328,10 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
out.writeVLong(nowInMillis);
}
out.writeOptionalBoolean(requestCache);
if (asKey == false && out.getVersion().onOrAfter(Version.V_2_0_0)) {
out.writeVLong(inboundNetworkTime);
out.writeVLong(outboundNetworkTime);
}
out.writeOptionalString(clusterAlias);
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) {
out.writeBoolean(allowPartialSearchResults);
@ -395,6 +410,22 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
return nowInMillis;
}
public long getInboundNetworkTime() {
return inboundNetworkTime;
}
public void setInboundNetworkTime(long newTime) {
this.inboundNetworkTime = newTime;
}
public long getOutboundNetworkTime() {
return outboundNetworkTime;
}
public void setOutboundNetworkTime(long newTime) {
this.outboundNetworkTime = newTime;
}
public Boolean requestCache() {
return requestCache;
}

View File

@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.search.profile;
import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import java.io.IOException;
public class NetworkTime implements Writeable {
private long inboundNetworkTime;
private long outboundNetworkTime;
public NetworkTime(long inboundTime, long outboundTime) {
this.inboundNetworkTime = inboundTime;
this.outboundNetworkTime = outboundTime;
}
public NetworkTime(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
this.inboundNetworkTime = in.readVLong();
this.outboundNetworkTime = in.readVLong();
}
}
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
out.writeVLong(inboundNetworkTime);
out.writeVLong(outboundNetworkTime);
}
}
public long getInboundNetworkTime() {
return this.inboundNetworkTime;
}
public long getOutboundNetworkTime() {
return this.outboundNetworkTime;
}
public void setInboundNetworkTime(long newTime) {
this.inboundNetworkTime = newTime;
}
public void setOutboundNetworkTime(long newTime) {
this.outboundNetworkTime = newTime;
}
}

View File

@ -49,9 +49,16 @@ public class ProfileShardResult implements Writeable {
private final AggregationProfileShardResult aggProfileShardResult;
public ProfileShardResult(List<QueryProfileShardResult> queryProfileResults, AggregationProfileShardResult aggProfileShardResult) {
private NetworkTime networkTime;
public ProfileShardResult(
List<QueryProfileShardResult> queryProfileResults,
AggregationProfileShardResult aggProfileShardResult,
NetworkTime networkTime
) {
this.aggProfileShardResult = aggProfileShardResult;
this.queryProfileResults = Collections.unmodifiableList(queryProfileResults);
this.networkTime = networkTime;
}
public ProfileShardResult(StreamInput in) throws IOException {
@ -63,6 +70,7 @@ public class ProfileShardResult implements Writeable {
}
this.queryProfileResults = Collections.unmodifiableList(queryProfileResults);
this.aggProfileShardResult = new AggregationProfileShardResult(in);
this.networkTime = new NetworkTime(in);
}
@Override
@ -72,6 +80,7 @@ public class ProfileShardResult implements Writeable {
queryShardResult.writeTo(out);
}
aggProfileShardResult.writeTo(out);
networkTime.writeTo(out);
}
public List<QueryProfileShardResult> getQueryProfileResults() {
@ -81,4 +90,14 @@ public class ProfileShardResult implements Writeable {
public AggregationProfileShardResult getAggregationProfileResults() {
return aggProfileShardResult;
}
public NetworkTime getNetworkTime() {
return networkTime;
}
public void setNetworkTime(NetworkTime newTime) {
networkTime.setInboundNetworkTime(newTime.getInboundNetworkTime());
networkTime.setOutboundNetworkTime(newTime.getOutboundNetworkTime());
}
}

View File

@ -38,6 +38,7 @@ import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.profile.aggregation.AggregationProfileShardResult;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.query.QueryProfileShardResult;
@ -58,11 +59,12 @@ import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedT
* holds a map of shard ID -&gt; Profiled results
*/
public final class SearchProfileShardResults implements Writeable, ToXContentFragment {
private static final String SEARCHES_FIELD = "searches";
private static final String ID_FIELD = "id";
private static final String SHARDS_FIELD = "shards";
public static final String PROFILE_FIELD = "profile";
public static final String INBOUND_NETWORK_FIELD = "inbound_network_time_in_millis";
public static final String OUTBOUND_NETWORK_FIELD = "outbound_network_time_in_millis";
private Map<String, ProfileShardResult> shardResults;
@ -104,6 +106,8 @@ public final class SearchProfileShardResults implements Writeable, ToXContentFra
for (String key : sortedKeys) {
builder.startObject();
builder.field(ID_FIELD, key);
builder.field(INBOUND_NETWORK_FIELD, shardResults.get(key).getNetworkTime().getInboundNetworkTime());
builder.field(OUTBOUND_NETWORK_FIELD, shardResults.get(key).getNetworkTime().getOutboundNetworkTime());
builder.startArray(SEARCHES_FIELD);
ProfileShardResult profileShardResult = shardResults.get(key);
for (QueryProfileShardResult result : profileShardResult.getQueryProfileResults()) {
@ -145,12 +149,18 @@ public final class SearchProfileShardResults implements Writeable, ToXContentFra
AggregationProfileShardResult aggProfileShardResult = null;
String id = null;
String currentFieldName = null;
long inboundNetworkTime = 0;
long outboundNetworkTime = 0;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (ID_FIELD.equals(currentFieldName)) {
id = parser.text();
} else if (INBOUND_NETWORK_FIELD.equals(currentFieldName)) {
inboundNetworkTime = parser.longValue();
} else if (OUTBOUND_NETWORK_FIELD.equals(currentFieldName)) {
outboundNetworkTime = parser.longValue();
} else {
parser.skipChildren();
}
@ -168,7 +178,8 @@ public final class SearchProfileShardResults implements Writeable, ToXContentFra
parser.skipChildren();
}
}
searchProfileResults.put(id, new ProfileShardResult(queryProfileResults, aggProfileShardResult));
NetworkTime networkTime = new NetworkTime(inboundNetworkTime, outboundNetworkTime);
searchProfileResults.put(id, new ProfileShardResult(queryProfileResults, aggProfileShardResult, networkTime));
}
/**
@ -180,7 +191,7 @@ public final class SearchProfileShardResults implements Writeable, ToXContentFra
* @return A {@link ProfileShardResult} representing the results for this
* shard
*/
public static ProfileShardResult buildShardResults(Profilers profilers) {
public static ProfileShardResult buildShardResults(Profilers profilers, ShardSearchRequest request) {
List<QueryProfiler> queryProfilers = profilers.getQueryProfilers();
AggregationProfiler aggProfiler = profilers.getAggregationProfiler();
List<QueryProfileShardResult> queryResults = new ArrayList<>(queryProfilers.size());
@ -193,6 +204,11 @@ public final class SearchProfileShardResults implements Writeable, ToXContentFra
queryResults.add(result);
}
AggregationProfileShardResult aggResults = new AggregationProfileShardResult(aggProfiler.getTree());
return new ProfileShardResult(queryResults, aggResults);
NetworkTime networkTime = new NetworkTime(0, 0);
if (request != null) {
networkTime.setInboundNetworkTime(request.getInboundNetworkTime());
networkTime.setOutboundNetworkTime(request.getOutboundNetworkTime());
}
return new ProfileShardResult(queryResults, aggResults, networkTime);
}
}

View File

@ -168,7 +168,10 @@ public class QueryPhase {
aggregationPhase.execute(searchContext);
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(
searchContext.getProfilers(),
searchContext.request()
);
searchContext.queryResult().profileResults(shardResults);
}
}

View File

@ -32,12 +32,6 @@
package org.opensearch.search.query;
import static java.util.Collections.emptyList;
import static org.opensearch.common.lucene.Lucene.readTopDocs;
import static org.opensearch.common.lucene.Lucene.writeTopDocs;
import java.io.IOException;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TotalHits;
import org.opensearch.LegacyESVersion;
@ -54,9 +48,16 @@ import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.profile.NetworkTime;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.suggest.Suggest;
import java.io.IOException;
import static java.util.Collections.emptyList;
import static org.opensearch.common.lucene.Lucene.readTopDocs;
import static org.opensearch.common.lucene.Lucene.writeTopDocs;
public final class QuerySearchResult extends SearchPhaseResult {
private int from;
@ -245,6 +246,11 @@ public final class QuerySearchResult extends SearchPhaseResult {
throw new IllegalStateException("profile results already consumed");
}
ProfileShardResult result = profileShardResults;
NetworkTime newNetworkTime = new NetworkTime(
this.getShardSearchRequest().getInboundNetworkTime(),
this.getShardSearchRequest().getOutboundNetworkTime()
);
result.setNetworkTime(newNetworkTime);
profileShardResults = null;
return result;
}

View File

@ -36,6 +36,7 @@ import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
@ -83,6 +84,12 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
Releasable unregisterTask = () -> taskManager.unregister(task);
try {
if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {
if (request instanceof ShardSearchRequest) {
// on receiving request, update the inbound network time to reflect time spent in transit over the network
((ShardSearchRequest) request).setInboundNetworkTime(
Math.max(0, System.currentTimeMillis() - ((ShardSearchRequest) request).getInboundNetworkTime())
);
}
final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();
final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
unregisterTask = Releasables.wrap(unregisterTask, stopTracking);

View File

@ -34,6 +34,7 @@ package org.opensearch.transport;
import org.opensearch.Version;
import org.opensearch.common.lease.Releasable;
import org.opensearch.search.query.QuerySearchResult;
import java.io.IOException;
import java.util.Set;
@ -82,6 +83,10 @@ public final class TcpTransportChannel implements TransportChannel {
@Override
public void sendResponse(TransportResponse response) throws IOException {
try {
if (response instanceof QuerySearchResult && ((QuerySearchResult) response).getShardSearchRequest() != null) {
// update outbound network time with current time before sending response over network
((QuerySearchResult) response).getShardSearchRequest().setOutboundNetworkTime(System.currentTimeMillis());
}
outboundHandler.sendResponse(version, features, channel, requestId, action, response, compressResponse, isHandshake);
} finally {
release(false);

View File

@ -59,6 +59,8 @@ public class SearchProfileShardResultsTests extends OpenSearchTestCase {
public static SearchProfileShardResults createTestItem() {
int size = rarely() ? 0 : randomIntBetween(1, 2);
long inboundTime = 0;
long outboundTime = 0;
Map<String, ProfileShardResult> searchProfileResults = new HashMap<>(size);
for (int i = 0; i < size; i++) {
List<QueryProfileShardResult> queryProfileResults = new ArrayList<>();
@ -67,7 +69,11 @@ public class SearchProfileShardResultsTests extends OpenSearchTestCase {
queryProfileResults.add(QueryProfileShardResultTests.createTestItem());
}
AggregationProfileShardResult aggProfileShardResult = AggregationProfileShardResultTests.createTestItem(1);
searchProfileResults.put(randomAlphaOfLengthBetween(5, 10), new ProfileShardResult(queryProfileResults, aggProfileShardResult));
NetworkTime networkTime = new NetworkTime(inboundTime, outboundTime);
searchProfileResults.put(
randomAlphaOfLengthBetween(5, 10),
new ProfileShardResult(queryProfileResults, aggProfileShardResult, networkTime)
);
}
return new SearchProfileShardResults(searchProfileResults);
}