Make synced flush attemp async to run it easily from a TransportAction

Today we enforce blocking which doesnt' really fit in the elasticsearch model
this commit adds async execution to the synced flush service by passing a
ActinListener to the service returing immediately.
This commit is contained in:
Simon Willnauer 2015-05-12 10:03:12 +02:00
parent 571feec451
commit a7abe0a324
5 changed files with 149 additions and 46 deletions

View File

@ -18,9 +18,14 @@
*/
package org.elasticsearch.indices;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.DelegatingActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -34,7 +39,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.Engine;
@ -52,6 +59,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class SyncedFlushService extends AbstractComponent {
@ -86,41 +94,53 @@ public class SyncedFlushService extends AbstractComponent {
inflightOpsTimeout = settings.getAsTime(SETTING_IN_FLIGHT_OPS_TIMEOUT, TimeValue.timeValueMinutes(5));
indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override
public void onShardInactive(IndexShard indexShard) {
public void onShardInactive(final IndexShard indexShard) {
// we only want to call sync flush once, so only trigger it when we are on a primary
if (indexShard.routingEntry().primary()) {
attemptSyncedFlush(indexShard.shardId());
attemptSyncedFlush(indexShard.shardId(), new ActionListener<SyncedFlushResult>() {
@Override
public void onResponse(SyncedFlushResult syncedFlushResult) {
logger.debug("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
}
@Override
public void onFailure(Throwable e) {
logger.debug("{} sync flush on inactive shard failed", e, indexShard.shardId());
}
});
}
}
});
}
public SyncedFlushResult attemptSyncedFlush(ShardId shardId) {
final ClusterState state = clusterService.state();
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name());
if (indexRoutingTable == null) {
throw new IndexMissingException(shardId.index());
}
final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.id());
if (shardRoutingTable == null) {
throw new IndexShardMissingException(shardId);
}
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
Map<String, byte[]> commitIds = sendPreSyncRequests(activeShards, state, shardId);
public void attemptSyncedFlush(ShardId shardId, ActionListener<SyncedFlushResult> actionListener) {
try {
final ClusterState state = clusterService.state();
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name());
if (indexRoutingTable == null) {
throw new IndexMissingException(shardId.index());
}
final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.id());
if (shardRoutingTable == null) {
throw new IndexShardMissingException(shardId);
}
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
Map<String, byte[]> commitIds = sendPreSyncRequests(activeShards, state, shardId);
if (commitIds.isEmpty()) {
return new SyncedFlushResult("all shards failed to commit on pre-sync");
if (commitIds.isEmpty()) {
actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync"));
}
int inflight = getInflightOpsCount(shardId, state, shardRoutingTable);
if (inflight != 1) {
actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]"));
}
String syncId = Strings.base64UUID();
sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener);
} catch (Throwable t) {
actionListener.onFailure(t);
}
int inflight = getInflightOpsCount(shardId, state, shardRoutingTable);
if (inflight != 1) {
return new SyncedFlushResult("operation counter on primary is non zero [" + inflight + "]");
}
String syncId = Strings.base64UUID();
Map<ShardRouting, SyncedFlushResponse> results = sendSyncRequests(syncId, activeShards, state, commitIds, shardId);
return new SyncedFlushResult(syncId, results);
}
/**
@ -167,29 +187,32 @@ public class SyncedFlushService extends AbstractComponent {
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for in flight operation check", shardId);
}
final int count = result.get();
logger.trace("{} in flight operation count [{}]", shardId, count);
return count;
}
private Map<ShardRouting, SyncedFlushResponse> sendSyncRequests(final String syncId, List<ShardRouting> shards, ClusterState state, Map<String, byte[]> expectedCommitIds, final ShardId shardId) {
final CountDownLatch countDownLatch = new CountDownLatch(shards.size());
private void sendSyncRequests(final String syncId, List<ShardRouting> shards, ClusterState state, Map<String, byte[]> expectedCommitIds, final ShardId shardId, final ActionListener<SyncedFlushResult> listener) {
final CountDown countDownLatch = new CountDown(shards.size());
final Map<ShardRouting, SyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
if (node == null) {
logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("unknown node"));
countDownLatch.countDown();
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
continue;
}
final byte[] expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
if (expectedCommitId == null) {
logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush"));
countDownLatch.countDown();
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
continue;
}
logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
@ -205,14 +228,18 @@ public class SyncedFlushService extends AbstractComponent {
SyncedFlushResponse existing = results.put(shard, response);
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
countDownLatch.countDown();
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
}
@Override
public void handleException(TransportException exp) {
logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard);
results.put(shard, new SyncedFlushResponse(exp.getMessage()));
countDownLatch.countDown();
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
}
@Override
@ -221,15 +248,7 @@ public class SyncedFlushService extends AbstractComponent {
}
});
}
try {
if (countDownLatch.await(syncTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("{} waiting for synced flush with id [{}] timed out after [{}]. pending ops [{}]", shardId, syncId, syncTimeout, countDownLatch.getCount());
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for sync requests (sync id [{}])", shardId, syncId);
}
return results;
}
/**
@ -326,22 +345,30 @@ public class SyncedFlushService extends AbstractComponent {
private final Map<ShardRouting, SyncedFlushResponse> shardResponses;
private final String syncId;
public ShardId getShardId() {
return shardId;
}
private final ShardId shardId;
/**
* failure constructor
*/
SyncedFlushResult(String failureReason) {
SyncedFlushResult(ShardId shardId, String failureReason) {
this.syncId = null;
this.failureReason = failureReason;
this.shardResponses = new HashMap<>();
this.shardId = shardId;
}
/**
* success constructor
*/
SyncedFlushResult(String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
SyncedFlushResult(ShardId shardId, String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
this.failureReason = null;
this.shardResponses = shardResponses;
this.syncId = syncId;
this.shardId = shardId;
}
public boolean success() {

View File

@ -40,6 +40,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.SyncedFlushUtil;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.Test;
@ -401,7 +402,7 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
int numShards = Integer.parseInt(client().admin().indices().prepareGetSettings("test").get().getSetting("test", "index.number_of_shards"));
SyncedFlushService syncedFlushService = internalCluster().getInstance(SyncedFlushService.class);
for (int i = 0; i < numShards; i++) {
assertTrue(syncedFlushService.attemptSyncedFlush(new ShardId("test", i)).success());
assertTrue(SyncedFlushUtil.attemptSyncedFlush(syncedFlushService, new ShardId("test", i)).success());
}
assertSyncIdsNotNull();
}

View File

@ -233,13 +233,20 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
}
@Test
public void testMarkAsInactiveTriggersSyncedFlush() {
public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
client().prepareIndex("test", "test").setSource("{}").get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
indicesService.indexService("test").shard(0).markAsInactive();
assertBusy(new Runnable() { // should be very very quick
@Override
public void run() {
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
});
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.SyncedFlushUtil;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
@ -88,7 +89,7 @@ public class FlushTest extends ElasticsearchIntegrationTest {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
SyncedFlushService.SyncedFlushResult result = internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(new ShardId("test", 0));
SyncedFlushService.SyncedFlushResult result = SyncedFlushUtil.attemptSyncedFlush(internalCluster().getInstance(SyncedFlushService.class), new ShardId("test", 0));
assertTrue(result.success());
assertThat(result.totalShards(), equalTo(indexStats.getShards().length));
assertThat(result.successfulShards(), equalTo(indexStats.getShards().length));

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/** Utils for SyncedFlush */
public class SyncedFlushUtil {
private SyncedFlushUtil() {
}
/**
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
*/
public static SyncedFlushService.SyncedFlushResult attemptSyncedFlush(SyncedFlushService service, ShardId shardId) {
final CountDownLatch countDown = new CountDownLatch(1);
final AtomicReference<SyncedFlushService.SyncedFlushResult> result = new AtomicReference<>();
final AtomicReference<Throwable> exception = new AtomicReference<>();
service.attemptSyncedFlush(shardId, new ActionListener<SyncedFlushService.SyncedFlushResult>() {
@Override
public void onResponse(SyncedFlushService.SyncedFlushResult syncedFlushResult) {
result.compareAndSet(null, syncedFlushResult);
countDown.countDown();
}
@Override
public void onFailure(Throwable e) {
exception.compareAndSet(null, e);
countDown.countDown();
}
});
try {
countDown.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (exception.get() != null) {
throw ExceptionsHelper.convertToElastic(exception.get());
}
return result.get();
}
}