From 55f99dfce6e763e0ceb0259b45933de26a8b7436 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 19 May 2015 12:01:31 +0200 Subject: [PATCH] Make SyncedFlushService fully asynchronous Some requests in the SyncedFlushService were sill blocking on network calls which made calling this service error prone if done on a network thread. This commit makes this service fully async based on ActionListener. --- .../indices/SyncedFlushService.java | 113 +++++++++--------- .../gateway/RecoveryFromGatewayTests.java | 3 +- .../org/elasticsearch/indices/FlushTest.java | 1 - .../indices/SycnedFlushSingleNodeTest.java | 27 ++--- .../{test => indices}/SyncedFlushUtil.java | 36 ++++-- 5 files changed, 97 insertions(+), 83 deletions(-) rename src/test/java/org/elasticsearch/{test => indices}/SyncedFlushUtil.java (60%) diff --git a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java index 6678b1c89dc..fc0a1b50df1 100644 --- a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java +++ b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.index.shard.ShardId; @@ -62,16 +63,9 @@ public class SyncedFlushService extends AbstractComponent { public static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync"; public static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight"; - public static final String SETTING_PRE_SYNC_TIMEOUT = "indices.flush.synced.presync_timeout"; - public static final String SETTING_SYNC_TIMEOUT = "indices.flush.synced.sync_timeout"; - public static final String SETTING_IN_FLIGHT_OPS_TIMEOUT = "indices.flush.synced.in_flight_ops_timeout"; - private final IndicesService indicesService; private final ClusterService clusterService; private final TransportService transportService; - private final TimeValue preSyncTimeout; - private final TimeValue syncTimeout; - private final TimeValue inflightOpsTimeout; @Inject public SyncedFlushService(Settings settings, IndicesService indicesService, ClusterService clusterService, TransportService transportService) { @@ -83,9 +77,6 @@ public class SyncedFlushService extends AbstractComponent { transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreSyncedFlushRequest.class, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler()); transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, SyncedFlushRequest.class, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler()); transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest.class, ThreadPool.Names.SAME, new InFlightOpCountTransportHandler()); - preSyncTimeout = settings.getAsTime(SETTING_PRE_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5)); - syncTimeout = settings.getAsTime(SETTING_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5)); - inflightOpsTimeout = settings.getAsTime(SETTING_IN_FLIGHT_OPS_TIMEOUT, TimeValue.timeValueMinutes(5)); indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() { @Override public void onShardInactive(final IndexShard indexShard) { @@ -132,27 +123,49 @@ public class SyncedFlushService extends AbstractComponent { * the replica if it contains the same changes that the primary contains. * * Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies. - * - * **/ - - public void attemptSyncedFlush(ShardId shardId, ActionListener actionListener) { + **/ + public void attemptSyncedFlush(final ShardId shardId, final ActionListener actionListener) { try { final ClusterState state = clusterService.state(); final IndexShardRoutingTable shardRoutingTable = getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); - Map commitIds = sendPreSyncRequests(activeShards, state, shardId); + final ActionListener> commitIdsListener = new ActionListener>() { + @Override + public void onResponse(final Map commitIds) { + if (commitIds.isEmpty()) { + actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync")); + } + final ActionListener inflightOpsListener = new ActionListener() { + @Override + public void onResponse(InFlightOpsResponse response) { + final int inflight = response.opCount(); + assert inflight >= -1; + if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0). + actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]")); + } else { + // 3. now send the sync request to all the shards + String syncId = Strings.base64UUID(); + sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener); + } + } - if (commitIds.isEmpty()) { - actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync")); - } + @Override + public void onFailure(Throwable e) { + actionListener.onFailure(e); + } + }; + // 2. fetch in flight operations + getInflightOpsCount(shardId, state, shardRoutingTable, inflightOpsListener); + } - int inflight = getInflightOpsCount(shardId, state, shardRoutingTable); - assert inflight >= -1; - if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0). - actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]")); - } - String syncId = Strings.base64UUID(); - sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener); + @Override + public void onFailure(Throwable e) { + actionListener.onFailure(e); + } + }; + + // 1. send pre-sync flushes to all replicas + sendPreSyncRequests(activeShards, state, shardId, commitIdsListener); } catch (Throwable t) { actionListener.onFailure(t); } @@ -177,15 +190,14 @@ public class SyncedFlushService extends AbstractComponent { /** * returns the number of inflight operations on primary. -1 upon error. */ - protected int getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable) { + protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener listener) { final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId()); if (primaryNode == null) { logger.trace("{} failed to resolve node for primary shard {}, skipping sync", shardId, primaryShard); - return -1; + listener.onResponse(new InFlightOpsResponse(-1)); + return; } - final AtomicInteger result = new AtomicInteger(-1); - final CountDownLatch latch = new CountDownLatch(1); logger.trace("{} retrieving in flight operation count", shardId); transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId), new BaseTransportResponseHandler() { @@ -196,15 +208,13 @@ public class SyncedFlushService extends AbstractComponent { @Override public void handleResponse(InFlightOpsResponse response) { - result.set(response.opCount()); - latch.countDown(); + listener.onResponse(response); } @Override public void handleException(TransportException exp) { logger.debug("{} unexpected error while retrieving inflight op count", shardId); - result.set(-1); - latch.countDown(); + listener.onFailure(exp); } @Override @@ -212,17 +222,6 @@ public class SyncedFlushService extends AbstractComponent { return ThreadPool.Names.SAME; } }); - try { - if (latch.await(inflightOpsTimeout.millis(), TimeUnit.MILLISECONDS) == false) { - logger.debug("{} in flight operation check timed out after [{}]", shardId, syncTimeout); - } - } catch (InterruptedException e) { - logger.debug("{} interrupted while waiting for in flight operation check", shardId); - Thread.currentThread().interrupt(); - } - final int count = result.get(); - logger.trace("{} in flight operation count [{}]", shardId, count); - return count; } @@ -286,15 +285,17 @@ public class SyncedFlushService extends AbstractComponent { /** * send presync requests to all started copies of the given shard */ - Map sendPreSyncRequests(final List shards, final ClusterState state, final ShardId shardId) { - final CountDownLatch countDownLatch = new CountDownLatch(shards.size()); + void sendPreSyncRequests(final List shards, final ClusterState state, final ShardId shardId, final ActionListener> listener) { + final CountDown countDown = new CountDown(shards.size()); final ConcurrentMap commitIds = ConcurrentCollections.newConcurrentMap(); for (final ShardRouting shard : shards) { logger.trace("{} sending pre-synced flush request to {}", shardId, shard); final DiscoveryNode node = state.nodes().get(shard.currentNodeId()); if (node == null) { logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard); - countDownLatch.countDown(); + if(countDown.countDown()) { + listener.onResponse(commitIds); + } continue; } transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreSyncedFlushRequest(shard.shardId()), new BaseTransportResponseHandler() { @@ -308,13 +309,17 @@ public class SyncedFlushService extends AbstractComponent { Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId()); assert existing == null : "got two answers for node [" + node + "]"; // count after the assert so we won't decrement twice in handleException - countDownLatch.countDown(); + if(countDown.countDown()) { + listener.onResponse(commitIds); + } } @Override public void handleException(TransportException exp) { logger.trace("{} error while performing pre synced flush on [{}], skipping", shardId, exp, shard); - countDownLatch.countDown(); + if(countDown.countDown()) { + listener.onResponse(commitIds); + } } @Override @@ -323,16 +328,6 @@ public class SyncedFlushService extends AbstractComponent { } }); } - try { - if (countDownLatch.await(preSyncTimeout.millis(), TimeUnit.MILLISECONDS) == false) { - logger.debug("{} waiting for pre sync flush requests timed out after [{}]. pending ops [{}]", shardId, preSyncTimeout, countDownLatch.getCount()); - } - } catch (InterruptedException e) { - logger.debug("{} interrupted while waiting for presync requests", shardId); - Thread.currentThread().interrupt(); - } - - return commitIds; } private PreSyncedFlushResponse performPreSyncedFlush(PreSyncedFlushRequest request) { diff --git a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java index 6ee3448b99f..93ead8ac862 100644 --- a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java +++ b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.settings.ImmutableSettings; @@ -40,7 +39,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; -import org.elasticsearch.test.SyncedFlushUtil; +import org.elasticsearch.indices.SyncedFlushUtil; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.junit.Test; diff --git a/src/test/java/org/elasticsearch/indices/FlushTest.java b/src/test/java/org/elasticsearch/indices/FlushTest.java index d66100218a7..65fb2f6816b 100644 --- a/src/test/java/org/elasticsearch/indices/FlushTest.java +++ b/src/test/java/org/elasticsearch/indices/FlushTest.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.SyncedFlushUtil; import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; diff --git a/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java b/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java index beb55f6ff4e..348c5753c83 100644 --- a/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java +++ b/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java @@ -28,7 +28,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchSingleNodeTest; -import org.elasticsearch.test.SyncedFlushUtil; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import java.util.List; @@ -50,11 +49,11 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); + Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); client().prepareIndex("test", "test", "2").setSource("{}").get(); String syncId = Strings.base64UUID(); - SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener); listener.latch.await(); assertNull(listener.error); @@ -67,8 +66,8 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success()); assertEquals("pending operations", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason()); - flushService.sendPreSyncRequests(activeShards, state, shardId); // pull another commit and make sure we can't seal with the old one - listener = new SyncedFlushUtil.SyncResultListener(); + SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't seal with the old one + listener = new SyncedFlushUtil.LatchedListener(); flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener); listener.latch.await(); assertNull(listener.error); @@ -91,7 +90,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); - SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); assertNull(listener.error); @@ -114,7 +113,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final ShardId shardId = shard.shardId(); shard.incrementOperationCounter(); try { - SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); assertNull(listener.error); @@ -135,7 +134,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { IndexShard shard = test.shard(0); SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); - SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.attemptSyncedFlush(new ShardId("test", 1), listener); listener.latch.await(); assertNotNull(listener.error); @@ -145,14 +144,14 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final ShardId shardId = shard.shardId(); client().admin().indices().prepareClose("test").get(); - listener = new SyncedFlushUtil.SyncResultListener(); + listener = new SyncedFlushUtil.LatchedListener(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); assertNotNull(listener.error); assertNull(listener.result); assertEquals("closed", listener.error.getMessage()); - listener = new SyncedFlushUtil.SyncResultListener(); + listener = new SyncedFlushUtil.LatchedListener(); flushService.attemptSyncedFlush(new ShardId("nosuchindex", 0), listener); listener.latch.await(); assertNotNull(listener.error); @@ -172,14 +171,14 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); + Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); if (randomBoolean()) { client().prepareIndex("test", "test", "2").setSource("{}").get(); } client().admin().indices().prepareFlush("test").setForce(true).get(); String syncId = Strings.base64UUID(); - final SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener(); + final SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener); listener.latch.await(); assertNull(listener.error); @@ -206,11 +205,11 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); + Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); commitIds.clear(); // wipe it... String syncId = Strings.base64UUID(); - SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener); listener.latch.await(); assertNull(listener.error); diff --git a/src/test/java/org/elasticsearch/test/SyncedFlushUtil.java b/src/test/java/org/elasticsearch/indices/SyncedFlushUtil.java similarity index 60% rename from src/test/java/org/elasticsearch/test/SyncedFlushUtil.java rename to src/test/java/org/elasticsearch/indices/SyncedFlushUtil.java index ddf29297851..e16c85b4b7e 100644 --- a/src/test/java/org/elasticsearch/test/SyncedFlushUtil.java +++ b/src/test/java/org/elasticsearch/indices/SyncedFlushUtil.java @@ -16,15 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.test; +package org.elasticsearch.indices; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.SyncedFlushService; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; /** Utils for SyncedFlush */ public class SyncedFlushUtil { @@ -37,7 +42,7 @@ public class SyncedFlushUtil { * Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)} */ public static SyncedFlushService.SyncedFlushResult attemptSyncedFlush(SyncedFlushService service, ShardId shardId) { - SyncResultListener listener = new SyncResultListener(); + LatchedListener listener = new LatchedListener(); service.attemptSyncedFlush(shardId, listener); try { listener.latch.await(); @@ -50,14 +55,14 @@ public class SyncedFlushUtil { return listener.result; } - public static final class SyncResultListener implements ActionListener { - public volatile SyncedFlushService.SyncedFlushResult result; + public static final class LatchedListener implements ActionListener { + public volatile T result; public volatile Throwable error; public final CountDownLatch latch = new CountDownLatch(1); @Override - public void onResponse(SyncedFlushService.SyncedFlushResult syncedFlushResult) { - result = syncedFlushResult; + public void onResponse(T result) { + this.result = result; latch.countDown(); } @@ -68,4 +73,21 @@ public class SyncedFlushUtil { } } + /** + * Blocking version of {@link SyncedFlushService#sendPreSyncRequests(List, ClusterState, ShardId, ActionListener)} + */ + public static Map sendPreSyncRequests(SyncedFlushService service, List activeShards, ClusterState state, ShardId shardId) { + LatchedListener> listener = new LatchedListener<>(); + service.sendPreSyncRequests(activeShards, state, shardId, listener); + try { + listener.latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (listener.error != null) { + throw ExceptionsHelper.convertToElastic(listener.error); + } + return listener.result; + } + }