Introducing took time for _msearch

This commit adds the took time to the response for _msearch.

Relates #23767
This commit is contained in:
olcbean 2017-11-02 02:39:04 +01:00 committed by Jason Tedor
parent 59657ad1cb
commit b9896465cd
6 changed files with 262 additions and 23 deletions

View File

@ -21,12 +21,14 @@ package org.elasticsearch.action.search;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -112,11 +114,14 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
private Item[] items;
private long tookInMillis;
MultiSearchResponse() {
}
public MultiSearchResponse(Item[] items) {
public MultiSearchResponse(Item[] items, long tookInMillis) {
this.items = items;
this.tookInMillis = tookInMillis;
}
@Override
@ -131,6 +136,13 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
return this.items;
}
/**
* How long the msearch took.
*/
public TimeValue getTook() {
return new TimeValue(tookInMillis);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -138,6 +150,9 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
for (int i = 0; i < items.length; i++) {
items[i] = Item.readItem(in);
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
tookInMillis = in.readVLong();
}
}
@Override
@ -147,11 +162,15 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
for (Item item : items) {
item.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(tookInMillis);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("took", tookInMillis);
builder.startArray(Fields.RESPONSES);
for (Item item : items) {
builder.startObject();

View File

@ -34,16 +34,18 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {
private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;
@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
@ -53,19 +55,23 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
}
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
IndexNameExpressionResolver resolver, int availableProcessors) {
IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
}
@Override
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
final long relativeStartTime = relativeTimeProvider.getAsLong();
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
@ -85,7 +91,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
final AtomicInteger responseCounter = new AtomicInteger(numRequests);
int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches);
for (int i = 0; i < numConcurrentSearches; i++) {
executeSearch(searchRequestSlots, responses, responseCounter, listener);
executeSearch(searchRequestSlots, responses, responseCounter, listener, relativeStartTime);
}
}
@ -111,11 +117,12 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
* @param responseCounter incremented on each response
* @param listener the listener attached to the multi-search request
*/
private void executeSearch(
void executeSearch(
final Queue<SearchRequestSlot> requests,
final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter,
final ActionListener<MultiSearchResponse> listener) {
final ActionListener<MultiSearchResponse> listener,
final long relativeStartTime) {
SearchRequestSlot request = requests.poll();
if (request == null) {
/*
@ -155,16 +162,25 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
} else {
if (thread == Thread.currentThread()) {
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener));
threadPool.generic()
.execute(() -> executeSearch(requests, responses, responseCounter, listener, relativeStartTime));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
executeSearch(requests, responses, responseCounter, listener);
executeSearch(requests, responses, responseCounter, listener, relativeStartTime);
}
}
}
private void finish() {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]),
buildTookInMillis()));
}
/**
* Builds how long it took to execute the msearch.
*/
private long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - relativeStartTime);
}
});
}
@ -178,7 +194,5 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
this.request = request;
this.responseSlot = responseSlot;
}
}
}

View File

@ -101,7 +101,8 @@ public class ExpandSearchPhaseTests extends ESTestCase {
mSearchResponses.add(new MultiSearchResponse.Item(response, null));
}
listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0])));
listener.onResponse(
new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]), randomIntBetween(1, 10000)));
}
};
@ -153,10 +154,11 @@ public class ExpandSearchPhaseTests extends ESTestCase {
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
listener.onResponse(new MultiSearchResponse(
new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
}));
}, randomIntBetween(1, 10000)));
}
};

View File

@ -0,0 +1,199 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* MultiSearch took time tests
*/
public class MultiSearchActionTookTests extends ESTestCase {
private ThreadPool threadPool;
private ClusterService clusterService;
@BeforeClass
public static void beforeClass() {
}
@AfterClass
public static void afterClass() {
}
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("MultiSearchActionTookTests");
clusterService = createClusterService(threadPool);
}
@After
public void tearDown() throws Exception {
clusterService.close();
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
super.tearDown();
}
// test unit conversion using a controller clock
public void testTookWithControlledClock() throws Exception {
runTestTook(true);
}
// test using System#nanoTime
public void testTookWithRealClock() throws Exception {
runTestTook(false);
}
private void runTestTook(boolean controlledClock) throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest());
AtomicLong expected = new AtomicLong();
TransportMultiSearchAction action = createTransportMultiSearchAction(controlledClock, expected);
action.doExecute(multiSearchRequest, new ActionListener<MultiSearchResponse>() {
@Override
public void onResponse(MultiSearchResponse multiSearchResponse) {
if (controlledClock) {
assertThat(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS),
equalTo(multiSearchResponse.getTook().getMillis()));
} else {
assertThat(multiSearchResponse.getTook().getMillis(),
greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS)));
}
}
@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
});
}
private TransportMultiSearchAction createTransportMultiSearchAction(boolean controlledClock, AtomicLong expected) {
Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build();
TaskManager taskManager = mock(TaskManager.class);
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};
ActionFilters actionFilters = new ActionFilters(new HashSet<>());
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build());
IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);
final int availableProcessors = Runtime.getRuntime().availableProcessors();
AtomicInteger counter = new AtomicInteger();
final List<String> threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME);
Randomness.shuffle(threadPoolNames);
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>(Settings.EMPTY,
"action", threadPool, actionFilters, resolver, taskManager) {
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
requests.add(request);
commonExecutor.execute(() -> {
counter.decrementAndGet();
listener.onResponse(new SearchResponse());
});
}
};
if (controlledClock) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver,
availableProcessors, expected::get) {
@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener, long startTimeInNanos) {
expected.set(1000000);
super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos);
}
};
} else {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver,
availableProcessors, System::nanoTime) {
@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener, long startTimeInNanos) {
long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10));
expected.set(elapsed);
super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos);
}
};
}
}
static class Resolver extends IndexNameExpressionResolver {
Resolver(Settings settings) {
super(settings);
}
@Override
public String[] concreteIndexNames(ClusterState state, IndicesRequest request) {
return request.indices();
}
}
}

View File

@ -146,13 +146,16 @@ public class MultiSearchRequestTests extends ESTestCase {
}
public void testResponseErrorToXContent() throws IOException {
long tookInMillis = randomIntBetween(1, 1000);
MultiSearchResponse response = new MultiSearchResponse(
new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item[] {
new MultiSearchResponse.Item(null, new IllegalStateException("foobar")),
new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz"))
});
}, tookInMillis);
assertEquals("{\"responses\":["
assertEquals("{\"took\":"
+ tookInMillis
+ ",\"responses\":["
+ "{"
+ "\"error\":{\"root_cause\":[{\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"}],"
+ "\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"},\"status\":500"

View File

@ -102,8 +102,10 @@ public class TransportMultiSearchActionTests extends ESTestCase {
});
}
};
TransportMultiSearchAction action =
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10);
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10,
System::nanoTime);
// Execute the multi search api and fail if we find an error after executing:
try {