From 0e9594e02dae2859b0b247f8d3c1242acbf0e63d Mon Sep 17 00:00:00 2001 From: javanna Date: Thu, 24 Jul 2014 17:25:50 +0200 Subject: [PATCH] Internal: use AtomicInteger instead of volatile int for the current action filter position Also improved filter chain tests to not rely on execution time, and made filter chain tests look more similar to what happens in reality by removing multiple threads creation in testTooManyContinueProcessing (something we don't support anyway, makes little sense to test it). Closes #7021 --- .../action/support/TransportAction.java | 6 +- .../elasticsearch/rest/RestController.java | 10 +- .../TransportActionFilterChainTests.java | 17 +- .../elasticsearch/rest/FakeRestRequest.java | 98 +++++++ .../rest/HeadersCopyClientTests.java | 70 ----- .../rest/RestFilterChainTests.java | 270 ++++++++++++++++++ 6 files changed, 383 insertions(+), 88 deletions(-) create mode 100644 src/test/java/org/elasticsearch/rest/FakeRestRequest.java create mode 100644 src/test/java/org/elasticsearch/rest/RestFilterChainTests.java diff --git a/src/main/java/org/elasticsearch/action/support/TransportAction.java b/src/main/java/org/elasticsearch/action/support/TransportAction.java index cfff3b8cef6..1e3b8a8eb53 100644 --- a/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; +import java.util.concurrent.atomic.AtomicInteger; + import static org.elasticsearch.action.support.PlainActionFuture.newFuture; /** @@ -146,12 +148,12 @@ public abstract class TransportAction { private final RestFilter executionFilter; - private volatile int index; + private final AtomicInteger index = new AtomicInteger(); ControllerFilterChain(RestFilter executionFilter) { this.executionFilter = executionFilter; @@ -225,8 +224,7 @@ public class RestController extends AbstractLifecycleComponent { @Override public void continueProcessing(RestRequest request, RestChannel channel) { try { - int loc = index; - index++; + int loc = index.getAndIncrement(); if (loc > filters.length) { throw new ElasticsearchIllegalStateException("filter continueProcessing was called more than expected"); } else if (loc == filters.length) { diff --git a/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index f5fa0bac936..433165afb06 100644 --- a/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -100,7 +100,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase { Collections.sort(testFiltersByLastExecution, new Comparator() { @Override public int compare(TestFilter o1, TestFilter o2) { - return Long.compare(o1.lastExecution, o2.lastExecution); + return Integer.compare(o1.executionToken, o2.executionToken); } }); @@ -131,12 +131,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase { @Override public void execute(final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain) { for (int i = 0; i <= additionalContinueCount; i++) { - new Thread() { - @Override - public void run() { - actionFilterChain.continueProcessing(action, actionRequest, actionListener); - } - }.start(); + actionFilterChain.continueProcessing(action, actionRequest, actionListener); } } }); @@ -185,13 +180,15 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase { } } - private static class TestFilter implements ActionFilter { + private final AtomicInteger counter = new AtomicInteger(); + + private class TestFilter implements ActionFilter { private final int order; private final Callback callback; AtomicInteger runs = new AtomicInteger(); volatile String lastActionName; - volatile long lastExecution = Long.MAX_VALUE; //the filters that don't run will go last in the sorted list + volatile int executionToken = Integer.MAX_VALUE; //the filters that don't run will go last in the sorted list TestFilter(int order, Callback callback) { this.order = order; @@ -203,7 +200,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase { public void process(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { this.runs.incrementAndGet(); this.lastActionName = action; - this.lastExecution = System.nanoTime(); + this.executionToken = counter.incrementAndGet(); this.callback.execute(action, actionRequest, actionListener, actionFilterChain); } diff --git a/src/test/java/org/elasticsearch/rest/FakeRestRequest.java b/src/test/java/org/elasticsearch/rest/FakeRestRequest.java new file mode 100644 index 00000000000..e9f6dafe580 --- /dev/null +++ b/src/test/java/org/elasticsearch/rest/FakeRestRequest.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest; + +import org.elasticsearch.common.bytes.BytesReference; + +import java.util.HashMap; +import java.util.Map; + +class FakeRestRequest extends RestRequest { + + private final Map headers; + + FakeRestRequest() { + this(new HashMap()); + } + + FakeRestRequest(Map headers) { + this.headers = headers; + } + + @Override + public Method method() { + return Method.GET; + } + + @Override + public String uri() { + return "/"; + } + + @Override + public String rawPath() { + return "/"; + } + + @Override + public boolean hasContent() { + return false; + } + + @Override + public boolean contentUnsafe() { + return false; + } + + @Override + public BytesReference content() { + return null; + } + + @Override + public String header(String name) { + return headers.get(name); + } + + @Override + public Iterable> headers() { + return headers.entrySet(); + } + + @Override + public boolean hasParam(String key) { + return false; + } + + @Override + public String param(String key) { + return null; + } + + @Override + public String param(String key, String defaultValue) { + return null; + } + + @Override + public Map params() { + return null; + } +} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/rest/HeadersCopyClientTests.java b/src/test/java/org/elasticsearch/rest/HeadersCopyClientTests.java index e7a868c3263..5536a1c03a7 100644 --- a/src/test/java/org/elasticsearch/rest/HeadersCopyClientTests.java +++ b/src/test/java/org/elasticsearch/rest/HeadersCopyClientTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.client.*; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.client.support.AbstractClusterAdminClient; import org.elasticsearch.client.support.AbstractIndicesAdminClient; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -332,75 +331,6 @@ public class HeadersCopyClientTests extends ElasticsearchTestCase { } } - private static class FakeRestRequest extends RestRequest { - - private final Map headers; - - private FakeRestRequest(Map headers) { - this.headers = headers; - } - - @Override - public Method method() { - return null; - } - - @Override - public String uri() { - return null; - } - - @Override - public String rawPath() { - return null; - } - - @Override - public boolean hasContent() { - return false; - } - - @Override - public boolean contentUnsafe() { - return false; - } - - @Override - public BytesReference content() { - return null; - } - - @Override - public String header(String name) { - return headers.get(name); - } - - @Override - public Iterable> headers() { - return headers.entrySet(); - } - - @Override - public boolean hasParam(String key) { - return false; - } - - @Override - public String param(String key) { - return null; - } - - @Override - public String param(String key, String defaultValue) { - return null; - } - - @Override - public Map params() { - return null; - } - } - private static class NoOpClient extends AbstractClient implements AdminClient { @Override diff --git a/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java b/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java new file mode 100644 index 00000000000..3e84600b0b7 --- /dev/null +++ b/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java @@ -0,0 +1,270 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest; + +import com.google.common.collect.Lists; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class RestFilterChainTests extends ElasticsearchTestCase { + + @Test + public void testRestFilters() throws InterruptedException { + + RestController restController = new RestController(ImmutableSettings.EMPTY); + + int numFilters = randomInt(10); + Set orders = new HashSet<>(numFilters); + while (orders.size() < numFilters) { + orders.add(randomInt(10)); + } + + List filters = new ArrayList<>(); + for (Integer order : orders) { + TestFilter testFilter = new TestFilter(order, randomFrom(Operation.values())); + filters.add(testFilter); + restController.registerFilter(testFilter); + } + + ArrayList restFiltersByOrder = Lists.newArrayList(filters); + Collections.sort(restFiltersByOrder, new Comparator() { + @Override + public int compare(RestFilter o1, RestFilter o2) { + return Integer.compare(o2.order(), o1.order()); + } + }); + + List expectedRestFilters = Lists.newArrayList(); + for (RestFilter filter : restFiltersByOrder) { + TestFilter testFilter = (TestFilter) filter; + expectedRestFilters.add(testFilter); + if (!(testFilter.callback == Operation.CONTINUE_PROCESSING) ) { + break; + } + } + + restController.registerHandler(RestRequest.Method.GET, "/", new RestHandler() { + @Override + public void handleRequest(RestRequest request, RestChannel channel) throws Exception { + channel.sendResponse(new TestResponse()); + } + }); + + FakeRestRequest fakeRestRequest = new FakeRestRequest(); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, 1); + restController.dispatchRequest(fakeRestRequest, fakeRestChannel); + assertThat(fakeRestChannel.await(), equalTo(true)); + + + List testFiltersByLastExecution = Lists.newArrayList(); + for (RestFilter restFilter : filters) { + testFiltersByLastExecution.add((TestFilter)restFilter); + } + Collections.sort(testFiltersByLastExecution, new Comparator() { + @Override + public int compare(TestFilter o1, TestFilter o2) { + return Long.compare(o1.executionToken, o2.executionToken); + } + }); + + ArrayList finalTestFilters = Lists.newArrayList(); + for (RestFilter filter : testFiltersByLastExecution) { + TestFilter testFilter = (TestFilter) filter; + finalTestFilters.add(testFilter); + if (!(testFilter.callback == Operation.CONTINUE_PROCESSING) ) { + break; + } + } + + assertThat(finalTestFilters.size(), equalTo(expectedRestFilters.size())); + + for (int i = 0; i < finalTestFilters.size(); i++) { + TestFilter testFilter = finalTestFilters.get(i); + assertThat(testFilter, equalTo(expectedRestFilters.get(i))); + assertThat(testFilter.runs.get(), equalTo(1)); + } + } + + @Test + public void testTooManyContinueProcessing() throws InterruptedException { + + final int additionalContinueCount = randomInt(10); + + TestFilter testFilter = new TestFilter(randomInt(), new Callback() { + @Override + public void execute(final RestRequest request, final RestChannel channel, final RestFilterChain filterChain) throws Exception { + for (int i = 0; i <= additionalContinueCount; i++) { + filterChain.continueProcessing(request, channel); + } + } + }); + + RestController restController = new RestController(ImmutableSettings.EMPTY); + restController.registerFilter(testFilter); + + restController.registerHandler(RestRequest.Method.GET, "/", new RestHandler() { + @Override + public void handleRequest(RestRequest request, RestChannel channel) throws Exception { + channel.sendResponse(new TestResponse()); + } + }); + + FakeRestRequest fakeRestRequest = new FakeRestRequest(); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, additionalContinueCount + 1); + restController.dispatchRequest(fakeRestRequest, fakeRestChannel); + fakeRestChannel.await(); + + assertThat(testFilter.runs.get(), equalTo(1)); + + assertThat(fakeRestChannel.responses.get(), equalTo(1)); + assertThat(fakeRestChannel.errors.get(), equalTo(additionalContinueCount)); + } + + private static class FakeRestChannel extends RestChannel { + + private final CountDownLatch latch; + AtomicInteger responses = new AtomicInteger(); + AtomicInteger errors = new AtomicInteger(); + + protected FakeRestChannel(RestRequest request, int responseCount) { + super(request); + this.latch = new CountDownLatch(responseCount); + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return super.newBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource) throws IOException { + return super.newBuilder(autoDetectSource); + } + + @Override + protected BytesStreamOutput newBytesOutput() { + return super.newBytesOutput(); + } + + @Override + public RestRequest request() { + return super.request(); + } + + @Override + public void sendResponse(RestResponse response) { + if (response.status() == RestStatus.OK) { + responses.incrementAndGet(); + } else { + errors.incrementAndGet(); + } + latch.countDown(); + } + + public boolean await() throws InterruptedException { + return latch.await(10, TimeUnit.SECONDS); + } + } + + private static enum Operation implements Callback { + CONTINUE_PROCESSING { + @Override + public void execute(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception { + filterChain.continueProcessing(request, channel); + } + }, + CHANNEL_RESPONSE { + @Override + public void execute(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception { + channel.sendResponse(new TestResponse()); + } + } + } + + private static interface Callback { + void execute(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception; + } + + private final AtomicInteger counter = new AtomicInteger(); + + private class TestFilter extends RestFilter { + private final int order; + private final Callback callback; + AtomicInteger runs = new AtomicInteger(); + volatile int executionToken = Integer.MAX_VALUE; //the filters that don't run will go last in the sorted list + + TestFilter(int order, Callback callback) { + this.order = order; + this.callback = callback; + } + + @Override + public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception { + this.runs.incrementAndGet(); + this.executionToken = counter.incrementAndGet(); + this.callback.execute(request, channel, filterChain); + } + + @Override + public int order() { + return order; + } + + @Override + public String toString() { + return "[order:" + order + ", executionToken:" + executionToken + "]"; + } + } + + private static class TestResponse extends RestResponse { + @Override + public String contentType() { + return null; + } + + @Override + public boolean contentThreadSafe() { + return false; + } + + @Override + public BytesReference content() { + return null; + } + + @Override + public RestStatus status() { + return RestStatus.OK; + } + } +}