From c8f241f284e8cb931545cd163d8587bf71485d6f Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Mon, 5 Dec 2016 16:14:04 -0800 Subject: [PATCH] Plugins: Remove response action filters (#21950) Action filters currently have the ability to filter both the request and response. But the response side was not actually used. This change removes support for filtering responses with action filters. --- .../action/ingest/IngestActionFilter.java | 5 - .../ingest/IngestProxyActionFilter.java | 5 - .../action/support/ActionFilter.java | 22 -- .../action/support/ActionFilterChain.java | 6 - .../action/support/TransportAction.java | 66 +----- .../TransportActionFilterChainTests.java | 206 ------------------ .../cluster/ClusterInfoServiceIT.java | 5 - .../ReindexFromRemoteWithAuthTests.java | 6 - .../http/ContextAndHeaderTransportIT.java | 5 - 9 files changed, 1 insertion(+), 325 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java index 5f7f32e1760..6e68c10e215 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java @@ -85,11 +85,6 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } } - @Override - public void apply(String action, Response response, ActionListener listener, ActionFilterChain chain) { - chain.proceed(action, response, listener); - } - void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) { executionService.executeIndexRequest(indexRequest, t -> { diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java index a3928c17fc7..2ba764d0c3e 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java @@ -86,11 +86,6 @@ public final class IngestProxyActionFilter implements ActionFilter { transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener, action::newResponse)); } - @Override - public void apply(String action, Response response, ActionListener listener, ActionFilterChain chain) { - chain.proceed(action, response, listener); - } - @Override public int order() { return Integer.MAX_VALUE; diff --git a/core/src/main/java/org/elasticsearch/action/support/ActionFilter.java b/core/src/main/java/org/elasticsearch/action/support/ActionFilter.java index 4a2c88f75dc..880d173b2fe 100644 --- a/core/src/main/java/org/elasticsearch/action/support/ActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/support/ActionFilter.java @@ -42,14 +42,6 @@ public interface ActionFilter { */ void apply(Task task, String action, Request request, ActionListener listener, ActionFilterChain chain); - - /** - * Enables filtering the execution of an action on the response side, either by sending a response through the - * {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain} - */ - void apply(String action, Response response, ActionListener listener, - ActionFilterChain chain); - /** * A simple base class for injectable action filters that spares the implementation from handling the * filter chain. This base class should serve any action filter implementations that doesn't require @@ -74,19 +66,5 @@ public interface ActionFilter { * if it should be aborted since the filter already handled the request and called the given listener. */ protected abstract boolean apply(String action, ActionRequest request, ActionListener listener); - - @Override - public final void apply(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - if (apply(action, response, listener)) { - chain.proceed(action, response, listener); - } - } - - /** - * Applies this filter and returns {@code true} if the execution chain should proceed, or {@code false} - * if it should be aborted since the filter already handled the response by calling the given listener. - */ - protected abstract boolean apply(String action, ActionResponse response, ActionListener listener); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/ActionFilterChain.java b/core/src/main/java/org/elasticsearch/action/support/ActionFilterChain.java index 29991451f2e..56ba070b1aa 100644 --- a/core/src/main/java/org/elasticsearch/action/support/ActionFilterChain.java +++ b/core/src/main/java/org/elasticsearch/action/support/ActionFilterChain.java @@ -34,10 +34,4 @@ public interface ActionFilterChain listener); - - /** - * Continue processing the response. Should only be called if a response has not been sent through - * the given {@link ActionListener listener} - */ - void proceed(final String action, final Response response, final ActionListener listener); } diff --git a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java index dbd08aa376f..e8f4d943e95 100644 --- a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -167,8 +167,7 @@ public abstract class TransportAction(actionName, listener, - new ResponseFilterChain<>(this.action.filters, logger))); + this.action.doExecute(task, request, listener); } else { listener.onFailure(new IllegalStateException("proceed was called too many times")); } @@ -178,69 +177,6 @@ public abstract class TransportAction listener) { - assert false : "request filter chain should never be called on the response side"; - } - } - - private static class ResponseFilterChain - implements ActionFilterChain { - - private final ActionFilter[] filters; - private final AtomicInteger index; - private final Logger logger; - - private ResponseFilterChain(ActionFilter[] filters, Logger logger) { - this.filters = filters; - this.index = new AtomicInteger(filters.length); - this.logger = logger; - } - - @Override - public void proceed(Task task, String action, Request request, ActionListener listener) { - assert false : "response filter chain should never be called on the request side"; - } - - @Override - public void proceed(String action, Response response, ActionListener listener) { - int i = index.decrementAndGet(); - try { - if (i >= 0) { - filters[i].apply(action, response, listener, this); - } else if (i == -1) { - listener.onResponse(response); - } else { - listener.onFailure(new IllegalStateException("proceed was called too many times")); - } - } catch (Exception e) { - logger.trace("Error during transport action execution.", e); - listener.onFailure(e); - } - } - } - - private static class FilteredActionListener implements ActionListener { - - private final String actionName; - private final ActionListener listener; - private final ResponseFilterChain chain; - - private FilteredActionListener(String actionName, ActionListener listener, ResponseFilterChain chain) { - this.actionName = actionName; - this.listener = listener; - this.chain = chain; - } - - @Override - public void onResponse(Response response) { - chain.proceed(actionName, response, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } } /** diff --git a/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index 228c68a1760..6df7c39aea9 100644 --- a/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -137,86 +137,6 @@ public class TransportActionFilterChainTests extends ESTestCase { } } - public void testActionFiltersResponse() throws ExecutionException, InterruptedException { - int numFilters = randomInt(10); - Set orders = new HashSet<>(numFilters); - while (orders.size() < numFilters) { - orders.add(randomInt(10)); - } - - Set filters = new HashSet<>(); - for (Integer order : orders) { - filters.add(new ResponseTestFilter(order, randomFrom(ResponseOperation.values()))); - } - - String actionName = randomAsciiOfLength(randomInt(30)); - ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { - @Override - protected void doExecute(TestRequest request, ActionListener listener) { - listener.onResponse(new TestResponse()); - } - }; - - ArrayList actionFiltersByOrder = new ArrayList<>(filters); - Collections.sort(actionFiltersByOrder, new Comparator() { - @Override - public int compare(ActionFilter o1, ActionFilter o2) { - return Integer.compare(o2.order(), o1.order()); - } - }); - - List expectedActionFilters = new ArrayList<>(); - boolean errorExpected = false; - for (ActionFilter filter : actionFiltersByOrder) { - ResponseTestFilter testFilter = (ResponseTestFilter) filter; - expectedActionFilters.add(testFilter); - if (testFilter.callback == ResponseOperation.LISTENER_FAILURE) { - errorExpected = true; - } - if (testFilter.callback != ResponseOperation.CONTINUE_PROCESSING) { - break; - } - } - - PlainListenableActionFuture future = new PlainListenableActionFuture<>(null); - transportAction.execute(new TestRequest(), future); - try { - assertThat(future.get(), notNullValue()); - assertThat("shouldn't get here if an error is expected", errorExpected, equalTo(false)); - } catch(ExecutionException e) { - assertThat("shouldn't get here if an error is not expected " + e.getMessage(), errorExpected, equalTo(true)); - } - - List testFiltersByLastExecution = new ArrayList<>(); - for (ActionFilter actionFilter : actionFilters.filters()) { - testFiltersByLastExecution.add((ResponseTestFilter) actionFilter); - } - Collections.sort(testFiltersByLastExecution, new Comparator() { - @Override - public int compare(ResponseTestFilter o1, ResponseTestFilter o2) { - return Integer.compare(o1.executionToken, o2.executionToken); - } - }); - - ArrayList finalTestFilters = new ArrayList<>(); - for (ActionFilter filter : testFiltersByLastExecution) { - ResponseTestFilter testFilter = (ResponseTestFilter) filter; - finalTestFilters.add(testFilter); - if (testFilter.callback != ResponseOperation.CONTINUE_PROCESSING) { - break; - } - } - - assertThat(finalTestFilters.size(), equalTo(expectedActionFilters.size())); - for (int i = 0; i < finalTestFilters.size(); i++) { - ResponseTestFilter testFilter = finalTestFilters.get(i); - assertThat(testFilter, equalTo(expectedActionFilters.get(i))); - assertThat(testFilter.runs.get(), equalTo(1)); - assertThat(testFilter.lastActionName, equalTo(actionName)); - } - } - public void testTooManyContinueProcessingRequest() throws ExecutionException, InterruptedException { final int additionalContinueCount = randomInt(10); @@ -274,63 +194,6 @@ public class TransportActionFilterChainTests extends ESTestCase { } } - public void testTooManyContinueProcessingResponse() throws ExecutionException, InterruptedException { - final int additionalContinueCount = randomInt(10); - - ResponseTestFilter testFilter = new ResponseTestFilter(randomInt(), new ResponseCallback() { - @Override - public void execute(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - for (int i = 0; i <= additionalContinueCount; i++) { - chain.proceed(action, response, listener); - } - } - }); - - Set filters = new HashSet<>(); - filters.add(testFilter); - - String actionName = randomAsciiOfLength(randomInt(30)); - ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { - @Override - protected void doExecute(TestRequest request, ActionListener listener) { - listener.onResponse(new TestResponse()); - } - }; - - final CountDownLatch latch = new CountDownLatch(additionalContinueCount + 1); - final AtomicInteger responses = new AtomicInteger(); - final List failures = new CopyOnWriteArrayList<>(); - - transportAction.execute(new TestRequest(), new ActionListener() { - @Override - public void onResponse(TestResponse testResponse) { - responses.incrementAndGet(); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - failures.add(e); - latch.countDown(); - } - }); - - if (!latch.await(10, TimeUnit.SECONDS)) { - fail("timeout waiting for the filter to notify the listener as many times as expected"); - } - - assertThat(testFilter.runs.get(), equalTo(1)); - assertThat(testFilter.lastActionName, equalTo(actionName)); - - assertThat(responses.get(), equalTo(1)); - assertThat(failures.size(), equalTo(additionalContinueCount)); - for (Throwable failure : failures) { - assertThat(failure, instanceOf(IllegalStateException.class)); - } - } - private class RequestTestFilter implements ActionFilter { private final RequestCallback callback; private final int order; @@ -356,45 +219,6 @@ public class TransportActionFilterChainTests extends ESTestCase { this.executionToken = counter.incrementAndGet(); this.callback.execute(task, action, request, listener, chain); } - - @Override - public void apply(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - chain.proceed(action, response, listener); - } - } - - private class ResponseTestFilter implements ActionFilter { - private final ResponseCallback callback; - private final int order; - AtomicInteger runs = new AtomicInteger(); - volatile String lastActionName; - volatile int executionToken = Integer.MAX_VALUE; //the filters that don't run will go last in the sorted list - - ResponseTestFilter(int order, ResponseCallback callback) { - this.order = order; - this.callback = callback; - } - - @Override - public int order() { - return order; - } - - @Override - public void apply(Task task, String action, Request request, - ActionListener listener, ActionFilterChain chain) { - chain.proceed(task, action, request, listener); - } - - @Override - public void apply(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - this.runs.incrementAndGet(); - this.lastActionName = action; - this.executionToken = counter.incrementAndGet(); - this.callback.execute(action, response, listener, chain); - } } private static enum RequestOperation implements RequestCallback { @@ -422,41 +246,11 @@ public class TransportActionFilterChainTests extends ESTestCase { } } - private static enum ResponseOperation implements ResponseCallback { - CONTINUE_PROCESSING { - @Override - public void execute(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - chain.proceed(action, response, listener); - } - }, - LISTENER_RESPONSE { - @Override - @SuppressWarnings("unchecked") // Safe because its all we test with - public void execute(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - ((ActionListener) listener).onResponse(new TestResponse()); - } - }, - LISTENER_FAILURE { - @Override - public void execute(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - listener.onFailure(new ElasticsearchTimeoutException("")); - } - } - } - private interface RequestCallback { void execute(Task task, String action, Request request, ActionListener listener, ActionFilterChain actionFilterChain); } - private interface ResponseCallback { - void execute(String action, Response response, ActionListener listener, - ActionFilterChain chain); - } - public static class TestRequest extends ActionRequest { @Override public ActionRequestValidationException validate() { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 111a3b1fe10..d7a62ca7f10 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -100,11 +100,6 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { return true; } - @Override - protected boolean apply(String action, ActionResponse response, ActionListener listener) { - return true; - } - @Override public int order() { return 0; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java index 47084d1d661..a5e9d149178 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java @@ -187,11 +187,5 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase { } chain.proceed(task, action, request, listener); } - - @Override - public void apply(String action, Response response, ActionListener listener, - ActionFilterChain chain) { - chain.proceed(action, response, listener); - } } } diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ContextAndHeaderTransportIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ContextAndHeaderTransportIT.java index 32cefab1d5d..4c39d80a674 100644 --- a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ContextAndHeaderTransportIT.java +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ContextAndHeaderTransportIT.java @@ -321,11 +321,6 @@ public class ContextAndHeaderTransportIT extends HttpSmokeTestCase { requests.add(new RequestAndHeaders(threadPool.getThreadContext().getHeaders(), request)); return true; } - - @Override - protected boolean apply(String action, ActionResponse response, ActionListener listener) { - return true; - } } private static class RequestAndHeaders {