Plugins: Remove response action filters (#21950)

Action filters currently have the ability to filter both the request and
response. But the response side was not actually used. This change
removes support for filtering responses with action filters.
This commit is contained in:
Ryan Ernst 2016-12-05 16:14:04 -08:00 committed by GitHub
parent 2087234d74
commit c8f241f284
9 changed files with 1 additions and 325 deletions

View File

@ -85,11 +85,6 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
}
@Override
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener, ActionFilterChain<?, Response> chain) {
chain.proceed(action, response, listener);
}
void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) {
executionService.executeIndexRequest(indexRequest, t -> {

View File

@ -86,11 +86,6 @@ public final class IngestProxyActionFilter implements ActionFilter {
transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener, action::newResponse));
}
@Override
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener, ActionFilterChain<?, Response> chain) {
chain.proceed(action, response, listener);
}
@Override
public int order() {
return Integer.MAX_VALUE;

View File

@ -42,14 +42,6 @@ public interface ActionFilter {
*/
<Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request,
ActionListener<Response> listener, ActionFilterChain<Request, Response> chain);
/**
* Enables filtering the execution of an action on the response side, either by sending a response through the
* {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain}
*/
<Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain);
/**
* A simple base class for injectable action filters that spares the implementation from handling the
* filter chain. This base class should serve any action filter implementations that doesn't require
@ -74,19 +66,5 @@ public interface ActionFilter {
* if it should be aborted since the filter already handled the request and called the given listener.
*/
protected abstract boolean apply(String action, ActionRequest request, ActionListener<?> listener);
@Override
public final <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
if (apply(action, response, listener)) {
chain.proceed(action, response, listener);
}
}
/**
* Applies this filter and returns {@code true} if the execution chain should proceed, or {@code false}
* if it should be aborted since the filter already handled the response by calling the given listener.
*/
protected abstract boolean apply(String action, ActionResponse response, ActionListener<?> listener);
}
}

View File

@ -34,10 +34,4 @@ public interface ActionFilterChain<Request extends ActionRequest, Response exten
* the given {@link ActionListener listener}
*/
void proceed(Task task, final String action, final Request request, final ActionListener<Response> listener);
/**
* Continue processing the response. Should only be called if a response has not been sent through
* the given {@link ActionListener listener}
*/
void proceed(final String action, final Response response, final ActionListener<Response> listener);
}

View File

@ -167,8 +167,7 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
if (i < this.action.filters.length) {
this.action.filters[i].apply(task, actionName, request, listener, this);
} else if (i == this.action.filters.length) {
this.action.doExecute(task, request, new FilteredActionListener<>(actionName, listener,
new ResponseFilterChain<>(this.action.filters, logger)));
this.action.doExecute(task, request, listener);
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
@ -178,69 +177,6 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
}
}
@Override
public void proceed(String action, Response response, ActionListener<Response> listener) {
assert false : "request filter chain should never be called on the response side";
}
}
private static class ResponseFilterChain<Request extends ActionRequest, Response extends ActionResponse>
implements ActionFilterChain<Request, Response> {
private final ActionFilter[] filters;
private final AtomicInteger index;
private final Logger logger;
private ResponseFilterChain(ActionFilter[] filters, Logger logger) {
this.filters = filters;
this.index = new AtomicInteger(filters.length);
this.logger = logger;
}
@Override
public void proceed(Task task, String action, Request request, ActionListener<Response> listener) {
assert false : "response filter chain should never be called on the request side";
}
@Override
public void proceed(String action, Response response, ActionListener<Response> listener) {
int i = index.decrementAndGet();
try {
if (i >= 0) {
filters[i].apply(action, response, listener, this);
} else if (i == -1) {
listener.onResponse(response);
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch (Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
}
private static class FilteredActionListener<Response extends ActionResponse> implements ActionListener<Response> {
private final String actionName;
private final ActionListener<Response> listener;
private final ResponseFilterChain<?, Response> chain;
private FilteredActionListener(String actionName, ActionListener<Response> listener, ResponseFilterChain<?, Response> chain) {
this.actionName = actionName;
this.listener = listener;
this.chain = chain;
}
@Override
public void onResponse(Response response) {
chain.proceed(actionName, response, listener);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
/**

View File

@ -137,86 +137,6 @@ public class TransportActionFilterChainTests extends ESTestCase {
}
}
public void testActionFiltersResponse() throws ExecutionException, InterruptedException {
int numFilters = randomInt(10);
Set<Integer> orders = new HashSet<>(numFilters);
while (orders.size() < numFilters) {
orders.add(randomInt(10));
}
Set<ActionFilter> filters = new HashSet<>();
for (Integer order : orders) {
filters.add(new ResponseTestFilter(order, randomFrom(ResponseOperation.values())));
}
String actionName = randomAsciiOfLength(randomInt(30));
ActionFilters actionFilters = new ActionFilters(filters);
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
@Override
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
listener.onResponse(new TestResponse());
}
};
ArrayList<ActionFilter> actionFiltersByOrder = new ArrayList<>(filters);
Collections.sort(actionFiltersByOrder, new Comparator<ActionFilter>() {
@Override
public int compare(ActionFilter o1, ActionFilter o2) {
return Integer.compare(o2.order(), o1.order());
}
});
List<ActionFilter> expectedActionFilters = new ArrayList<>();
boolean errorExpected = false;
for (ActionFilter filter : actionFiltersByOrder) {
ResponseTestFilter testFilter = (ResponseTestFilter) filter;
expectedActionFilters.add(testFilter);
if (testFilter.callback == ResponseOperation.LISTENER_FAILURE) {
errorExpected = true;
}
if (testFilter.callback != ResponseOperation.CONTINUE_PROCESSING) {
break;
}
}
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(null);
transportAction.execute(new TestRequest(), future);
try {
assertThat(future.get(), notNullValue());
assertThat("shouldn't get here if an error is expected", errorExpected, equalTo(false));
} catch(ExecutionException e) {
assertThat("shouldn't get here if an error is not expected " + e.getMessage(), errorExpected, equalTo(true));
}
List<ResponseTestFilter> testFiltersByLastExecution = new ArrayList<>();
for (ActionFilter actionFilter : actionFilters.filters()) {
testFiltersByLastExecution.add((ResponseTestFilter) actionFilter);
}
Collections.sort(testFiltersByLastExecution, new Comparator<ResponseTestFilter>() {
@Override
public int compare(ResponseTestFilter o1, ResponseTestFilter o2) {
return Integer.compare(o1.executionToken, o2.executionToken);
}
});
ArrayList<ResponseTestFilter> finalTestFilters = new ArrayList<>();
for (ActionFilter filter : testFiltersByLastExecution) {
ResponseTestFilter testFilter = (ResponseTestFilter) filter;
finalTestFilters.add(testFilter);
if (testFilter.callback != ResponseOperation.CONTINUE_PROCESSING) {
break;
}
}
assertThat(finalTestFilters.size(), equalTo(expectedActionFilters.size()));
for (int i = 0; i < finalTestFilters.size(); i++) {
ResponseTestFilter testFilter = finalTestFilters.get(i);
assertThat(testFilter, equalTo(expectedActionFilters.get(i)));
assertThat(testFilter.runs.get(), equalTo(1));
assertThat(testFilter.lastActionName, equalTo(actionName));
}
}
public void testTooManyContinueProcessingRequest() throws ExecutionException, InterruptedException {
final int additionalContinueCount = randomInt(10);
@ -274,63 +194,6 @@ public class TransportActionFilterChainTests extends ESTestCase {
}
}
public void testTooManyContinueProcessingResponse() throws ExecutionException, InterruptedException {
final int additionalContinueCount = randomInt(10);
ResponseTestFilter testFilter = new ResponseTestFilter(randomInt(), new ResponseCallback() {
@Override
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
for (int i = 0; i <= additionalContinueCount; i++) {
chain.proceed(action, response, listener);
}
}
});
Set<ActionFilter> filters = new HashSet<>();
filters.add(testFilter);
String actionName = randomAsciiOfLength(randomInt(30));
ActionFilters actionFilters = new ActionFilters(filters);
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
@Override
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
listener.onResponse(new TestResponse());
}
};
final CountDownLatch latch = new CountDownLatch(additionalContinueCount + 1);
final AtomicInteger responses = new AtomicInteger();
final List<Throwable> failures = new CopyOnWriteArrayList<>();
transportAction.execute(new TestRequest(), new ActionListener<TestResponse>() {
@Override
public void onResponse(TestResponse testResponse) {
responses.incrementAndGet();
latch.countDown();
}
@Override
public void onFailure(Exception e) {
failures.add(e);
latch.countDown();
}
});
if (!latch.await(10, TimeUnit.SECONDS)) {
fail("timeout waiting for the filter to notify the listener as many times as expected");
}
assertThat(testFilter.runs.get(), equalTo(1));
assertThat(testFilter.lastActionName, equalTo(actionName));
assertThat(responses.get(), equalTo(1));
assertThat(failures.size(), equalTo(additionalContinueCount));
for (Throwable failure : failures) {
assertThat(failure, instanceOf(IllegalStateException.class));
}
}
private class RequestTestFilter implements ActionFilter {
private final RequestCallback callback;
private final int order;
@ -356,45 +219,6 @@ public class TransportActionFilterChainTests extends ESTestCase {
this.executionToken = counter.incrementAndGet();
this.callback.execute(task, action, request, listener, chain);
}
@Override
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
chain.proceed(action, response, listener);
}
}
private class ResponseTestFilter implements ActionFilter {
private final ResponseCallback callback;
private final int order;
AtomicInteger runs = new AtomicInteger();
volatile String lastActionName;
volatile int executionToken = Integer.MAX_VALUE; //the filters that don't run will go last in the sorted list
ResponseTestFilter(int order, ResponseCallback callback) {
this.order = order;
this.callback = callback;
}
@Override
public int order() {
return order;
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request,
ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
chain.proceed(task, action, request, listener);
}
@Override
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
this.runs.incrementAndGet();
this.lastActionName = action;
this.executionToken = counter.incrementAndGet();
this.callback.execute(action, response, listener, chain);
}
}
private static enum RequestOperation implements RequestCallback {
@ -422,41 +246,11 @@ public class TransportActionFilterChainTests extends ESTestCase {
}
}
private static enum ResponseOperation implements ResponseCallback {
CONTINUE_PROCESSING {
@Override
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
chain.proceed(action, response, listener);
}
},
LISTENER_RESPONSE {
@Override
@SuppressWarnings("unchecked") // Safe because its all we test with
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
((ActionListener<TestResponse>) listener).onResponse(new TestResponse());
}
},
LISTENER_FAILURE {
@Override
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
listener.onFailure(new ElasticsearchTimeoutException(""));
}
}
}
private interface RequestCallback {
<Request extends ActionRequest, Response extends ActionResponse> void execute(Task task, String action, Request request,
ActionListener<Response> listener, ActionFilterChain<Request, Response> actionFilterChain);
}
private interface ResponseCallback {
<Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain);
}
public static class TestRequest extends ActionRequest {
@Override
public ActionRequestValidationException validate() {

View File

@ -100,11 +100,6 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
return true;
}
@Override
protected boolean apply(String action, ActionResponse response, ActionListener<?> listener) {
return true;
}
@Override
public int order() {
return 0;

View File

@ -187,11 +187,5 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
}
chain.proceed(task, action, request, listener);
}
@Override
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
chain.proceed(action, response, listener);
}
}
}

View File

@ -321,11 +321,6 @@ public class ContextAndHeaderTransportIT extends HttpSmokeTestCase {
requests.add(new RequestAndHeaders(threadPool.getThreadContext().getHeaders(), request));
return true;
}
@Override
protected boolean apply(String action, ActionResponse response, ActionListener listener) {
return true;
}
}
private static class RequestAndHeaders {