From d44a6f9fbcc4f71116a5b6eb0573a786d67a58ae Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 14 Jan 2019 03:54:34 -0500 Subject: [PATCH] Simplify SyncedFlushService flow with StepListener (#37383) Today the SyncedFlushService flow is written with multiple nested callbacks which are hard to read. This commit replaces them with sequential step listeners. --- .../elasticsearch/action/StepListener.java | 3 +- .../indices/flush/SyncedFlushService.java | 83 ++++++++----------- 2 files changed, 36 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java index efbf8c755d5..160ba23da24 100644 --- a/server/src/main/java/org/elasticsearch/action/StepListener.java +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -84,6 +85,6 @@ public final class StepListener implements ActionListener { if (delegate.isDone() == false) { throw new IllegalStateException("step is not completed yet"); } - return FutureUtils.get(delegate); + return FutureUtils.get(delegate, 0L, TimeUnit.NANOSECONDS); // this future is done already - use a non-blocking method. } } diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index e1cd85faaef..9bc4e4ead12 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -219,56 +220,40 @@ public class SyncedFlushService implements IndexEventListener { return; } - final ActionListener> presyncListener = - new ActionListener>() { - @Override - public void onResponse(final Map presyncResponses) { - if (presyncResponses.isEmpty()) { - actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, - "all shards failed to commit on pre-sync")); - return; - } - final ActionListener inflightOpsListener = new ActionListener() { - @Override - public void onResponse(InFlightOpsResponse response) { - final int inflight = response.opCount(); - assert inflight >= 0; - if (inflight != 0) { - actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + - "] ongoing operations on primary")); - } else { - // 3. now send the sync request to all the shards; - final String sharedSyncId = sharedExistingSyncId(presyncResponses); - if (sharedSyncId != null) { - assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) : - "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + - presyncResponses + "]"; - reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, - presyncResponses, actionListener); - }else { - String syncId = UUIDs.randomBase64UUID(); - sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener); - } - } - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - }; - // 2. fetch in flight operations - getInflightOpsCount(shardId, state, shardRoutingTable, inflightOpsListener); - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - }; - // 1. send pre-sync flushes to all replicas - sendPreSyncRequests(activeShards, state, shardId, presyncListener); + final StepListener> presyncStep = new StepListener<>(); + sendPreSyncRequests(activeShards, state, shardId, presyncStep); + + // 2. fetch in flight operations + final StepListener inflightOpsStep = new StepListener<>(); + presyncStep.whenComplete(presyncResponses -> { + if (presyncResponses.isEmpty()) { + actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync")); + } else { + getInflightOpsCount(shardId, state, shardRoutingTable, inflightOpsStep); + } + }, actionListener::onFailure); + + // 3. now send the sync request to all the shards + inflightOpsStep.whenComplete(inFlightOpsResponse -> { + final Map presyncResponses = presyncStep.result(); + final int inflight = inFlightOpsResponse.opCount(); + assert inflight >= 0; + if (inflight != 0) { + actionListener.onResponse( + new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary")); + } else { + final String sharedSyncId = sharedExistingSyncId(presyncResponses); + if (sharedSyncId != null) { + assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) : + "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]"; + reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener); + }else { + String syncId = UUIDs.randomBase64UUID(); + sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener); + } + } + }, actionListener::onFailure); } catch (Exception e) { actionListener.onFailure(e); }