Use and test relative time in TransportBulkAction

This commit modifies TransportBulkAction to use relative time instead of
absolute time when measuring how long a bulk request took to be
processed, and adds tests for this functionality.

Closes #16916
This commit is contained in:
Jason Tedor 2016-03-02 10:51:31 -05:00
parent e75a0da4d5
commit 166259db64
4 changed files with 309 additions and 20 deletions

View File

@ -60,8 +60,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
/**
*
@ -73,27 +76,41 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final ClusterService clusterService;
private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction;
private final LongSupplier relativeTimeProvider;
@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
this(settings, threadPool, transportService, clusterService,
shardBulkAction, createIndexAction,
actionFilters, indexNameExpressionResolver,
autoCreateIndex,
System::nanoTime);
}
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, BulkRequest::new);
Objects.requireNonNull(relativeTimeProvider);
this.clusterService = clusterService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
this.allowIdGeneration = this.settings.getAsBoolean("action.bulk.action.allow_id_generation", true);
this.relativeTimeProvider = relativeTimeProvider;
}
@Override
protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis();
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
if (autoCreateIndex.needToCheck()) {
if (needToCheck()) {
// Keep track of all unique indices and all unique types per index for the create index requests:
final Map<String, Set<String>> indicesAndTypes = new HashMap<>();
for (ActionRequest request : bulkRequest.requests) {
@ -112,7 +129,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
ClusterState state = clusterService.state();
for (Map.Entry<String, Set<String>> entry : indicesAndTypes.entrySet()) {
final String index = entry.getKey();
if (autoCreateIndex.shouldAutoCreate(index, state)) {
if (shouldAutoCreate(index, state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
for (String type : entry.getValue()) {
@ -163,6 +180,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
boolean needToCheck() {
return autoCreateIndex.needToCheck();
}
boolean shouldAutoCreate(String index, ClusterState state) {
return autoCreateIndex.shouldAutoCreate(index, state);
}
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Throwable e) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
@ -195,16 +220,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
* @see #doExecute(BulkRequest, org.elasticsearch.action.ActionListener)
*/
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis();
executeBulk(bulkRequest, startTime, listener, new AtomicArray<>(bulkRequest.requests.size()));
final long startTimeNanos = relativeTime();
executeBulk(bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size()));
}
private long buildTookInMillis(long startTime) {
// protect ourselves against time going backwards
return Math.max(1, System.currentTimeMillis() - startTime);
private long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
}
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
void executeBulk(final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
final ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
@ -302,7 +326,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTime)));
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
return;
}
@ -352,7 +376,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTime)));
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
}
});
}
@ -398,7 +422,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return false;
}
private static class ConcreteIndices {
private final ClusterState state;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@ -422,4 +445,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return concreteIndex;
}
}
private long relativeTime() {
return relativeTimeProvider.getAsLong();
}
}

View File

@ -0,0 +1,256 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.mock;
public class TransportBulkActionTookTests extends ESTestCase {
private ThreadPool threadPool;
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = mock(ThreadPool.class);
}
private TransportBulkAction createAction(boolean controlled, AtomicLong expected) {
CapturingTransport capturingTransport = new CapturingTransport();
ClusterService clusterService = new TestClusterService(threadPool);
TransportService transportService = new TransportService(capturingTransport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);
ActionFilters actionFilters = new ActionFilters(new HashSet<>());
TransportCreateIndexAction createIndexAction = new TransportCreateIndexAction(
Settings.EMPTY,
transportService,
clusterService,
threadPool,
null,
actionFilters,
resolver);
if (controlled) {
return new TestTransportBulkAction(
Settings.EMPTY,
threadPool,
transportService,
clusterService,
null,
createIndexAction,
actionFilters,
resolver,
null,
expected::get) {
@Override
public void executeBulk(BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
expected.set(1000000);
super.executeBulk(bulkRequest, listener);
}
@Override
void executeBulk(
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses) {
expected.set(1000000);
super.executeBulk(bulkRequest, startTimeNanos, listener, responses);
}
};
} else {
return new TestTransportBulkAction(
Settings.EMPTY,
threadPool,
transportService,
clusterService,
null,
createIndexAction,
actionFilters,
resolver,
null,
System::nanoTime) {
@Override
public void executeBulk(BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long elapsed = spinForAtLeastOneMillisecond();
expected.set(elapsed);
super.executeBulk(bulkRequest, listener);
}
@Override
void executeBulk(
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses) {
long elapsed = spinForAtLeastOneMillisecond();
expected.set(elapsed);
super.executeBulk(bulkRequest, startTimeNanos, listener, responses);
}
};
}
}
// test unit conversion with a controlled clock
public void testTookWithControlledClock() throws Exception {
runTestTook(true);
}
// test took advances with System#nanoTime
public void testTookWithRealClock() throws Exception {
runTestTook(false);
}
private void runTestTook(boolean controlled) throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk.json");
// translate Windows line endings (\r\n) to standard ones (\n)
if (Constants.WINDOWS) {
bulkAction = Strings.replace(bulkAction, "\r\n", "\n");
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null);
AtomicLong expected = new AtomicLong();
TransportBulkAction action = createAction(controlled, expected);
action.doExecute(bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (controlled) {
assertThat(
bulkItemResponses.getTook().getMillis(),
equalTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS)));
} else {
assertThat(
bulkItemResponses.getTook().getMillis(),
greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS)));
}
}
@Override
public void onFailure(Throwable e) {
}
});
}
static class Resolver extends IndexNameExpressionResolver {
public Resolver(Settings settings) {
super(settings);
}
@Override
public String[] concreteIndices(ClusterState state, IndicesRequest request) {
return request.indices();
}
}
static class TestTransportBulkAction extends TransportBulkAction {
public TestTransportBulkAction(
Settings settings,
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
TransportShardBulkAction shardBulkAction,
TransportCreateIndexAction createIndexAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
LongSupplier relativeTimeProvider) {
super(
settings,
threadPool,
transportService,
clusterService,
shardBulkAction,
createIndexAction,
actionFilters,
indexNameExpressionResolver,
autoCreateIndex,
relativeTimeProvider);
}
@Override
boolean needToCheck() {
return randomBoolean();
}
@Override
boolean shouldAutoCreate(String index, ClusterState state) {
return randomBoolean();
}
}
static class TestTransportCreateIndexAction extends TransportCreateIndexAction {
public TestTransportCreateIndexAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doExecute(Task task, CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) {
listener.onResponse(newResponse());
}
}
}

View File

@ -46,20 +46,13 @@ public class PrioritizedRunnableTests extends ESTestCase {
// test age advances with System#nanoTime
public void testGetAgeInMillisWithRealClock() throws InterruptedException {
long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
PrioritizedRunnable runnable = new PrioritizedRunnable(Priority.NORMAL) {
@Override
public void run() {
}
};
// force at least one millisecond to elapse, but ensure the
// clock has enough resolution to observe the passage of time
long start = System.nanoTime();
long elapsed;
while ((elapsed = (System.nanoTime() - start)) < nanosecondsInMillisecond) {
// busy spin
}
long elapsed = spinForAtLeastOneMillisecond();
// creation happened before start, so age will be at least as
// large as elapsed

View File

@ -651,4 +651,16 @@ public abstract class ESTestCase extends LuceneTestCase {
}
throw new AssertionFailedError("Expected exception " + expectedType.getSimpleName());
}
protected static long spinForAtLeastOneMillisecond() {
long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
// force at least one millisecond to elapse, but ensure the
// clock has enough resolution to observe the passage of time
long start = System.nanoTime();
long elapsed;
while ((elapsed = (System.nanoTime() - start)) < nanosecondsInMillisecond) {
// busy spin
}
return elapsed;
}
}