diff --git a/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java index 0e38259ad4d..ab5c423349a 100644 --- a/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java @@ -26,7 +26,7 @@ import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateListener; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; @@ -91,11 +91,11 @@ public abstract class TransportMasterNodeOperationAction listener) { - innerExecute(request, listener, false); + innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false); } - private void innerExecute(final Request request, final ActionListener listener, final boolean retrying) { - final ClusterState clusterState = clusterService.state(); + private void innerExecute(final Request request, final ActionListener listener, final ClusterStateObserver observer, final boolean retrying) { + final ClusterState clusterState = observer.observedState(); final DiscoveryNodes nodes = clusterState.nodes(); if (nodes.localNodeMaster() || localExecute(request)) { // check for block, if blocked, retry, else, execute locally @@ -105,37 +105,32 @@ public abstract class TransportMasterNodeOperationAction listener; private final Request request; - private volatile ClusterState clusterState; private volatile ShardIterator shardIt; private final AtomicBoolean primaryOperationStarted = new AtomicBoolean(); private final ReplicationType replicationType; - protected final long startTime = System.currentTimeMillis(); + private volatile ClusterStateObserver observer; AsyncShardOperationAction(Request request, ActionListener listener) { this.request = request; @@ -335,40 +333,40 @@ public abstract class TransportShardReplicationOperationActiontrue if the action starting to be performed on the primary (or is done). */ - public boolean start(final boolean fromClusterEvent) throws ElasticsearchException { - this.clusterState = clusterService.state(); + protected boolean doStart() throws ElasticsearchException { try { - ClusterBlockException blockException = checkGlobalBlock(clusterState, request); + ClusterBlockException blockException = checkGlobalBlock(observer.observedState(), request); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); - retry(fromClusterEvent, blockException); + retry(blockException); return false; } else { throw blockException; } } // check if we need to execute, and if not, return - if (!resolveRequest(clusterState, request, listener)) { + if (!resolveRequest(observer.observedState(), request, listener)) { return true; } - blockException = checkRequestBlock(clusterState, request); + blockException = checkRequestBlock(observer.observedState(), request); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); - retry(fromClusterEvent, blockException); + retry(blockException); return false; } else { throw blockException; } } - shardIt = shards(clusterState, request); + shardIt = shards(observer.observedState(), request); } catch (Throwable e) { listener.onFailure(e); return true; @@ -378,7 +376,7 @@ public abstract class TransportShardReplicationOperationAction() { @Override @@ -471,7 +469,7 @@ public abstract class TransportShardReplicationOperationAction response, final AtomicInteger counter, final ShardRouting shard, String nodeId, final IndexMetaData indexMetaData) { // if we don't have that node, it means that it might have failed and will be created again, in // this case, we don't have to do the operation, and just let it failover - if (!clusterState.nodes().nodeExists(nodeId)) { + if (!observer.observedState().nodes().nodeExists(nodeId)) { if (counter.decrementAndGet() == 0) { listener.onResponse(response.response()); } @@ -707,8 +676,8 @@ public abstract class TransportShardReplicationOperationAction listener) { this.request = request; this.listener = listener; } public void start() { - start(false); + observer = new ClusterStateObserver(clusterService, request.timeout(), logger); + doStart(); } - public boolean start(final boolean fromClusterEvent) throws ElasticsearchException { - final ClusterState clusterState = clusterService.state(); - nodes = clusterState.nodes(); + protected boolean doStart() throws ElasticsearchException { + nodes = observer.observedState().nodes(); try { - ClusterBlockException blockException = checkGlobalBlock(clusterState, request); + ClusterBlockException blockException = checkGlobalBlock(observer.observedState(), request); if (blockException != null) { if (blockException.retryable()) { - retry(fromClusterEvent, blockException); + retry(blockException); return false; } else { throw blockException; } } // check if we need to execute, and if not, return - if (!resolveRequest(clusterState, request, listener)) { + if (!resolveRequest(observer.observedState(), request, listener)) { return true; } - blockException = checkRequestBlock(clusterState, request); + blockException = checkRequestBlock(observer.observedState(), request); if (blockException != null) { if (blockException.retryable()) { - retry(fromClusterEvent, blockException); + retry(blockException); return false; } else { throw blockException; } } - shardIt = shards(clusterState, request); + shardIt = shards(observer.observedState(), request); } catch (Throwable e) { listener.onFailure(e); return true; @@ -162,7 +163,7 @@ public abstract class TransportInstanceSingleOperationAction { @@ -117,6 +134,8 @@ public class ClusterState implements ToXContent { private SettingsFilter settingsFilter; + private volatile ClusterStateStatus status; + public ClusterState(long version, ClusterState state) { this(state.clusterName, version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs()); } @@ -129,6 +148,16 @@ public class ClusterState implements ToXContent { this.nodes = nodes; this.blocks = blocks; this.customs = customs; + this.status = ClusterStateStatus.UNKNOWN; + } + + public ClusterStateStatus status() { + return status; + } + + public ClusterState status(ClusterStateStatus newStatus) { + this.status = newStatus; + return this; } public long version() { diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java new file mode 100644 index 00000000000..dda9301b096 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java @@ -0,0 +1,327 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * A utility class which simplifies interacting with the cluster state in cases where + * one tries to take action based on the current state but may want to wait for a new state + * and retry upon failure. + */ +public class ClusterStateObserver { + + protected final ESLogger logger; + + public final ChangePredicate MATCH_ALL_CHANGES_PREDICATE = new EventPredicate() { + + @Override + public boolean apply(ClusterChangedEvent changedEvent) { + return changedEvent.previousState().version() != changedEvent.state().version(); + } + }; + + private ClusterService clusterService; + volatile TimeValue timeOutValue; + + + final AtomicReference lastObservedState; + // observingContext is not null when waiting on cluster state changes + final AtomicReference observingContext = new AtomicReference(null); + volatile long startTime; + volatile boolean timedOut; + + volatile TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener(); + + + public ClusterStateObserver(ClusterService clusterService, ESLogger logger) { + this(clusterService, new TimeValue(60000), logger); + } + + /** + * @param clusterService + * @param timeout a global timeout for this observer. After it has expired the observer + * will fail any existing or new #waitForNextChange calls. + */ + public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ESLogger logger) { + this.timeOutValue = timeout; + this.clusterService = clusterService; + this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state())); + this.startTime = System.currentTimeMillis(); + this.logger = logger; + } + + /** last cluster state observer by this observer. Note that this may not be the current one */ + public ClusterState observedState() { + ObservedState state = lastObservedState.get(); + assert state != null; + return state.clusterState; + } + + /** indicates whether this observer has timedout */ + public boolean isTimedOut() { + return timedOut; + } + + public void waitForNextChange(Listener listener) { + waitForNextChange(listener, MATCH_ALL_CHANGES_PREDICATE); + } + + public void waitForNextChange(Listener listener, @Nullable TimeValue timeOutValue) { + waitForNextChange(listener, MATCH_ALL_CHANGES_PREDICATE, timeOutValue); + } + + public void waitForNextChange(Listener listener, ChangePredicate changePredicate) { + waitForNextChange(listener, changePredicate, null); + } + + /** + * Wait for the next cluster state which satisfies changePredicate + * + * @param listener callback listener + * @param changePredicate predicate to check whether cluster state changes are relevant and the callback should be called + * @param timeOutValue a timeout for waiting. If null the global observer timeout will be used. + */ + public void waitForNextChange(Listener listener, ChangePredicate changePredicate, @Nullable TimeValue timeOutValue) { + + if (observingContext.get() != null) { + throw new ElasticsearchException("already waiting for a cluster state change"); + } + long timeoutTimeLeft; + if (timeOutValue == null) { + timeOutValue = this.timeOutValue; + long timeSinceStart = System.currentTimeMillis() - startTime; + timeoutTimeLeft = timeOutValue.millis() - timeSinceStart; + if (timeoutTimeLeft <= 0l) { + // things have timeout while we were busy -> notify + logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart)); + // update to latest, in case people want to retry + timedOut = true; + lastObservedState.set(new ObservedState(clusterService.state())); + listener.onTimeout(timeOutValue); + return; + } + } else { + this.startTime = System.currentTimeMillis(); + this.timeOutValue = timeOutValue; + timeoutTimeLeft = timeOutValue.millis(); + timedOut = false; + } + + // sample a new state + ObservedState newState = new ObservedState(clusterService.state()); + ObservedState lastState = lastObservedState.get(); + if (changePredicate.apply(lastState.clusterState, lastState.status, newState.clusterState, newState.status)) { + // good enough, let's go. + logger.trace("observer: sampled state accepted by predicate ({})", newState); + lastObservedState.set(newState); + listener.onNewClusterState(newState.clusterState); + } else { + logger.trace("observer: sampled state rejected by predicate ({}). adding listener to ClusterService", newState); + ObservingContext context = new ObservingContext(listener, changePredicate); + if (!observingContext.compareAndSet(null, context)) { + throw new ElasticsearchException("already waiting for a cluster state change"); + } + clusterService.add(new TimeValue(timeoutTimeLeft), clusterStateListener); + } + } + + public void close() { + if (observingContext.getAndSet(null) != null) { + clusterService.remove(clusterStateListener); + logger.trace("cluster state observer closed"); + } + } + + /** + * reset this observer to the give cluster state. Any pending waits will be canceled. + * + * @param toState + */ + public void reset(ClusterState toState) { + if (observingContext.getAndSet(null) != null) { + clusterService.remove(clusterStateListener); + } + lastObservedState.set(new ObservedState(toState)); + } + + class ObserverClusterStateListener implements TimeoutClusterStateListener { + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ObservingContext context = observingContext.get(); + if (context == null) { + // No need to remove listener as it is the responsibility of the thread that set observingContext to null + return; + } + if (context.changePredicate.apply(event)) { + if (observingContext.compareAndSet(context, null)) { + clusterService.remove(this); + ObservedState state = new ObservedState(event.state()); + logger.trace("observer: accepting cluster state change ({})", state); + lastObservedState.set(state); + context.listener.onNewClusterState(state.clusterState); + } else { + logger.trace("observer: predicate approved change but observing context has changed - ignoring (new cluster state version [{}])", event.state().version()); + } + } else { + logger.trace("observer: predicate rejected change (new cluster state version [{}])", event.state().version()); + } + } + + @Override + public void postAdded() { + ObservingContext context = observingContext.get(); + if (context == null) { + // No need to remove listener as it is the responsibility of the thread that set observingContext to null + return; + } + ObservedState newState = new ObservedState(clusterService.state()); + ObservedState lastState = lastObservedState.get(); + if (context.changePredicate.apply(lastState.clusterState, lastState.status, newState.clusterState, newState.status)) { + // double check we're still listening + if (observingContext.compareAndSet(context, null)) { + logger.trace("observer: post adding listener: accepting current cluster state ({})", newState); + clusterService.remove(this); + lastObservedState.set(newState); + context.listener.onNewClusterState(newState.clusterState); + } else { + logger.trace("observer: postAdded - predicate approved state but observing context has changed - ignoring ({})", newState); + } + } else { + logger.trace("observer: postAdded - predicate rejected state ({})", newState); + } + } + + @Override + public void onClose() { + ObservingContext context = observingContext.getAndSet(null); + + if (context != null) { + logger.trace("observer: cluster service closed. notifying listener."); + clusterService.remove(this); + context.listener.onClusterServiceClose(); + } + } + + @Override + public void onTimeout(TimeValue timeout) { + ObservingContext context = observingContext.getAndSet(null); + if (context != null) { + clusterService.remove(this); + long timeSinceStart = System.currentTimeMillis() - startTime; + logger.debug("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart)); + // update to latest, in case people want to retry + lastObservedState.set(new ObservedState(clusterService.state())); + timedOut = true; + context.listener.onTimeout(timeOutValue); + } + } + } + + public interface Listener { + + /** called when a new state is observed */ + void onNewClusterState(ClusterState state); + + /** called when the cluster service is closed */ + void onClusterServiceClose(); + + void onTimeout(TimeValue timeout); + } + + public interface ChangePredicate { + + /** + * a rough check used when starting to monitor for a new change. Called infrequently can be less accurate. + * + * @return true if newState should be accepted + */ + public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, + ClusterState newState, ClusterState.ClusterStateStatus newStatus); + + /** + * called to see whether a cluster change should be accepted + * + * @return true if changedEvent.state() should be accepted + */ + public boolean apply(ClusterChangedEvent changedEvent); + } + + + public static abstract class ValidationPredicate implements ChangePredicate { + + @Override + public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState, ClusterState.ClusterStateStatus newStatus) { + if (previousState != newState || previousStatus != newStatus) { + return validate(newState); + } + return false; + } + + protected abstract boolean validate(ClusterState newState); + + @Override + public boolean apply(ClusterChangedEvent changedEvent) { + if (changedEvent.previousState().version() != changedEvent.state().version()) { + return validate(changedEvent.state()); + } + return false; + } + } + + public static abstract class EventPredicate implements ChangePredicate { + @Override + public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState, ClusterState.ClusterStateStatus newStatus) { + return previousState != newState || previousStatus != newStatus; + } + + } + + static class ObservingContext { + final public Listener listener; + final public ChangePredicate changePredicate; + + public ObservingContext(Listener listener, ChangePredicate changePredicate) { + this.listener = listener; + this.changePredicate = changePredicate; + } + } + + static class ObservedState { + final public ClusterState clusterState; + final public ClusterState.ClusterStateStatus status; + + public ObservedState(ClusterState clusterState) { + this.clusterState = clusterState; + this.status = clusterState.status(); + } + + @Override + public String toString() { + return "version [" + clusterState.version() + "], status [" + status + "]"; + } + } +} diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index ed204dcb4bc..40ad8b13f41 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -376,6 +376,8 @@ public class InternalClusterService extends AbstractLifecycleComponent implem continue; } final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode); + nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() { diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 0f566608adb..5ed8133c1d4 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -170,7 +170,7 @@ public class PublishClusterStateAction extends AbstractComponent { } in.setVersion(request.version()); ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); - + clusterState.status(ClusterState.ClusterStateStatus.RECEIVED); logger.debug("received cluster state version {}", clusterState.version()); listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() { @Override diff --git a/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java index 2f8f9ba0e17..96e10afb906 100644 --- a/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java +++ b/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; import java.util.HashMap; @@ -43,6 +44,7 @@ import static org.hamcrest.Matchers.greaterThan; public class NoMasterNodeTests extends ElasticsearchIntegrationTest { @Test + @TestLogging("action:TRACE,cluster.service:TRACE") public void testNoMasterActions() throws Exception { Settings settings = settingsBuilder() .put("discovery.type", "zen") diff --git a/src/test/java/org/elasticsearch/search/facet/SimpleFacetsTests.java b/src/test/java/org/elasticsearch/search/facet/SimpleFacetsTests.java index 02f8e7b167b..7b1eee41596 100644 --- a/src/test/java/org/elasticsearch/search/facet/SimpleFacetsTests.java +++ b/src/test/java/org/elasticsearch/search/facet/SimpleFacetsTests.java @@ -64,6 +64,7 @@ import static org.hamcrest.Matchers.*; /** * */ + public class SimpleFacetsTests extends ElasticsearchIntegrationTest { private int numRuns = -1; @@ -299,14 +300,14 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { @Slow public void testConcurrentFacets() throws ElasticsearchException, IOException, InterruptedException, ExecutionException { assertAcked(prepareCreate("test") - .addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties") - .startObject("byte").field("type", "byte").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() - .startObject("short").field("type", "short").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() - .startObject("integer").field("type", "integer").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() - .startObject("long").field("type", "long").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() - .startObject("float").field("type", "float").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() - .startObject("double").field("type", "double").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() - .endObject().endObject().endObject())); + .addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties") + .startObject("byte").field("type", "byte").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() + .startObject("short").field("type", "short").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() + .startObject("integer").field("type", "integer").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() + .startObject("long").field("type", "long").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() + .startObject("float").field("type", "float").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() + .startObject("double").field("type", "double").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject() + .endObject().endObject().endObject())); ensureGreen(); for (int i = 0; i < 100; i++) { @@ -327,7 +328,9 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { .endObject()).execute().actionGet(); } + logger.info("done indexing. issue a refresh."); flushAndRefresh(); + final AtomicInteger searchId = new AtomicInteger(0); ConcurrentDuel duel = new ConcurrentDuel<>(5); { final Client cl = client(); @@ -362,7 +365,11 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { @Override public Facets run() { final SearchRequestBuilder facetRequest; - if (count.incrementAndGet() % 2 == 0) { // every second request is mapped + int searchId = count.incrementAndGet(); + if (searchId % 100 == 0) { + logger.info("-> run {} searches", searchId); + } + if (searchId % 2 == 0) { // every second request is mapped facetRequest = cl.prepareSearch().setQuery(matchAllQuery()) .addFacet(termsFacet("double").field("double").size(10)) .addFacet(termsFacet("float").field("float").size(10)) @@ -389,6 +396,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { }, 5000 ); } + logger.info("starting second duel"); { duel.duel(new ConcurrentDuel.DuelJudge() { @@ -420,7 +428,12 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { @Override public Facets run() { final SearchRequestBuilder facetRequest; - switch (count.incrementAndGet() % 6) { + int searchId = count.incrementAndGet(); + if (searchId % 100 == 0) { + logger.info("-> run {} searches", searchId); + } + + switch (searchId % 6) { case 4: facetRequest = client().prepareSearch() .setQuery(matchAllQuery()) @@ -496,7 +509,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { .startObject("filtered") .field("type", "string") .startObject("fielddata").field("format", "fst").field("loading", randomBoolean() ? "eager" : "lazy").startObject("filter") - .startObject("regex").field("pattern", "\\d{1,2}").endObject().endObject() + .startObject("regex").field("pattern", "\\d{1,2}").endObject().endObject() .endObject() // only 1 or 2 digits .endObject() @@ -525,6 +538,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { .endObject()).execute().actionGet(); } + logger.info("done indexing. refreshing."); flushAndRefresh(); ConcurrentDuel duel = new ConcurrentDuel<>(5); String[] fieldPostFix = new String[]{"", "_mv"}; @@ -567,6 +581,9 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { final SearchRequestBuilder facetRequest; int incrementAndGet = count.incrementAndGet(); final String field; + if (incrementAndGet % 100 == 0) { + logger.info("-> run {} searches", incrementAndGet); + } switch (incrementAndGet % 2) { case 1: field = "filtered" + postfix; @@ -612,7 +629,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest { SearchResponse actionGet = facetRequest.execute().actionGet(); return actionGet.getFacets(); } - }, 5000 + }, 2000 ); }