Introduce StepListener (#37327)

This commit introduces StepListener which provides a simple way to write
a flow consisting of multiple asynchronous steps without having nested
callbacks.

Relates #37291
This commit is contained in:
Nhat Nguyen 2019-01-11 13:06:17 -05:00 committed by GitHub
parent 955d3aea19
commit 70cee18e56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 209 additions and 1 deletions

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import java.util.function.Consumer;
/**
* A {@link StepListener} provides a simple way to write a flow consisting of
* multiple asynchronous steps without having nested callbacks. For example:
*
* <pre>{@code
* void asyncFlowMethod(... ActionListener<R> flowListener) {
* StepListener<R1> step1 = new StepListener<>();
* asyncStep1(..., step1);
* StepListener<R2> step2 = new StepListener<>();
* step1.whenComplete(r1 -> {
* asyncStep2(r1, ..., step2);
* }, flowListener::onFailure);
*
* step2.whenComplete(r2 -> {
* R1 r1 = step1.result();
* R r = combine(r1, r2);
* flowListener.onResponse(r);
* }, flowListener::onFailure);
* }
* }</pre>
*/
public final class StepListener<Response> implements ActionListener<Response> {
private final ListenableFuture<Response> delegate;
public StepListener() {
this.delegate = new ListenableFuture<>();
}
@Override
public void onResponse(Response response) {
delegate.onResponse(response);
}
@Override
public void onFailure(Exception e) {
delegate.onFailure(e);
}
/**
* Registers the given actions which are called when this step is completed. If this step is completed successfully,
* the {@code onResponse} is called with the result; otherwise the {@code onFailure} is called with the failure.
*
* @param onResponse is called when this step is completed successfully
* @param onFailure is called when this step is completed with a failure
*/
public void whenComplete(CheckedConsumer<Response, Exception> onResponse, Consumer<Exception> onFailure) {
delegate.addListener(ActionListener.wrap(onResponse, onFailure), EsExecutors.newDirectExecutorService(), null);
}
/**
* Gets the result of this step. This method will throw {@link IllegalStateException} if this step is not completed yet.
*/
public Response result() {
if (delegate.isDone() == false) {
throw new IllegalStateException("step is not completed yet");
}
return FutureUtils.get(delegate);
}
}

View File

@ -60,7 +60,13 @@ public final class ListenableFuture<V> extends BaseFuture<V> implements ActionLi
if (done) { if (done) {
run = true; run = true;
} else { } else {
listeners.add(new Tuple<>(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), executor)); final ActionListener<V> wrappedListener;
if (threadContext == null) {
wrappedListener = listener;
} else {
wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
}
listeners.add(new Tuple<>(wrappedListener, executor));
run = false; run = false;
} }
} }

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
public class StepListenerTests extends ESTestCase {
private ThreadPool threadPool;
@Before
public void setUpThreadPool() {
threadPool = new TestThreadPool(getTestName());
}
@After
public void tearDownThreadPool() {
terminate(threadPool);
}
public void testSimpleSteps() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Consumer<Exception> onFailure = e -> {
latch.countDown();
fail("test a happy path");
};
StepListener<String> step1 = new StepListener<>(); //[a]sync provide a string
executeAction(() -> step1.onResponse("hello"));
StepListener<Integer> step2 = new StepListener<>(); //[a]sync calculate the length of the string
step1.whenComplete(str -> executeAction(() -> step2.onResponse(str.length())), onFailure);
step2.whenComplete(length -> executeAction(latch::countDown), onFailure);
latch.await();
assertThat(step1.result(), equalTo("hello"));
assertThat(step2.result(), equalTo(5));
}
public void testAbortOnFailure() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
int failedStep = randomBoolean() ? 1 : 2;
AtomicInteger failureNotified = new AtomicInteger();
Consumer<Exception> onFailure = e -> {
failureNotified.getAndIncrement();
latch.countDown();
assertThat(e.getMessage(), equalTo("failed at step " + failedStep));
};
StepListener<String> step1 = new StepListener<>(); //[a]sync provide a string
if (failedStep == 1) {
executeAction(() -> step1.onFailure(new RuntimeException("failed at step 1")));
} else {
executeAction(() -> step1.onResponse("hello"));
}
StepListener<Integer> step2 = new StepListener<>(); //[a]sync calculate the length of the string
step1.whenComplete(str -> {
if (failedStep == 2) {
executeAction(() -> step2.onFailure(new RuntimeException("failed at step 2")));
} else {
executeAction(() -> step2.onResponse(str.length()));
}
}, onFailure);
step2.whenComplete(length -> latch.countDown(), onFailure);
latch.await();
assertThat(failureNotified.get(), equalTo(1));
if (failedStep == 1) {
assertThat(expectThrows(RuntimeException.class, step1::result).getMessage(),
equalTo("failed at step 1"));
assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(),
equalTo("step is not completed yet"));
} else {
assertThat(step1.result(), equalTo("hello"));
assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(),
equalTo("failed at step 2"));
}
}
private void executeAction(Runnable runnable) {
if (randomBoolean()) {
threadPool.generic().execute(runnable);
} else {
runnable.run();
}
}
}