Internal: use AtomicInteger instead of volatile int for the current action filter position

Also improved filter chain tests to not rely on execution time, and made filter chain tests look more similar to what happens in reality by removing multiple threads creation in testTooManyContinueProcessing (something we don't support anyway, makes little sense to test it).

Closes #7021
This commit is contained in:
javanna 2014-07-24 17:25:50 +02:00 committed by Luca Cavanna
parent 264d59c3e2
commit 0e9594e02d
6 changed files with 383 additions and 88 deletions

View File

@ -27,6 +27,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture; import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
/** /**
@ -146,12 +148,12 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
private class TransportActionFilterChain implements ActionFilterChain { private class TransportActionFilterChain implements ActionFilterChain {
private volatile int index = 0; private final AtomicInteger index = new AtomicInteger();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void continueProcessing(String action, ActionRequest actionRequest, ActionListener actionListener) { public void continueProcessing(String action, ActionRequest actionRequest, ActionListener actionListener) {
int i = index++; int i = index.getAndIncrement();
try { try {
if (i < filters.length) { if (i < filters.length) {
filters[i].process(action, actionRequest, actionListener, this); filters[i].process(action, actionRequest, actionListener, this);

View File

@ -33,10 +33,9 @@ import org.elasticsearch.rest.support.RestUtils;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.*;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
/** /**
* *
@ -216,7 +215,7 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
private final RestFilter executionFilter; private final RestFilter executionFilter;
private volatile int index; private final AtomicInteger index = new AtomicInteger();
ControllerFilterChain(RestFilter executionFilter) { ControllerFilterChain(RestFilter executionFilter) {
this.executionFilter = executionFilter; this.executionFilter = executionFilter;
@ -225,8 +224,7 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
@Override @Override
public void continueProcessing(RestRequest request, RestChannel channel) { public void continueProcessing(RestRequest request, RestChannel channel) {
try { try {
int loc = index; int loc = index.getAndIncrement();
index++;
if (loc > filters.length) { if (loc > filters.length) {
throw new ElasticsearchIllegalStateException("filter continueProcessing was called more than expected"); throw new ElasticsearchIllegalStateException("filter continueProcessing was called more than expected");
} else if (loc == filters.length) { } else if (loc == filters.length) {

View File

@ -100,7 +100,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
Collections.sort(testFiltersByLastExecution, new Comparator<TestFilter>() { Collections.sort(testFiltersByLastExecution, new Comparator<TestFilter>() {
@Override @Override
public int compare(TestFilter o1, TestFilter o2) { public int compare(TestFilter o1, TestFilter o2) {
return Long.compare(o1.lastExecution, o2.lastExecution); return Integer.compare(o1.executionToken, o2.executionToken);
} }
}); });
@ -131,12 +131,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
@Override @Override
public void execute(final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain) { public void execute(final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain) {
for (int i = 0; i <= additionalContinueCount; i++) { for (int i = 0; i <= additionalContinueCount; i++) {
new Thread() { actionFilterChain.continueProcessing(action, actionRequest, actionListener);
@Override
public void run() {
actionFilterChain.continueProcessing(action, actionRequest, actionListener);
}
}.start();
} }
} }
}); });
@ -185,13 +180,15 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
} }
} }
private static class TestFilter implements ActionFilter { private final AtomicInteger counter = new AtomicInteger();
private class TestFilter implements ActionFilter {
private final int order; private final int order;
private final Callback callback; private final Callback callback;
AtomicInteger runs = new AtomicInteger(); AtomicInteger runs = new AtomicInteger();
volatile String lastActionName; volatile String lastActionName;
volatile long lastExecution = Long.MAX_VALUE; //the filters that don't run will go last in the sorted list volatile int executionToken = Integer.MAX_VALUE; //the filters that don't run will go last in the sorted list
TestFilter(int order, Callback callback) { TestFilter(int order, Callback callback) {
this.order = order; this.order = order;
@ -203,7 +200,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
public void process(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { public void process(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) {
this.runs.incrementAndGet(); this.runs.incrementAndGet();
this.lastActionName = action; this.lastActionName = action;
this.lastExecution = System.nanoTime(); this.executionToken = counter.incrementAndGet();
this.callback.execute(action, actionRequest, actionListener, actionFilterChain); this.callback.execute(action, actionRequest, actionListener, actionFilterChain);
} }

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest;
import org.elasticsearch.common.bytes.BytesReference;
import java.util.HashMap;
import java.util.Map;
class FakeRestRequest extends RestRequest {
private final Map<String, String> headers;
FakeRestRequest() {
this(new HashMap<String, String>());
}
FakeRestRequest(Map<String, String> headers) {
this.headers = headers;
}
@Override
public Method method() {
return Method.GET;
}
@Override
public String uri() {
return "/";
}
@Override
public String rawPath() {
return "/";
}
@Override
public boolean hasContent() {
return false;
}
@Override
public boolean contentUnsafe() {
return false;
}
@Override
public BytesReference content() {
return null;
}
@Override
public String header(String name) {
return headers.get(name);
}
@Override
public Iterable<Map.Entry<String, String>> headers() {
return headers.entrySet();
}
@Override
public boolean hasParam(String key) {
return false;
}
@Override
public String param(String key) {
return null;
}
@Override
public String param(String key, String defaultValue) {
return null;
}
@Override
public Map<String, String> params() {
return null;
}
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.client.*;
import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.AbstractClusterAdminClient; import org.elasticsearch.client.support.AbstractClusterAdminClient;
import org.elasticsearch.client.support.AbstractIndicesAdminClient; import org.elasticsearch.client.support.AbstractIndicesAdminClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -332,75 +331,6 @@ public class HeadersCopyClientTests extends ElasticsearchTestCase {
} }
} }
private static class FakeRestRequest extends RestRequest {
private final Map<String, String> headers;
private FakeRestRequest(Map<String, String> headers) {
this.headers = headers;
}
@Override
public Method method() {
return null;
}
@Override
public String uri() {
return null;
}
@Override
public String rawPath() {
return null;
}
@Override
public boolean hasContent() {
return false;
}
@Override
public boolean contentUnsafe() {
return false;
}
@Override
public BytesReference content() {
return null;
}
@Override
public String header(String name) {
return headers.get(name);
}
@Override
public Iterable<Map.Entry<String, String>> headers() {
return headers.entrySet();
}
@Override
public boolean hasParam(String key) {
return false;
}
@Override
public String param(String key) {
return null;
}
@Override
public String param(String key, String defaultValue) {
return null;
}
@Override
public Map<String, String> params() {
return null;
}
}
private static class NoOpClient extends AbstractClient implements AdminClient { private static class NoOpClient extends AbstractClient implements AdminClient {
@Override @Override

View File

@ -0,0 +1,270 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.equalTo;
public class RestFilterChainTests extends ElasticsearchTestCase {
@Test
public void testRestFilters() throws InterruptedException {
RestController restController = new RestController(ImmutableSettings.EMPTY);
int numFilters = randomInt(10);
Set<Integer> orders = new HashSet<>(numFilters);
while (orders.size() < numFilters) {
orders.add(randomInt(10));
}
List<RestFilter> filters = new ArrayList<>();
for (Integer order : orders) {
TestFilter testFilter = new TestFilter(order, randomFrom(Operation.values()));
filters.add(testFilter);
restController.registerFilter(testFilter);
}
ArrayList<RestFilter> restFiltersByOrder = Lists.newArrayList(filters);
Collections.sort(restFiltersByOrder, new Comparator<RestFilter>() {
@Override
public int compare(RestFilter o1, RestFilter o2) {
return Integer.compare(o2.order(), o1.order());
}
});
List<RestFilter> expectedRestFilters = Lists.newArrayList();
for (RestFilter filter : restFiltersByOrder) {
TestFilter testFilter = (TestFilter) filter;
expectedRestFilters.add(testFilter);
if (!(testFilter.callback == Operation.CONTINUE_PROCESSING) ) {
break;
}
}
restController.registerHandler(RestRequest.Method.GET, "/", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel) throws Exception {
channel.sendResponse(new TestResponse());
}
});
FakeRestRequest fakeRestRequest = new FakeRestRequest();
FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, 1);
restController.dispatchRequest(fakeRestRequest, fakeRestChannel);
assertThat(fakeRestChannel.await(), equalTo(true));
List<TestFilter> testFiltersByLastExecution = Lists.newArrayList();
for (RestFilter restFilter : filters) {
testFiltersByLastExecution.add((TestFilter)restFilter);
}
Collections.sort(testFiltersByLastExecution, new Comparator<TestFilter>() {
@Override
public int compare(TestFilter o1, TestFilter o2) {
return Long.compare(o1.executionToken, o2.executionToken);
}
});
ArrayList<TestFilter> finalTestFilters = Lists.newArrayList();
for (RestFilter filter : testFiltersByLastExecution) {
TestFilter testFilter = (TestFilter) filter;
finalTestFilters.add(testFilter);
if (!(testFilter.callback == Operation.CONTINUE_PROCESSING) ) {
break;
}
}
assertThat(finalTestFilters.size(), equalTo(expectedRestFilters.size()));
for (int i = 0; i < finalTestFilters.size(); i++) {
TestFilter testFilter = finalTestFilters.get(i);
assertThat(testFilter, equalTo(expectedRestFilters.get(i)));
assertThat(testFilter.runs.get(), equalTo(1));
}
}
@Test
public void testTooManyContinueProcessing() throws InterruptedException {
final int additionalContinueCount = randomInt(10);
TestFilter testFilter = new TestFilter(randomInt(), new Callback() {
@Override
public void execute(final RestRequest request, final RestChannel channel, final RestFilterChain filterChain) throws Exception {
for (int i = 0; i <= additionalContinueCount; i++) {
filterChain.continueProcessing(request, channel);
}
}
});
RestController restController = new RestController(ImmutableSettings.EMPTY);
restController.registerFilter(testFilter);
restController.registerHandler(RestRequest.Method.GET, "/", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel) throws Exception {
channel.sendResponse(new TestResponse());
}
});
FakeRestRequest fakeRestRequest = new FakeRestRequest();
FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, additionalContinueCount + 1);
restController.dispatchRequest(fakeRestRequest, fakeRestChannel);
fakeRestChannel.await();
assertThat(testFilter.runs.get(), equalTo(1));
assertThat(fakeRestChannel.responses.get(), equalTo(1));
assertThat(fakeRestChannel.errors.get(), equalTo(additionalContinueCount));
}
private static class FakeRestChannel extends RestChannel {
private final CountDownLatch latch;
AtomicInteger responses = new AtomicInteger();
AtomicInteger errors = new AtomicInteger();
protected FakeRestChannel(RestRequest request, int responseCount) {
super(request);
this.latch = new CountDownLatch(responseCount);
}
@Override
public XContentBuilder newBuilder() throws IOException {
return super.newBuilder();
}
@Override
public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource) throws IOException {
return super.newBuilder(autoDetectSource);
}
@Override
protected BytesStreamOutput newBytesOutput() {
return super.newBytesOutput();
}
@Override
public RestRequest request() {
return super.request();
}
@Override
public void sendResponse(RestResponse response) {
if (response.status() == RestStatus.OK) {
responses.incrementAndGet();
} else {
errors.incrementAndGet();
}
latch.countDown();
}
public boolean await() throws InterruptedException {
return latch.await(10, TimeUnit.SECONDS);
}
}
private static enum Operation implements Callback {
CONTINUE_PROCESSING {
@Override
public void execute(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
filterChain.continueProcessing(request, channel);
}
},
CHANNEL_RESPONSE {
@Override
public void execute(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
channel.sendResponse(new TestResponse());
}
}
}
private static interface Callback {
void execute(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception;
}
private final AtomicInteger counter = new AtomicInteger();
private class TestFilter extends RestFilter {
private final int order;
private final Callback callback;
AtomicInteger runs = new AtomicInteger();
volatile int executionToken = Integer.MAX_VALUE; //the filters that don't run will go last in the sorted list
TestFilter(int order, Callback callback) {
this.order = order;
this.callback = callback;
}
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
this.runs.incrementAndGet();
this.executionToken = counter.incrementAndGet();
this.callback.execute(request, channel, filterChain);
}
@Override
public int order() {
return order;
}
@Override
public String toString() {
return "[order:" + order + ", executionToken:" + executionToken + "]";
}
}
private static class TestResponse extends RestResponse {
@Override
public String contentType() {
return null;
}
@Override
public boolean contentThreadSafe() {
return false;
}
@Override
public BytesReference content() {
return null;
}
@Override
public RestStatus status() {
return RestStatus.OK;
}
}
}