Added simple count down class that allows to be fast forwarded

Closes #3910
This commit is contained in:
Simon Willnauer 2013-10-07 13:44:37 +02:00 committed by Luca Cavanna
parent fcf13e0fa7
commit 89de3ab627
9 changed files with 236 additions and 71 deletions

View File

@ -31,11 +31,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Delete index action.
*/
@ -102,9 +101,10 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
protected void masterOperation(final DeleteIndexRequest request, final ClusterState state, final ActionListener<DeleteIndexResponse> listener) throws ElasticSearchException {
if (request.indices().length == 0) {
listener.onResponse(new DeleteIndexResponse(true));
return;
}
// TODO: this API should be improved, currently, if one delete index failed, we send a failure, we should send a response array that includes all the indices that were deleted
final AtomicInteger count = new AtomicInteger(request.indices().length);
final CountDown count = new CountDown(request.indices().length);
for (final String index : request.indices()) {
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {
@ -116,7 +116,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
if (!response.acknowledged()) {
ack = false;
}
if (count.decrementAndGet() == 0) {
if (count.countDown()) {
if (lastFailure != null) {
listener.onFailure(lastFailure);
} else {
@ -129,7 +129,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
public void onFailure(Throwable t) {
logger.debug("[{}] failed to delete index", t, index);
lastFailure = t;
if (count.decrementAndGet() == 0) {
if (count.countDown()) {
listener.onFailure(t);
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
@ -36,7 +37,6 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId;
@ -64,7 +64,7 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
private class Async {
final DiscoveryNodes nodes;
final AtomicInteger expectedOps;
final CountDown expectedOps;
final ClearScrollRequest request;
final List<Tuple<String, Long>[]> contexts = new ArrayList<Tuple<String, Long>[]>();
final AtomicReference<Throwable> expHolder;
@ -86,11 +86,11 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
this.request = request;
this.listener = listener;
this.expHolder = new AtomicReference<Throwable>();
this.expectedOps = new AtomicInteger(expectedOps);
this.expectedOps = new CountDown(expectedOps);
}
public void run() {
if (expectedOps.get() == 0) {
if (expectedOps.isCountedDown()) {
listener.onResponse(new ClearScrollResponse(true));
return;
}
@ -135,8 +135,7 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
}
void onFreedContext() {
assert expectedOps.get() > 0;
if (expectedOps.decrementAndGet() == 0) {
if (expectedOps.countDown()) {
boolean succeeded = expHolder.get() == null;
listener.onResponse(new ClearScrollResponse(succeeded));
}
@ -144,8 +143,7 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
void onFailedFreedContext(Throwable e, DiscoveryNode node) {
logger.warn("Clear SC failed on node[{}]", e, node);
assert expectedOps.get() > 0;
if (expectedOps.decrementAndGet() == 0) {
if (expectedOps.countDown()) {
listener.onResponse(new ClearScrollResponse(false));
} else {
expHolder.set(e);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
@ -44,8 +45,6 @@ import org.elasticsearch.indices.InvalidAliasNameException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
@ -243,13 +242,12 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
private class CountDownListener implements NodeAliasesUpdatedAction.Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final AtomicInteger countDown;
private final CountDown countDown;
private final Listener listener;
private final long version;
public CountDownListener(int countDown, Listener listener, long version) {
this.countDown = new AtomicInteger(countDown);
this.countDown = new CountDown(countDown);
this.listener = listener;
this.version = version;
}
@ -258,20 +256,18 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
public void onAliasesUpdated(NodeAliasesUpdatedAction.NodeAliasesUpdatedResponse response) {
if (version <= response.version()) {
logger.trace("Received NodeAliasesUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId());
if (countDown.decrementAndGet() == 0) {
if (countDown.countDown()) {
aliasOperationPerformedAction.remove(this);
if (notified.compareAndSet(false, true)) {
logger.trace("NodeAliasUpdated was acknowledged by all expected nodes, returning");
listener.onResponse(new Response(true));
}
logger.trace("NodeAliasUpdated was acknowledged by all expected nodes, returning");
listener.onResponse(new Response(true));
}
}
}
@Override
public void onTimeout() {
aliasOperationPerformedAction.remove(this);
if (notified.compareAndSet(false, true)) {
if (countDown.fastForward()) {
aliasOperationPerformedAction.remove(this);
listener.onResponse(new Response(false));
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
@ -46,8 +47,6 @@ import org.elasticsearch.rest.RestStatus;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
@ -273,14 +272,12 @@ public class MetaDataIndexStateService extends AbstractComponent {
}
private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final AtomicInteger countDown;
private final CountDown countDown;
private final Listener listener;
private final long version;
public CountDownListener(int countDown, Listener listener, long version) {
this.countDown = new AtomicInteger(countDown);
public CountDownListener(int count, Listener listener, long version) {
this.countDown = new CountDown(count);
this.listener = listener;
this.version = version;
}
@ -289,20 +286,18 @@ public class MetaDataIndexStateService extends AbstractComponent {
public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) {
if (version <= response.version()) {
logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId());
if (countDown.decrementAndGet() == 0) {
if (countDown.countDown()) {
indicesStateUpdatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
logger.trace("NodeIndexStateUpdated was acknowledged by all expected nodes, returning");
listener.onResponse(new Response(true));
}
logger.trace("NodeIndexStateUpdated was acknowledged by all expected nodes, returning");
listener.onResponse(new Response(true));
}
}
}
@Override
public void onTimeout() {
indicesStateUpdatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
if (countDown.fastForward()) {
indicesStateUpdatedAction.remove(this);
listener.onResponse(new Response(false));
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
@ -49,8 +50,6 @@ import org.elasticsearch.percolator.PercolatorService;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
@ -593,8 +592,7 @@ public class MetaDataMappingService extends AbstractComponent {
private class CountDownListener implements NodeMappingCreatedAction.Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final AtomicInteger countDown;
private final CountDown countDown;
private final Listener listener;
private final long minClusterStateVersion;
@ -604,7 +602,7 @@ public class MetaDataMappingService extends AbstractComponent {
* @param listener listener to call when counter reaches 0.
*/
public CountDownListener(int countDown, long minClusterStateVersion, Listener listener) {
this.countDown = new AtomicInteger(countDown);
this.countDown = new CountDown(countDown);
this.listener = listener;
this.minClusterStateVersion = minClusterStateVersion;
}
@ -619,18 +617,16 @@ public class MetaDataMappingService extends AbstractComponent {
}
public void decrementCounter() {
if (countDown.decrementAndGet() == 0) {
if (countDown.countDown()) {
mappingCreatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
listener.onResponse(new Response(true));
}
listener.onResponse(new Response(true));
}
}
@Override
public void onTimeout() {
mappingCreatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
if (countDown.fastForward()) {
mappingCreatedAction.remove(this);
listener.onResponse(new Response(false));
}
}

View File

@ -50,7 +50,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
@ -623,8 +622,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private class AckCountDownListener implements Discovery.AckListener {
private final AckedClusterStateUpdateTask ackedUpdateTask;
private final long version;
private final AtomicInteger countDown;
private final AtomicBoolean notified = new AtomicBoolean(false);
private final CountDown countDown;
private final Future<?> ackTimeoutCallback;
private Throwable lastFailure;
@ -638,7 +636,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, version);
this.countDown = new AtomicInteger(countDown);
this.countDown = new CountDown(countDown);
this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
@ -659,19 +657,16 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, version);
}
assert countDown.get() > 0;
if (countDown.decrementAndGet() == 0) {
if (notified.compareAndSet(false, true) ) {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", version);
ackTimeoutCallback.cancel(true);
ackedUpdateTask.onAllNodesAcked(lastFailure);
}
if (countDown.countDown()) {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", version);
ackTimeoutCallback.cancel(true);
ackedUpdateTask.onAllNodesAcked(lastFailure);
}
}
@Override
public void onTimeout() {
if (notified.compareAndSet(false, true)) {
if (countDown.fastForward()) {
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", version);
ackedUpdateTask.onAckTimeout();
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A simple thread safe count-down class that in contrast to a {@link CountDownLatch}
* never blocks. This class is useful if a certain action has to wait for N concurrent
* tasks to return or a timeout to occur in order to proceed.
*/
public final class CountDown {
private final AtomicInteger countDown;
private final int originalCount;
public CountDown(int count) {
if (count < 0) {
throw new ElasticSearchIllegalArgumentException("count must be greater or equal to 0 but was: " + count);
}
this.originalCount = count;
this.countDown = new AtomicInteger(count);
}
/**
* Decrements the count-down and returns <code>true</code> iff this call
* reached zero otherwise <code>false</code>
*/
public boolean countDown() {
assert originalCount > 0;
for (;;) {
final int current = countDown.get();
assert current >= 0;
if (current == 0) {
return false;
}
if (countDown.compareAndSet(current, current - 1)) {
return current == 1;
}
}
}
/**
* Fast forwards the count-down to zero and returns <code>true</code> iff
* the count down reached zero with this fast forward call otherwise
* <code>false</code>
*/
public boolean fastForward() {
assert originalCount > 0;
assert countDown.get() >= 0;
return countDown.getAndSet(0) > 0;
}
/**
* Returns <code>true</code> iff the count-down has reached zero. Otherwise <code>false</code>
*/
public boolean isCountedDown() {
assert countDown.get() >= 0;
return countDown.get() == 0;
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class CountDownTest extends ElasticsearchTestCase {
@Test @Repeat(iterations = 1000)
public void testConcurrent() throws InterruptedException {
final AtomicInteger count = new AtomicInteger(0);
final CountDown countDown = new CountDown(atLeast(10));
Thread[] threads = new Thread[atLeast(3)];
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException();
}
while (true) {
if(frequently()) {
if (countDown.isCountedDown()) {
break;
}
}
if (countDown.countDown()) {
count.incrementAndGet();
break;
}
}
}
};
threads[i].start();
}
latch.countDown();
Thread.yield();
if (rarely()) {
if (countDown.fastForward()) {
count.incrementAndGet();
}
assertThat(countDown.isCountedDown(), equalTo(true));
assertThat(countDown.fastForward(), equalTo(false));
}
for (Thread thread : threads) {
thread.join();
}
assertThat(countDown.isCountedDown(), equalTo(true));
assertThat(count.get(), Matchers.equalTo(1));
}
@Test
public void testSingleThreaded() {
int atLeast = atLeast(10);
final CountDown countDown = new CountDown(atLeast);
while(!countDown.isCountedDown()) {
atLeast--;
if (countDown.countDown()) {
assertThat(atLeast, equalTo(0));
assertThat(countDown.isCountedDown(), equalTo(true));
assertThat(countDown.fastForward(), equalTo(false));
break;
}
if (rarely()) {
assertThat(countDown.fastForward(), equalTo(true));
assertThat(countDown.isCountedDown(), equalTo(true));
assertThat(countDown.fastForward(), equalTo(false));
}
assertThat(atLeast, greaterThan(0));
}
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.search.scan;
import com.google.common.collect.Sets;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
@ -31,6 +30,7 @@ import org.elasticsearch.test.AbstractIntegrationTest;
import java.util.Set;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
public class SearchScanScrollingTests extends AbstractIntegrationTest {
@ -40,9 +40,8 @@ public class SearchScanScrollingTests extends AbstractIntegrationTest {
}
private void testScroll(int numberOfShards, long numberOfDocs, int size, boolean unbalanced) throws Exception {
wipeIndex("test");
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards)).get();
ensureGreen();
Set<String> ids = Sets.newHashSet();
Set<String> expectedIds = Sets.newHashSet();
@ -66,7 +65,7 @@ public class SearchScanScrollingTests extends AbstractIntegrationTest {
}
}
client().admin().indices().prepareRefresh().execute().actionGet();
refresh();
SearchResponse searchResponse = client().prepareSearch()
.setSearchType(SearchType.SCAN)
@ -75,15 +74,15 @@ public class SearchScanScrollingTests extends AbstractIntegrationTest {
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
try {
assertThat(searchResponse.getHits().totalHits(), equalTo(numberOfDocs));
assertHitCount(searchResponse, numberOfDocs);
// start scrolling, until we get not results
while (true) {
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(numberOfDocs));
assertThat(searchResponse.getFailedShards(), equalTo(0));
assertHitCount(searchResponse, numberOfDocs);
for (SearchHit hit : searchResponse.getHits()) {
assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false));
assertThat(hit.id() + "should not exist in the result set", ids.contains(hit.id()), equalTo(false));
ids.add(hit.id());
}
if (searchResponse.getHits().hits().length == 0) {