Simplify SyncedFlushService flow with StepListener (#37383)

Today the SyncedFlushService flow is written with multiple nested 
callbacks which are hard to read. This commit replaces them with 
sequential step listeners.
This commit is contained in:
Nhat Nguyen 2019-01-14 03:54:34 -05:00 committed by GitHub
parent 374e24c7fd
commit d44a6f9fbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 50 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@ -84,6 +85,6 @@ public final class StepListener<Response> implements ActionListener<Response> {
if (delegate.isDone() == false) { if (delegate.isDone() == false) {
throw new IllegalStateException("step is not completed yet"); throw new IllegalStateException("step is not completed yet");
} }
return FutureUtils.get(delegate); return FutureUtils.get(delegate, 0L, TimeUnit.NANOSECONDS); // this future is done already - use a non-blocking method.
} }
} }

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
@ -219,56 +220,40 @@ public class SyncedFlushService implements IndexEventListener {
return; return;
} }
final ActionListener<Map<String, PreSyncedFlushResponse>> presyncListener =
new ActionListener<Map<String, PreSyncedFlushResponse>>() {
@Override
public void onResponse(final Map<String, PreSyncedFlushResponse> presyncResponses) {
if (presyncResponses.isEmpty()) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards,
"all shards failed to commit on pre-sync"));
return;
}
final ActionListener<InFlightOpsResponse> inflightOpsListener = new ActionListener<InFlightOpsResponse>() {
@Override
public void onResponse(InFlightOpsResponse response) {
final int inflight = response.opCount();
assert inflight >= 0;
if (inflight != 0) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight +
"] ongoing operations on primary"));
} else {
// 3. now send the sync request to all the shards;
final String sharedSyncId = sharedExistingSyncId(presyncResponses);
if (sharedSyncId != null) {
assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
"Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" +
presyncResponses + "]";
reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards,
presyncResponses, actionListener);
}else {
String syncId = UUIDs.randomBase64UUID();
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
}
}
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
};
// 2. fetch in flight operations
getInflightOpsCount(shardId, state, shardRoutingTable, inflightOpsListener);
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
};
// 1. send pre-sync flushes to all replicas // 1. send pre-sync flushes to all replicas
sendPreSyncRequests(activeShards, state, shardId, presyncListener); final StepListener<Map<String, PreSyncedFlushResponse>> presyncStep = new StepListener<>();
sendPreSyncRequests(activeShards, state, shardId, presyncStep);
// 2. fetch in flight operations
final StepListener<InFlightOpsResponse> inflightOpsStep = new StepListener<>();
presyncStep.whenComplete(presyncResponses -> {
if (presyncResponses.isEmpty()) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
} else {
getInflightOpsCount(shardId, state, shardRoutingTable, inflightOpsStep);
}
}, actionListener::onFailure);
// 3. now send the sync request to all the shards
inflightOpsStep.whenComplete(inFlightOpsResponse -> {
final Map<String, PreSyncedFlushResponse> presyncResponses = presyncStep.result();
final int inflight = inFlightOpsResponse.opCount();
assert inflight >= 0;
if (inflight != 0) {
actionListener.onResponse(
new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
} else {
final String sharedSyncId = sharedExistingSyncId(presyncResponses);
if (sharedSyncId != null) {
assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
"Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
}else {
String syncId = UUIDs.randomBase64UUID();
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
}
}
}, actionListener::onFailure);
} catch (Exception e) { } catch (Exception e) {
actionListener.onFailure(e); actionListener.onFailure(e);
} }