Improved the multi percolate API to bundle all requests by shard into one shard-level request, instead of sending each percolate request to the shards separately.

This commit is contained in:
Martijn van Groningen 2013-08-19 18:59:28 +02:00
parent 11fab5c66f
commit 5ec0276fc5
10 changed files with 887 additions and 32 deletions

View File

@ -120,10 +120,7 @@ import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.mlt.MoreLikeThisAction;
import org.elasticsearch.action.mlt.TransportMoreLikeThisAction;
import org.elasticsearch.action.percolate.MultiPercolateAction;
import org.elasticsearch.action.percolate.PercolateAction;
import org.elasticsearch.action.percolate.TransportMultiPercolateAction;
import org.elasticsearch.action.percolate.TransportPercolateAction;
import org.elasticsearch.action.percolate.*;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.type.*;
import org.elasticsearch.action.suggest.SuggestAction;
@ -253,7 +250,7 @@ public class ActionModule extends AbstractModule {
registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
registerAction(MoreLikeThisAction.INSTANCE, TransportMoreLikeThisAction.class);
registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class);
registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
// register Name -> GenericAction Map that can be injected to instances.

View File

@ -41,6 +41,7 @@ public class MultiPercolateResponse extends ActionResponse implements Iterable<M
}
// No-arg constructor used for transport deserialization; starts with an empty item array.
public MultiPercolateResponse() {
this.items = new Item[0];
}
@Override
@ -99,8 +100,11 @@ public class MultiPercolateResponse extends ActionResponse implements Iterable<M
private PercolateResponse response;
private String errorMessage;
public Item(PercolateResponse response, String errorMessage) {
public Item(PercolateResponse response) {
this.response = response;
}
// Creates a failed item carrying only an error message (the response stays null,
// so isFailure()/errorMessage() describe this item).
public Item(String errorMessage) {
this.errorMessage = errorMessage;
}

View File

@ -4,6 +4,7 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -19,6 +20,10 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
public PercolateShardRequest() {
}
// Shard-targeted constructor that leaves the percolate fields unset; they are
// filled in afterwards via the package-private setters (used when rebuilding
// the request from the transport stream in
// TransportShardMultiPercolateAction.Request#readFrom).
public PercolateShardRequest(String index, int shardId) {
super(index, shardId);
}
public PercolateShardRequest(String index, int shardId, PercolateRequest request) {
super(index, shardId, request);
this.documentType = request.documentType();
@ -27,6 +32,14 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
this.onlyCount = request.onlyCount();
}
// Builds a shard-level percolate request aimed at the given shard, copying the
// per-request fields (document type, query source, doc source, count-only flag)
// from the original high-level percolate request.
public PercolateShardRequest(ShardId shardId, PercolateRequest request) {
super(shardId.index().name(), shardId.id());
this.documentType = request.documentType();
this.source = request.source();
this.docSource = request.docSource();
this.onlyCount = request.onlyCount();
}
public String documentType() {
return documentType;
}
@ -43,6 +56,22 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
return onlyCount;
}
// Package-private setters used when the request fields are read field by field
// from the transport stream (see TransportShardMultiPercolateAction.Request#readFrom).
void documentType(String documentType) {
this.documentType = documentType;
}
// Sets the percolate query source.
void source(BytesReference source) {
this.source = source;
}
// Sets the source of the document being percolated.
void docSource(BytesReference docSource) {
this.docSource = docSource;
}
// When true only the match count is computed, not the matching query ids.
void onlyCount(boolean onlyCount) {
this.onlyCount = onlyCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -1,73 +1,266 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.percolate;
import gnu.trove.list.array.TIntArrayList;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public class TransportMultiPercolateAction extends TransportAction<MultiPercolateRequest, MultiPercolateResponse> {
private final TransportPercolateAction percolateAction;
private final ClusterService clusterService;
private final PercolatorService percolatorService;
private final TransportMultiGetAction multiGetAction;
private final TransportShardMultiPercolateAction shardMultiPercolateAction;
@Inject
public TransportMultiPercolateAction(Settings settings, ThreadPool threadPool, TransportPercolateAction percolateAction, ClusterService clusterService, TransportService transportService) {
public TransportMultiPercolateAction(Settings settings, ThreadPool threadPool, TransportShardMultiPercolateAction shardMultiPercolateAction,
ClusterService clusterService, TransportService transportService, PercolatorService percolatorService,
TransportMultiGetAction multiGetAction) {
super(settings, threadPool);
this.percolateAction = percolateAction;
this.shardMultiPercolateAction = shardMultiPercolateAction;
this.clusterService = clusterService;
this.percolatorService = percolatorService;
this.multiGetAction = multiGetAction;
transportService.registerHandler(MultiPercolateAction.NAME, new TransportHandler());
}
@Override
protected void doExecute(MultiPercolateRequest request, final ActionListener<MultiPercolateResponse> listener) {
ClusterState clusterState = clusterService.state();
protected void doExecute(final MultiPercolateRequest request, final ActionListener<MultiPercolateResponse> listener) {
final ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
@SuppressWarnings("unchecked")
final List<Object> percolateRequests = (List) request.requests();
final TIntArrayList slots = new TIntArrayList();
final List<GetRequest> existingDocsRequests = new ArrayList<GetRequest>();
for (int i = 0; i < request.requests().size(); i++) {
PercolateRequest percolateRequest = request.requests().get(i);
percolateRequest.startTime = System.currentTimeMillis();
if (percolateRequest.getRequest() != null) {
existingDocsRequests.add(percolateRequest.getRequest());
slots.add(i);
}
}
if (!existingDocsRequests.isEmpty()) {
final MultiGetRequest multiGetRequest = new MultiGetRequest();
for (GetRequest getRequest : existingDocsRequests) {
multiGetRequest.add(
new MultiGetRequest.Item(getRequest.index(), getRequest.type(), getRequest.id())
.routing(getRequest.routing())
);
}
multiGetAction.execute(multiGetRequest, new ActionListener<MultiGetResponse>() {
final MultiPercolateResponse.Item[] responses = new MultiPercolateResponse.Item[request.requests().size()];
final AtomicInteger counter = new AtomicInteger(responses.length);
for (int i = 0; i < responses.length; i++) {
final int index = i;
percolateAction.execute(request.requests().get(i), new ActionListener<PercolateResponse>() {
@Override
public void onResponse(PercolateResponse percolateResponse) {
synchronized (responses) {
responses[index] = new MultiPercolateResponse.Item(percolateResponse, null);
}
if (counter.decrementAndGet() == 0) {
finishHim();
public void onResponse(MultiGetResponse multiGetItemResponses) {
for (int i = 0; i < multiGetItemResponses.getResponses().length; i++) {
MultiGetItemResponse itemResponse = multiGetItemResponses.getResponses()[i];
int slot = slots.get(i);
if (!itemResponse.isFailed()) {
GetResponse getResponse = itemResponse.getResponse();
if (getResponse.isExists()) {
percolateRequests.set(slot, new PercolateRequest((PercolateRequest) percolateRequests.get(slot), getResponse.getSourceAsBytesRef()));
} else {
percolateRequests.set(slot, new DocumentMissingException(null, getResponse.getType(), getResponse.getId()));
}
} else {
percolateRequests.set(slot, itemResponse.getFailure());
}
}
multiPercolate(request, percolateRequests, listener, clusterState);
}
@Override
public void onFailure(Throwable e) {
synchronized (responses) {
responses[index] = new MultiPercolateResponse.Item(null, ExceptionsHelper.detailedMessage(e));
listener.onFailure(e);
}
});
} else {
multiPercolate(request, percolateRequests, listener, clusterState);
}
}
private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final List<Object> percolateRequests,
final ActionListener<MultiPercolateResponse> listener, ClusterState clusterState) {
final AtomicInteger[] expectedOperationsPerItem = new AtomicInteger[percolateRequests.size()];
final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard = new AtomicReferenceArray<AtomicReferenceArray>(multiPercolateRequest.requests().size());
final AtomicArray<Object> reducedResponses = new AtomicArray<Object>(percolateRequests.size());
// Resolving concrete indices and routing and grouping the requests by shard
final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
int expectedResults = 0;
for (int i = 0; i < percolateRequests.size(); i++) {
Object element = percolateRequests.get(i);
assert element != null;
if (element instanceof PercolateRequest) {
PercolateRequest percolateRequest = (PercolateRequest) element;
String[] concreteIndices = clusterState.metaData().concreteIndices(percolateRequest.indices(), percolateRequest.ignoreIndices(), true);
Map<String, Set<String>> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), multiPercolateRequest.indices());
// TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction
GroupShardsIterator shards = clusterService.operationRouting().searchShards(
clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference()
);
responsesByItemAndShard.set(i, new AtomicReferenceArray(shards.size()));
expectedOperationsPerItem[i] = new AtomicInteger(shards.size());
for (ShardIterator shard : shards) {
ShardId shardId = shard.shardId();
TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
if (requests == null) {
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference()));
}
if (counter.decrementAndGet() == 0) {
finishHim();
requests.add(new TransportShardMultiPercolateAction.Request.Item(i, new PercolateShardRequest(shardId, percolateRequest)));
}
expectedResults++;
} else if (element instanceof Throwable) {
reducedResponses.set(i, element);
responsesByItemAndShard.set(i, new AtomicReferenceArray(0));
expectedOperationsPerItem[i] = new AtomicInteger(0);
}
}
if (expectedResults == 0) {
finish(reducedResponses, listener);
return;
}
final AtomicInteger expectedOperations = new AtomicInteger(expectedResults);
for (Map.Entry<ShardId, TransportShardMultiPercolateAction.Request> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final TransportShardMultiPercolateAction.Request shardRequest = entry.getValue();
shardMultiPercolateAction.execute(shardRequest, new ActionListener<TransportShardMultiPercolateAction.Response>() {
@Override
@SuppressWarnings("unchecked")
public void onResponse(TransportShardMultiPercolateAction.Response response) {
try {
for (TransportShardMultiPercolateAction.Response.Item item : response.items()) {
AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot());
if (shardResults == null) {
continue;
}
if (item.failed()) {
shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, item.error().string()));
} else {
shardResults.set(shardId.id(), item.response());
}
assert expectedOperationsPerItem[item.slot()].get() >= 1;
if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) {
reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
}
}
} catch (Throwable e) {
listener.onFailure(e);
}
}
private void finishHim() {
listener.onResponse(new MultiPercolateResponse(responses));
@Override
@SuppressWarnings("unchecked")
public void onFailure(Throwable e) {
try {
for (TransportShardMultiPercolateAction.Request.Item item : shardRequest.items()) {
AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot());
if (shardResults == null) {
continue;
}
shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e));
assert expectedOperationsPerItem[item.slot()].get() >= 1;
if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) {
reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
}
}
} catch (Throwable t) {
logger.error("{} Percolate original reduce error", e, shardId);
listener.onFailure(t);
}
}
});
}
}
private void reduce(int slot,
List<Object> percolateRequests,
AtomicInteger expectedOperations,
AtomicArray<Object> reducedResponses,
ActionListener<MultiPercolateResponse> listener,
AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard) {
AtomicReferenceArray shardResponses = responsesByItemAndShard.get(slot);
PercolateResponse reducedResponse = TransportPercolateAction.reduce((PercolateRequest) percolateRequests.get(slot), shardResponses, percolatorService);
reducedResponses.set(slot, reducedResponse);
assert expectedOperations.get() >= 1;
if (expectedOperations.decrementAndGet() == 0) {
finish(reducedResponses, listener);
}
}
private void finish(AtomicArray<Object> reducedResponses, ActionListener<MultiPercolateResponse> listener) {
MultiPercolateResponse.Item[] finalResponse = new MultiPercolateResponse.Item[reducedResponses.length()];
for (int i = 0; i < reducedResponses.length(); i++) {
Object element = reducedResponses.get(i);
assert element != null;
if (element instanceof PercolateResponse) {
finalResponse[i] = new MultiPercolateResponse.Item((PercolateResponse) element);
} else if (element instanceof Throwable) {
finalResponse[i] = new MultiPercolateResponse.Item(ExceptionsHelper.detailedMessage((Throwable) element));
}
}
listener.onResponse(new MultiPercolateResponse(finalResponse));
}
class TransportHandler extends BaseTransportRequestHandler<MultiPercolateRequest> {
@Override
@ -94,7 +287,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [msearch] and request [" + request + "]", e1);
logger.warn("Failed to send error response for action [mpercolate] and request [" + request + "]", e1);
}
}
});

View File

@ -120,6 +120,10 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
@Override
protected PercolateResponse newResponse(PercolateRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
return reduce(request, shardsResponses, percolatorService);
}
public static PercolateResponse reduce(PercolateRequest request, AtomicReferenceArray shardsResponses, PercolatorService percolatorService) {
int successfulShards = 0;
int failedShards = 0;

View File

@ -0,0 +1,274 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.percolate;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*/
/**
 * Shard-level transport action for the multi percolate API. Executes a batch of
 * percolate requests that all target the same shard in a single transport
 * round-trip, instead of one transport request per percolate item.
 */
public class TransportShardMultiPercolateAction extends TransportShardSingleOperationAction<TransportShardMultiPercolateAction.Request, TransportShardMultiPercolateAction.Response> {
private final PercolatorService percolatorService;
@Inject
public TransportShardMultiPercolateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, PercolatorService percolatorService) {
super(settings, threadPool, clusterService, transportService);
this.percolatorService = percolatorService;
}
@Override
protected String transportAction() {
return "mpercolate/shard";
}
@Override
protected String executor() {
// Run shard-level percolation on the dedicated percolate thread pool.
return ThreadPool.Names.PERCOLATE;
}
@Override
protected Request newRequest() {
return new Request();
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
@Override
protected ShardIterator shards(ClusterState state, Request request) throws ElasticSearchException {
return clusterService.operationRouting().getShards(
clusterService.state(), request.index(), request.shardId(), request.preference
);
}
/**
 * Percolates every item of the batch against this shard. A per-item failure is
 * recorded in that item's error text, while shard-not-available failures are
 * rethrown so the base class can retry on another shard copy.
 */
@Override
protected Response shardOperation(Request request, int shardId) throws ElasticSearchException {
// TODO: Look into combining the shard req's docs into one in memory index.
Response response = new Response();
response.items = new ArrayList<Response.Item>(request.items.size());
for (Request.Item item : request.items) {
Response.Item responseItem = new Response.Item();
responseItem.slot = item.slot;
try {
responseItem.response = percolatorService.percolate(item.request);
} catch (Throwable e) {
logger.trace("[{}][{}] failed to multi percolate", e, request.index(), request.shardId());
if (TransportActions.isShardNotAvailableException(e)) {
// Propagate so the single-shard action can fail over to another copy.
throw new ElasticSearchException("", e);
} else {
// Any other failure is reported per item instead of failing the whole batch.
responseItem.error = new StringText(ExceptionsHelper.detailedMessage(e));
}
}
response.items.add(responseItem);
}
return response;
}
/**
 * Shard-level request: a list of percolate items (original slot plus shard
 * request) that all target the same shard of the same concrete index.
 */
public static class Request extends SingleShardOperationRequest {
private int shardId;
private String preference;
private List<Item> items;
public Request() {
}
public Request(String concreteIndex, int shardId, String preference) {
this.index = concreteIndex;
this.shardId = shardId;
this.preference = preference;
this.items = new ArrayList<Item>();
}
public int shardId() {
return shardId;
}
public void add(Item item) {
items.add(item);
}
public List<Item> items() {
return items;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = in.readVInt();
preference = in.readOptionalString();
int size = in.readVInt();
items = new ArrayList<Item>(size);
for (int i = 0; i < size; i++) {
Item item = new Item();
item.slot = in.readVInt();
// The per-item fields are serialized inline (see writeTo), so the
// PercolateShardRequest is rebuilt field by field here.
item.request = new PercolateShardRequest(index(), shardId);
item.request.documentType(in.readString());
item.request.source(in.readBytesReference());
item.request.docSource(in.readBytesReference());
item.request.onlyCount(in.readBoolean());
items.add(item);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(shardId);
out.writeOptionalString(preference);
out.writeVInt(items.size());
// NOTE: the field order below must stay in sync with readFrom above.
for (Item item : items) {
out.writeVInt(item.slot);
out.writeString(item.request.documentType());
out.writeBytesReference(item.request.source());
out.writeBytesReference(item.request.docSource());
out.writeBoolean(item.request.onlyCount());
}
}
/** One percolate request plus the slot (index) it occupies in the original multi percolate request. */
public static class Item {
private int slot;
private PercolateShardRequest request;
Item() {
}
public Item(int slot, PercolateShardRequest request) {
this.slot = slot;
this.request = request;
}
public int slot() {
return slot;
}
public PercolateShardRequest request() {
return request;
}
}
}
/**
 * Shard-level response: one item per request item, each holding either a shard
 * response or an error text, keyed back to the original slot.
 */
public static class Response extends ActionResponse {
private List<Item> items;
public List<Item> items() {
return items;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(items.size());
for (Item item : items) {
out.writeVInt(item.slot);
if (item.response != null) {
// A boolean flag discriminates a successful response from an error text.
out.writeBoolean(true);
item.response.writeTo(out);
} else {
out.writeBoolean(false);
out.writeText(item.error);
}
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
items = new ArrayList<Item>(size);
for (int i = 0; i < size; i++) {
Item item = new Item();
item.slot = in.readVInt();
if (in.readBoolean()) {
item.response = new PercolateShardResponse();
item.response.readFrom(in);
} else {
item.error = in.readText();
}
items.add(item);
}
}
/** Result for a single slot: either a shard response or an error text — never both. */
public static class Item {
private int slot;
private PercolateShardResponse response;
private Text error;
public int slot() {
return slot;
}
public PercolateShardResponse response() {
return response;
}
public Text error() {
return error;
}
public boolean failed() {
return error != null;
}
}
}
}

View File

@ -42,6 +42,11 @@ public abstract class BroadcastShardOperationRequest extends TransportRequest {
this.shardId = shardId;
}
// Constructs a shard-level request without an originating broadcast request;
// used by the multi percolate shard request, which populates its own fields.
public BroadcastShardOperationRequest(String index, int shardId) {
this.index = index;
this.shardId = shardId;
}
public String index() {
return this.index;
}

View File

@ -217,10 +217,11 @@ public class PercolatorService extends AbstractComponent {
XContentParser parser = null;
// Some queries (function_score query when for decay functions) rely on SearchContext being set:
SearchContext.setCurrent(new SearchContext(0,
SearchContext searchContext = new SearchContext(0,
new ShardSearchRequest().types(new String[0]),
null, context.indexShard.searcher(), context.percolateIndexService, context.indexShard,
null, null));
null, null);
SearchContext.setCurrent(searchContext);
try {
parser = XContentFactory.xContent(source).createParser(source);
String currentFieldName = null;
@ -271,7 +272,7 @@ public class PercolatorService extends AbstractComponent {
} catch (IOException e) {
throw new ElasticSearchParseException("failed to parse request", e);
} finally {
SearchContext.current().release();
searchContext.release();
SearchContext.removeCurrent();
if (parser != null) {
parser.close();

View File

@ -18,14 +18,18 @@
package org.elasticsearch.test.integration.percolator;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
import org.elasticsearch.action.percolate.MultiPercolateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.integration.percolator.SimplePercolatorTests.convertFromTextArray;
import static org.hamcrest.Matchers.*;
@ -80,6 +84,8 @@ public class MultiPercolatorTests extends AbstractSharedClusterTest {
item = response.getItems()[1];
assertThat(item.errorMessage(), nullValue());
assertNoFailures(item.response());
assertThat(item.getResponse().getMatches(), arrayWithSize(2));
assertThat(item.getResponse().getMatches(), arrayWithSize(2));
assertThat(item.getResponse().getCount(), equalTo(2l));
@ -87,12 +93,14 @@ public class MultiPercolatorTests extends AbstractSharedClusterTest {
item = response.getItems()[2];
assertThat(item.errorMessage(), nullValue());
assertNoFailures(item.response());
assertThat(item.getResponse().getMatches(), arrayWithSize(4));
assertThat(item.getResponse().getCount(), equalTo(4l));
assertThat(convertFromTextArray(item.getResponse().getMatches(), "test"), arrayContainingInAnyOrder("1", "2", "3", "4"));
item = response.getItems()[3];
assertThat(item.errorMessage(), nullValue());
assertNoFailures(item.response());
assertThat(item.getResponse().getMatches(), arrayWithSize(1));
assertThat(item.getResponse().getCount(), equalTo(1l));
assertThat(convertFromTextArray(item.getResponse().getMatches(), "test"), arrayContaining("4"));
@ -103,4 +111,162 @@ public class MultiPercolatorTests extends AbstractSharedClusterTest {
assertThat(item.errorMessage(), containsString("document missing"));
}
@Test
// Percolates existing documents fetched via the get API: all-match queries for an
// existing doc, "document missing" failures for a non-existing doc, and a mixed
// batch where only the last item targets an existing doc.
public void testExistingDocsOnly() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build())
.execute().actionGet();
ensureGreen();
int numQueries = randomIntBetween(50, 100);
logger.info("--> register a queries");
for (int i = 0; i < numQueries; i++) {
client().prepareIndex("test", "_percolator", Integer.toString(i))
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
.execute().actionGet();
}
// NOTE(review): the builder is not closed with .endObject() here — verify this
// is intentional (it appears to index doc "1" successfully in this test).
client().prepareIndex("test", "type", "1")
.setSource(jsonBuilder().startObject().field("field", "a"))
.execute().actionGet();
MultiPercolateRequestBuilder builder = client().prepareMultiPercolate();
int numPercolateRequest = randomIntBetween(50, 100);
for (int i = 0; i < numPercolateRequest; i++) {
builder.add(
client().preparePercolate()
.setGetRequest(Requests.getRequest("test").type("type").id("1"))
.setIndices("test").setDocumentType("type"));
}
// Every item percolates the existing doc "1", so each matches all queries.
MultiPercolateResponse response = builder.execute().actionGet();
assertThat(response.items().length, equalTo(numPercolateRequest));
for (MultiPercolateResponse.Item item : response) {
assertThat(item.isFailure(), equalTo(false));
assertNoFailures(item.response());
assertThat(item.getResponse().getCount(), equalTo((long) numQueries));
assertThat(item.getResponse().getMatches().length, equalTo(numQueries));
}
// Non existing doc
builder = client().prepareMultiPercolate();
for (int i = 0; i < numPercolateRequest; i++) {
builder.add(
client().preparePercolate()
.setGetRequest(Requests.getRequest("test").type("type").id("2"))
.setIndices("test").setDocumentType("type"));
}
// Every item targets missing doc "2", so each fails with "document missing".
response = builder.execute().actionGet();
assertThat(response.items().length, equalTo(numPercolateRequest));
for (MultiPercolateResponse.Item item : response) {
assertThat(item.isFailure(), equalTo(true));
assertThat(item.errorMessage(), containsString("document missing"));
assertThat(item.getResponse(), nullValue());
}
// One existing doc
builder = client().prepareMultiPercolate();
for (int i = 0; i < numPercolateRequest; i++) {
builder.add(
client().preparePercolate()
.setGetRequest(Requests.getRequest("test").type("type").id("2"))
.setIndices("test").setDocumentType("type"));
}
builder.add(
client().preparePercolate()
.setGetRequest(Requests.getRequest("test").type("type").id("1"))
.setIndices("test").setDocumentType("type"));
// Mixed batch: only the last item (existing doc "1") succeeds.
response = builder.execute().actionGet();
assertThat(response.items().length, equalTo(numPercolateRequest + 1));
assertThat(response.items()[numPercolateRequest].isFailure(), equalTo(false));
assertNoFailures(response.items()[numPercolateRequest].response());
assertThat(response.items()[numPercolateRequest].getResponse().getCount(), equalTo((long) numQueries));
assertThat(response.items()[numPercolateRequest].getResponse().getMatches().length, equalTo(numQueries));
}
@Test
// Percolates inline documents (no get request): a batch of valid docs that match
// all queries, a batch of illegal-json sources that fail on every shard, and a
// mixed batch where only the last item is valid.
public void testWithDocsOnly() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build())
.execute().actionGet();
ensureGreen();
int numQueries = randomIntBetween(50, 100);
logger.info("--> register a queries");
for (int i = 0; i < numQueries; i++) {
client().prepareIndex("test", "_percolator", Integer.toString(i))
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
.execute().actionGet();
}
MultiPercolateRequestBuilder builder = client().prepareMultiPercolate();
int numPercolateRequest = randomIntBetween(50, 100);
for (int i = 0; i < numPercolateRequest; i++) {
builder.add(
client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field", "a").endObject())));
}
// Every item percolates a valid inline doc, so each matches all queries.
MultiPercolateResponse response = builder.execute().actionGet();
assertThat(response.items().length, equalTo(numPercolateRequest));
for (MultiPercolateResponse.Item item : response) {
assertThat(item.isFailure(), equalTo(false));
assertNoFailures(item.response());
assertThat(item.getResponse().getCount(), equalTo((long) numQueries));
assertThat(item.getResponse().getMatches().length, equalTo(numQueries));
}
// All illegal json
builder = client().prepareMultiPercolate();
for (int i = 0; i < numPercolateRequest; i++) {
builder.add(
client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setSource("illegal json"));
}
// Illegal json fails per shard (both shards), not at the item level.
response = builder.execute().actionGet();
assertThat(response.items().length, equalTo(numPercolateRequest));
for (MultiPercolateResponse.Item item : response) {
assertThat(item.isFailure(), equalTo(false));
assertThat(item.getResponse().getSuccessfulShards(), equalTo(0));
assertThat(item.getResponse().getShardFailures().length, equalTo(2));
for (ShardOperationFailedException shardFailure : item.getResponse().getShardFailures()) {
assertThat(shardFailure.reason(), containsString("Failed to derive xcontent from"));
assertThat(shardFailure.status().getStatus(), equalTo(500));
}
}
// one valid request
builder = client().prepareMultiPercolate();
for (int i = 0; i < numPercolateRequest; i++) {
builder.add(
client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setSource("illegal json"));
}
builder.add(
client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field", "a").endObject())));
// Mixed batch: only the last (valid) item succeeds without shard failures.
response = builder.execute().actionGet();
assertThat(response.items().length, equalTo(numPercolateRequest + 1));
assertThat(response.items()[numPercolateRequest].isFailure(), equalTo(false));
assertNoFailures(response.items()[numPercolateRequest].getResponse());
assertThat(response.items()[numPercolateRequest].getResponse().getCount(), equalTo((long ) numQueries));
assertThat(response.items()[numPercolateRequest].getResponse().getMatches().length, equalTo(numQueries));
}
}

View File

@ -22,9 +22,14 @@ package org.elasticsearch.test.integration.percolator;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
import org.elasticsearch.action.percolate.MultiPercolateResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.node.internal.InternalNode;
@ -32,10 +37,17 @@ import org.elasticsearch.test.integration.AbstractNodesTests;
import org.junit.After;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.integration.percolator.SimplePercolatorTests.convertFromTextArray;
import static org.elasticsearch.test.integration.percolator.TTLPercolatorTests.ensureGreen;
import static org.hamcrest.Matchers.*;
@ -251,4 +263,174 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
assertThat(response.getMatches(), arrayWithSize(1));
assertThat(response.getMatches()[0].id().string(), equalTo("100"));
}
@Test
public void testSinglePercolator_recovery() throws Exception {
    // Runs the node-bounce recovery scenario using individual percolate requests.
    multiPercolatorRecovery(false);
}
@Test
public void testMultiPercolator_recovery() throws Exception {
    // Runs the same node-bounce recovery scenario, but bundles the percolate
    // requests through the multi percolate API (the code path this change adds).
    multiPercolatorRecovery(true);
}
// 3 nodes, 2 primaries + 2 replicas per primary, so each node should have a copy of the data.
// We only start and stop nodes 2 and 3, so all requests should succeed and never be partial.
//
// A background thread keeps issuing percolate requests (single or multi, depending on the
// flag) while nodes 2 and 3 are repeatedly stopped and restarted; any assertion failure in
// that thread is captured in `error` and re-checked on the main thread.
private void multiPercolatorRecovery(final boolean multiPercolate) throws Exception {
    Settings settings = settingsBuilder()
            .put("gateway.type", "none").build();
    logger.info("--> starting 3 nodes");
    startNode("node1", settings);
    startNode("node2", settings);
    startNode("node3", settings);

    // NOTE(review): a node1-bound client is created here, but most calls below go through
    // the no-arg client() helper — confirm both always route to a live node while
    // node2/node3 are bouncing.
    final Client client = client("node1");
    client.admin().indices().prepareDelete().execute().actionGet();
    ensureGreen(client);

    client.admin().indices().prepareCreate("test")
            .setSettings(settingsBuilder()
                    .put("index.number_of_shards", 2)
                    .put("index.number_of_replicas", 2)
            )
            .execute().actionGet();
    ensureGreen(client);

    final int numQueries = randomIntBetween(50, 100);
    logger.info("--> register a queries");
    for (int i = 0; i < numQueries; i++) {
        client().prepareIndex("test", "_percolator", Integer.toString(i))
                .setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
                .execute().actionGet();
    }

    // Doc "1" backs the get-request percolate variant below. The builder is closed
    // with endObject() for consistency with every other source builder in this test
    // (previously it relied on the generator auto-closing the open object).
    client().prepareIndex("test", "type", "1")
            .setSource(jsonBuilder().startObject().field("field", "a").endObject())
            .execute().actionGet();

    final AtomicBoolean run = new AtomicBoolean(true);
    final CountDownLatch done = new CountDownLatch(1);
    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                XContentBuilder doc;
                try {
                    doc = jsonBuilder().startObject().field("field", "a").endObject();
                } catch (IOException e) {
                    // Building an in-memory json doc should never fail; propagate to the
                    // outer handler instead of silently continuing with a null doc
                    // (which would NPE on the first percolate request below).
                    throw new RuntimeException(e);
                }
                while (run.get()) {
                    // Preference-based routing (_prefer_node towards node2/node3) was
                    // tried here and intentionally left disabled.
                    if (multiPercolate) {
                        MultiPercolateRequestBuilder builder = client()
                                .prepareMultiPercolate();
                        int numPercolateRequest = randomIntBetween(50, 100);
                        // Half of the requests percolate the inline doc...
                        for (int i = 0; i < numPercolateRequest / 2; i++) {
                            builder.add(
                                    client().preparePercolate()
                                            .setIndices("test").setDocumentType("type")
                                            .setPercolateDoc(docBuilder().setDoc(doc)));
                        }
                        // ...the other half percolate the already indexed doc "1".
                        for (int i = numPercolateRequest / 2; i < numPercolateRequest; i++) {
                            builder.add(
                                    client().preparePercolate()
                                            .setGetRequest(Requests.getRequest("test").type("type").id("1"))
                                            .setIndices("test").setDocumentType("type")
                            );
                        }

                        MultiPercolateResponse response = builder.execute().actionGet();
                        assertThat(response.items().length, equalTo(numPercolateRequest));
                        for (MultiPercolateResponse.Item item : response) {
                            assertThat(item.isFailure(), equalTo(false));
                            assertNoFailures(item.getResponse());
                            assertThat(item.getResponse().getCount(), equalTo((long) numQueries));
                            assertThat(item.getResponse().getMatches().length, equalTo(numQueries));
                        }
                    } else {
                        PercolateResponse response;
                        if (randomBoolean()) {
                            response = client().preparePercolate()
                                    .setIndices("test").setDocumentType("type")
                                    .setPercolateDoc(docBuilder().setDoc(doc))
                                    .execute().actionGet();
                        } else {
                            response = client().preparePercolate()
                                    .setGetRequest(Requests.getRequest("test").type("type").id("1"))
                                    .setIndices("test").setDocumentType("type")
                                    .execute().actionGet();
                        }
                        assertNoFailures(response);
                        assertThat(response.getCount(), equalTo((long) numQueries));
                        assertThat(response.getMatches().length, equalTo(numQueries));
                    }
                }
            } catch (Throwable t) {
                logger.info("Error in percolate thread...", t);
                run.set(false);
                error.set(t);
            } finally {
                done.countDown();
            }
        }
    };
    new Thread(r).start();

    try {
        // Bounce node3 and node2 a few times while the percolate thread keeps running.
        // node1 always holds a copy of every shard, so no request should ever fail.
        // NOTE(review): the restarts below do not pass `settings` — confirm startNode
        // reuses the settings from the initial start.
        for (int i = 0; i < 4; i++) {
            closeNode("node3");
            client().admin().cluster().prepareHealth()
                    .setWaitForEvents(Priority.LANGUID)
                    .setWaitForYellowStatus()
                    .setWaitForNodes("2")
                    .execute().actionGet();
            assertThat(error.get(), nullValue());
            closeNode("node2");
            client().admin().cluster().prepareHealth()
                    .setWaitForEvents(Priority.LANGUID)
                    .setWaitForYellowStatus()
                    .setWaitForNodes("1")
                    .execute().actionGet();
            assertThat(error.get(), nullValue());
            startNode("node3");
            client().admin().cluster().prepareHealth()
                    .setWaitForEvents(Priority.LANGUID)
                    .setWaitForYellowStatus()
                    .setWaitForNodes("2")
                    .execute().actionGet();
            assertThat(error.get(), nullValue());
            startNode("node2");
            client().admin().cluster().prepareHealth()
                    .setWaitForEvents(Priority.LANGUID)
                    .setWaitForYellowStatus()
                    .setWaitForNodes("3")
                    .execute().actionGet();
            assertThat(error.get(), nullValue());
        }
    } finally {
        run.set(false);
    }
    done.await();
    assertThat(error.get(), nullValue());
}
}