diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index b2d58203954..707bf8de57f 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -60,8 +60,11 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; /** * @@ -73,27 +76,41 @@ public class TransportBulkAction extends HandledTransportAction listener) { - final long startTime = System.currentTimeMillis(); + final long startTime = relativeTime(); final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); - if (autoCreateIndex.needToCheck()) { + if (needToCheck()) { // Keep track of all unique indices and all unique types per index for the create index requests: final Map> indicesAndTypes = new HashMap<>(); for (ActionRequest request : bulkRequest.requests) { @@ -112,7 +129,7 @@ public class TransportBulkAction extends HandledTransportAction> entry : indicesAndTypes.entrySet()) { final String index = entry.getKey(); - if (autoCreateIndex.shouldAutoCreate(index, state)) { + if (shouldAutoCreate(index, state)) { CreateIndexRequest createIndexRequest = new CreateIndexRequest(); createIndexRequest.index(index); for (String type : entry.getValue()) { @@ -163,6 +180,14 @@ public class TransportBulkAction extends HandledTransportAction responses, int idx, ActionRequest request, String index, Throwable e) { if (request instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) request; @@ -195,16 +220,15 @@ public class TransportBulkAction extends HandledTransportAction listener) { - final long startTime = System.currentTimeMillis(); - executeBulk(bulkRequest, startTime, listener, new AtomicArray<>(bulkRequest.requests.size())); + final long startTimeNanos = relativeTime(); + executeBulk(bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size())); } - private long buildTookInMillis(long startTime) { - // protect ourselves against time going backwards - return Math.max(1, System.currentTimeMillis() - startTime); + private long buildTookInMillis(long startTimeNanos) { + return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos); } - private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener listener, final AtomicArray responses ) { + void executeBulk(final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, final AtomicArray responses ) { final ClusterState clusterState = clusterService.state(); // TODO use timeout to wait here if its blocked... clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE); @@ -302,7 +326,7 @@ public class TransportBulkAction extends HandledTransportAction()); + + TransportCreateIndexAction createIndexAction = new TransportCreateIndexAction( + Settings.EMPTY, + transportService, + clusterService, + threadPool, + null, + actionFilters, + resolver); + + if (controlled) { + + return new TestTransportBulkAction( + Settings.EMPTY, + threadPool, + transportService, + clusterService, + null, + createIndexAction, + actionFilters, + resolver, + null, + expected::get) { + @Override + public void executeBulk(BulkRequest bulkRequest, ActionListener listener) { + expected.set(1000000); + super.executeBulk(bulkRequest, listener); + } + + @Override + void executeBulk( + BulkRequest bulkRequest, + long startTimeNanos, + ActionListener listener, + AtomicArray responses) { + expected.set(1000000); + super.executeBulk(bulkRequest, startTimeNanos, listener, responses); + } + }; + } else { + return new TestTransportBulkAction( + Settings.EMPTY, + threadPool, + transportService, + clusterService, + null, + createIndexAction, + actionFilters, + resolver, + null, + System::nanoTime) { + @Override + public void executeBulk(BulkRequest bulkRequest, ActionListener listener) { + long elapsed = spinForAtLeastOneMillisecond(); + expected.set(elapsed); + super.executeBulk(bulkRequest, listener); + } + + @Override + void executeBulk( + BulkRequest bulkRequest, + long startTimeNanos, + ActionListener listener, + AtomicArray responses) { + long elapsed = spinForAtLeastOneMillisecond(); + expected.set(elapsed); + super.executeBulk(bulkRequest, startTimeNanos, listener, responses); + } + }; + } + } + + // test unit conversion with a controlled clock + public void testTookWithControlledClock() throws Exception { + runTestTook(true); + } + + // test took advances with System#nanoTime + public void testTookWithRealClock() throws Exception { + runTestTook(false); + } + + private void runTestTook(boolean controlled) throws Exception { + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk.json"); + // translate Windows line endings (\r\n) to standard ones (\n) + if (Constants.WINDOWS) { + bulkAction = Strings.replace(bulkAction, "\r\n", "\n"); + } + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null); + AtomicLong expected = new AtomicLong(); + TransportBulkAction action = createAction(controlled, expected); + action.doExecute(bulkRequest, new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (controlled) { + assertThat( + bulkItemResponses.getTook().getMillis(), + equalTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); + } else { + assertThat( + bulkItemResponses.getTook().getMillis(), + greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); + } + } + + @Override + public void onFailure(Throwable e) { + + } + }); + } + + static class Resolver extends IndexNameExpressionResolver { + public Resolver(Settings settings) { + super(settings); + } + + @Override + public String[] concreteIndices(ClusterState state, IndicesRequest request) { + return request.indices(); + } + } + + static class TestTransportBulkAction extends TransportBulkAction { + + public TestTransportBulkAction( + Settings settings, + ThreadPool threadPool, + TransportService transportService, + ClusterService clusterService, + TransportShardBulkAction shardBulkAction, + TransportCreateIndexAction createIndexAction, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + AutoCreateIndex autoCreateIndex, + LongSupplier relativeTimeProvider) { + super( + settings, + threadPool, + transportService, + clusterService, + shardBulkAction, + createIndexAction, + actionFilters, + indexNameExpressionResolver, + autoCreateIndex, + relativeTimeProvider); + } + + @Override + boolean needToCheck() { + return randomBoolean(); + } + + @Override + boolean shouldAutoCreate(String index, ClusterState state) { + return randomBoolean(); + } + + } + + static class TestTransportCreateIndexAction extends TransportCreateIndexAction { + + public TestTransportCreateIndexAction( + Settings settings, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + MetaDataCreateIndexService createIndexService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); + } + + @Override + protected void doExecute(Task task, CreateIndexRequest request, ActionListener listener) { + listener.onResponse(newResponse()); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnableTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnableTests.java index b0bbf0f47f2..c17429dccfb 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnableTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnableTests.java @@ -46,20 +46,13 @@ public class PrioritizedRunnableTests extends ESTestCase { // test age advances with System#nanoTime public void testGetAgeInMillisWithRealClock() throws InterruptedException { - long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS); PrioritizedRunnable runnable = new PrioritizedRunnable(Priority.NORMAL) { @Override public void run() { } }; - // force at least one millisecond to elapse, but ensure the - // clock has enough resolution to observe the passage of time - long start = System.nanoTime(); - long elapsed; - while ((elapsed = (System.nanoTime() - start)) < nanosecondsInMillisecond) { - // busy spin - } + long elapsed = spinForAtLeastOneMillisecond(); // creation happened before start, so age will be at least as // large as elapsed diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index e80bb93aeb7..4a20d3c3fd6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -651,4 +651,16 @@ public abstract class ESTestCase extends LuceneTestCase { } throw new AssertionFailedError("Expected exception " + expectedType.getSimpleName()); } + + protected static long spinForAtLeastOneMillisecond() { + long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS); + // force at least one millisecond to elapse, but ensure the + // clock has enough resolution to observe the passage of time + long start = System.nanoTime(); + long elapsed; + while ((elapsed = (System.nanoTime() - start)) < nanosecondsInMillisecond) { + // busy spin + } + return elapsed; + } }