Merge branch 'master' into plugin_java_version

This commit is contained in:
Robert Muir 2015-07-24 11:12:52 -04:00
commit 980e564caa
73 changed files with 993 additions and 774 deletions

View File

@ -374,9 +374,9 @@ public class MapperQueryParser extends QueryParser {
Query rangeQuery;
if (currentFieldType instanceof DateFieldMapper.DateFieldType && settings.timeZone() != null) {
DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) this.currentFieldType;
rangeQuery = dateFieldType.rangeQuery(part1, part2, startInclusive, endInclusive, settings.timeZone(), null, parseContext);
rangeQuery = dateFieldType.rangeQuery(part1, part2, startInclusive, endInclusive, settings.timeZone(), null);
} else {
rangeQuery = currentFieldType.rangeQuery(part1, part2, startInclusive, endInclusive, parseContext);
rangeQuery = currentFieldType.rangeQuery(part1, part2, startInclusive, endInclusive);
}
return rangeQuery;
} catch (RuntimeException e) {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.analyze;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,7 +32,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* A request to analyze a text associated with a specific index. Allow to provide
* the actual analyzer name to perform the analysis with.
*/
public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest> {
public class AnalyzeRequest extends SingleShardRequest<AnalyzeRequest> {
private String[] text;
@ -114,7 +114,7 @@ public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest>
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = null;
if (text == null || text.length == 0) {
validationException = addValidationError("text is missing", validationException);
}

View File

@ -18,13 +18,13 @@
*/
package org.elasticsearch.action.admin.indices.analyze;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequestBuilder;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
*
*/
public class AnalyzeRequestBuilder extends SingleCustomOperationRequestBuilder<AnalyzeRequest, AnalyzeResponse, AnalyzeRequestBuilder> {
public class AnalyzeRequestBuilder extends SingleShardOperationRequestBuilder<AnalyzeRequest, AnalyzeResponse, AnalyzeRequestBuilder> {
public AnalyzeRequestBuilder(ElasticsearchClient client, AnalyzeAction action) {
super(client, action, new AnalyzeRequest());
@ -34,15 +34,6 @@ public class AnalyzeRequestBuilder extends SingleCustomOperationRequestBuilder<A
super(client, action, new AnalyzeRequest(index).text(text));
}
/**
* Sets the index to use to analyzer the text (for example, if it holds specific analyzers
* registered).
*/
public AnalyzeRequestBuilder setIndex(String index) {
request.index(index);
return this;
}
/**
* Sets the analyzer name to use in order to analyze the text.
*

View File

@ -29,7 +29,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -54,7 +54,7 @@ import java.util.List;
/**
* Transport action used to execute analyze requests
*/
public class TransportAnalyzeAction extends TransportSingleCustomOperationAction<AnalyzeRequest, AnalyzeResponse> {
public class TransportAnalyzeAction extends TransportSingleShardAction<AnalyzeRequest, AnalyzeResponse> {
private final IndicesService indicesService;
private final IndicesAnalysisService indicesAnalysisService;

View File

@ -19,16 +19,17 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetFieldMappingsIndexRequest> {
class GetFieldMappingsIndexRequest extends SingleShardRequest<GetFieldMappingsIndexRequest> {
private boolean probablySingleFieldRequest;
private boolean includeDefaults;
@ -42,7 +43,6 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
GetFieldMappingsIndexRequest(GetFieldMappingsRequest other, String index, boolean probablySingleFieldRequest) {
super(other);
this.preferLocal(other.local);
this.probablySingleFieldRequest = probablySingleFieldRequest;
this.includeDefaults = other.includeDefaults();
this.types = other.types();
@ -52,6 +52,11 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
this.originalIndices = new OriginalIndices(other);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public String[] types() {
return types;
}
@ -88,11 +93,6 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override
protected void writeIndex(StreamOutput out) throws IOException {
out.writeString(index());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -103,8 +103,4 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
protected void readIndex(StreamInput in) throws IOException {
index(in.readString());
}
}

View File

@ -26,7 +26,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -58,7 +58,7 @@ import java.util.Iterator;
/**
* Transport action used to retrieve the mappings related to fields that belong to a specific index
*/
public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomOperationAction<GetFieldMappingsIndexRequest, GetFieldMappingsResponse> {
public class TransportGetFieldMappingsIndexAction extends TransportSingleShardAction<GetFieldMappingsIndexRequest, GetFieldMappingsResponse> {
private static final String ACTION_NAME = GetFieldMappingsAction.NAME + "[index]";

View File

@ -152,7 +152,7 @@ public class ExplainRequest extends SingleShardRequest<ExplainRequest> {
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = super.validateNonNullIndex();
if (type == null) {
validationException = ValidateActions.addValidationError("type is missing", validationException);
}

View File

@ -88,7 +88,7 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(ExplainRequest request) {
return true;
}

View File

@ -118,7 +118,7 @@ public class GetRequest extends SingleShardRequest<GetRequest> {
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = super.validateNonNullIndex();
if (type == null) {
validationException = ValidateActions.addValidationError("type is missing", validationException);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.get;
import com.carrotsearch.hppc.IntArrayList;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -54,6 +55,11 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
ignoreErrorsOnGeneratedFields = multiGetRequest.ignoreErrorsOnGeneratedFields;
}
@Override
public ActionRequestValidationException validate() {
return super.validateNonNullIndex();
}
public int shardId() {
return this.shardId;
}

View File

@ -58,7 +58,7 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(GetRequest request) {
return true;
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.get;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@ -68,7 +67,7 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(MultiGetShardRequest request) {
return true;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.percolate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
@ -76,7 +77,7 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(Request request) {
return false;
}
@ -127,6 +128,11 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
this.items = new ArrayList<>();
}
@Override
public ActionRequestValidationException validate() {
return super.validateNonNullIndex();
}
@Override
public String[] indices() {
List<String> indices = new ArrayList<>();

View File

@ -1,142 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
*
*/
public abstract class SingleCustomOperationRequest<T extends SingleCustomOperationRequest> extends ActionRequest<T> implements IndicesRequest {
ShardId internalShardId;
private boolean threadedOperation = true;
private boolean preferLocal = true;
private String index;
protected SingleCustomOperationRequest() {
}
protected SingleCustomOperationRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
public boolean operationThreaded() {
return threadedOperation;
}
/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
@SuppressWarnings("unchecked")
public final T operationThreaded(boolean threadedOperation) {
this.threadedOperation = threadedOperation;
return (T) this;
}
/**
* if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
*/
@SuppressWarnings("unchecked")
public final T preferLocal(boolean preferLocal) {
this.preferLocal = preferLocal;
return (T) this;
}
@SuppressWarnings("unchecked")
public T index(String index) {
this.index = index;
return (T)this;
}
public String index() {
return index;
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
@Override
public String[] indices() {
if (index == null) {
return Strings.EMPTY_ARRAY;
}
return new String[]{index};
}
/**
* if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
*/
public boolean preferLocalShard() {
return this.preferLocal;
}
public void beforeLocalFork() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
}
preferLocal = in.readBoolean();
readIndex(in);
}
protected void readIndex(StreamInput in) throws IOException {
index = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeBoolean(preferLocal);
writeIndex(out);
}
protected void writeIndex(StreamOutput out) throws IOException {
out.writeOptionalString(index);
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.IndicesAdminClient;
/**
*/
public abstract class SingleCustomOperationRequestBuilder<Request extends SingleCustomOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends SingleCustomOperationRequestBuilder<Request, Response, RequestBuilder>>
extends ActionRequestBuilder<Request, Response, RequestBuilder> {
protected SingleCustomOperationRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
super(client, action, request);
}
/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setOperationThreaded(boolean threadedOperation) {
request.operationThreaded(threadedOperation);
return (RequestBuilder) this;
}
/**
* if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setPreferLocal(boolean preferLocal) {
request.preferLocal(preferLocal);
return (RequestBuilder) this;
}
}

View File

@ -1,314 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
/**
* Transport action used to send a read request to one of the shards that belong to an index.
* Supports retrying another shard in case of failure.
*/
public abstract class TransportSingleCustomOperationAction<Request extends SingleCustomOperationRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
final String transportShardAction;
final String executor;
protected TransportSingleCustomOperationAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Class<Request> request, String executor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportShardAction = actionName + "[s]";
this.executor = executor;
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start();
}
/**
* Can return null to execute on this local node.
*/
protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);
/**
* Operation to be executed at the shard level. Can be called with shardId set to null, meaning that there is no
* shard involved and the operation just needs to be executed on the local node.
*/
protected abstract Response shardOperation(Request request, ShardId shardId);
protected abstract Response newResponse();
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.concreteIndex());
}
protected abstract boolean resolveIndex(Request request);
private class AsyncSingleAction {
private final ActionListener<Response> listener;
private final ShardsIterator shardsIt;
private final InternalRequest internalRequest;
private final DiscoveryNodes nodes;
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.listener = listener;
ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}
String concreteSingleIndex;
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request);
} else {
concreteSingleIndex = request.index();
}
this.internalRequest = new InternalRequest(request, concreteSingleIndex);
blockException = checkRequestBlock(clusterState, internalRequest);
if (blockException != null) {
throw blockException;
}
this.shardsIt = shards(clusterState, internalRequest);
}
public void start() {
performFirst();
}
private void onFailure(ShardRouting shardRouting, Throwable e) {
if (logger.isTraceEnabled() && e != null) {
logger.trace(shardRouting.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
}
perform(e);
}
/**
* First get should try and use a shard that exists on a local node for better performance
*/
private void performFirst() {
if (shardsIt == null) {
// just execute it on the local node
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
Response response = shardOperation(internalRequest.request(), null);
listener.onResponse(response);
} catch (Throwable e) {
onFailure(null, e);
}
}
});
return;
} else {
try {
final Response response = shardOperation(internalRequest.request(), null);
listener.onResponse(response);
return;
} catch (Throwable e) {
onFailure(null, e);
}
}
return;
}
if (internalRequest.request().preferLocalShard()) {
boolean foundLocal = false;
ShardRouting shardX;
while ((shardX = shardsIt.nextOrNull()) != null) {
final ShardRouting shard = shardX;
if (shard.currentNodeId().equals(nodes.localNodeId())) {
foundLocal = true;
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
} catch (Throwable e) {
shardsIt.reset();
onFailure(shard, e);
}
}
});
return;
} else {
try {
final Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
return;
} catch (Throwable e) {
shardsIt.reset();
onFailure(shard, e);
}
}
}
}
if (!foundLocal) {
// no local node get, go remote
shardsIt.reset();
perform(null);
}
} else {
perform(null);
}
}
private void perform(final Throwable lastException) {
final ShardRouting shard = shardsIt == null ? null : shardsIt.nextOrNull();
if (shard == null) {
Throwable failure = lastException;
if (failure == null) {
failure = new NoShardAvailableActionException(null, "No shard available for [" + internalRequest.request() + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug("failed to execute [" + internalRequest.request() + "]", failure);
}
}
listener.onFailure(failure);
} else {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
// we don't prefer local shard, so try and do it here
if (!internalRequest.request().preferLocalShard()) {
try {
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
} catch (Throwable e) {
onFailure(shard, e);
}
}
});
} else {
final Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
}
} catch (Throwable e) {
onFailure(shard, e);
}
} else {
perform(lastException);
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
internalRequest.request().internalShardId = shard.shardId();
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
onFailure(shard, exp);
}
});
}
}
}
}
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
}
}
/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/
protected class InternalRequest {
final Request request;
final String concreteIndex;
InternalRequest(Request request, String concreteIndex) {
this.request = request;
this.concreteIndex = concreteIndex;
}
public Request request() {
return request;
}
public String concreteIndex() {
return concreteIndex;
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
@ -35,10 +36,17 @@ import java.io.IOException;
*/
public abstract class SingleShardRequest<T extends SingleShardRequest> extends ActionRequest<T> implements IndicesRequest {
ShardId internalShardId;
protected String index;
public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
/**
* The concrete index name
*
* Whether index property is optional depends on the concrete implementation. If index property is required the
* concrete implementation should use {@link #validateNonNullIndex()} to check if the index property has been set
*/
@Nullable
protected String index;
ShardId internalShardId;
private boolean threadedOperation = true;
protected SingleShardRequest() {
@ -57,8 +65,10 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
this.index = index;
}
@Override
public ActionRequestValidationException validate() {
/**
* @return a validation exception if the index property hasn't been set
*/
protected ActionRequestValidationException validateNonNullIndex() {
ActionRequestValidationException validationException = null;
if (index == null) {
validationException = ValidateActions.addValidationError("index is missing", validationException);
@ -66,6 +76,13 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
return validationException;
}
/**
* @return The concrete index this request is targeted for or <code>null</code> if index is optional.
* Whether index property is optional depends on the concrete implementation. If index property
* is required the concrete implementation should use {@link #validateNonNullIndex()} to check
* if the index property has been set
*/
@Nullable
public String index() {
return index;
}
@ -111,7 +128,7 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
}
index = in.readString();
index = in.readOptionalString();
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@ -119,7 +136,7 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeString(index);
out.writeOptionalString(index);
}
}

View File

@ -32,9 +32,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
@ -43,7 +44,9 @@ import org.elasticsearch.transport.*;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
/**
* A base class for single shard read operations.
* A base class for operations that need to perform a read operation on a single shard copy. If the operation fails,
* the read operation can be performed on other shard copies. Concrete implementations can provide their own list
* of candidate shards to try the read operation on.
*/
public abstract class TransportSingleShardAction<Request extends SingleShardRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
@ -88,7 +91,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected abstract Response newResponse();
protected abstract boolean resolveIndex();
protected abstract boolean resolveIndex(Request request);
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
@ -102,12 +105,17 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
protected abstract ShardIterator shards(ClusterState state, InternalRequest request);
/**
* Returns the candidate shards to execute the operation on or <code>null</code> the execute
* the operation locally (the node that received the request)
*/
@Nullable
protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);
class AsyncSingleAction {
private final ActionListener<Response> listener;
private final ShardIterator shardIt;
private final ShardsIterator shardIt;
private final InternalRequest internalRequest;
private final DiscoveryNodes nodes;
private volatile Throwable lastFailure;
@ -126,7 +134,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
String concreteSingleIndex;
if (resolveIndex()) {
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request);
} else {
concreteSingleIndex = request.index();
@ -143,7 +151,32 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
public void start() {
perform(null);
if (shardIt == null) {
// just execute it on the local node
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
perform(exp);
}
});
} else {
perform(null);
}
}
private void onFailure(ShardRouting shardRouting, Throwable e) {
@ -163,10 +196,10 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
if (shardRouting == null) {
Throwable failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(shardIt.shardId(), null, failure);
failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: failed to execute [{}]", failure, shardIt.shardId(), internalRequest.request());
logger.debug("{}: failed to execute [{}]", failure, null, internalRequest.request());
}
}
listener.onFailure(failure);
@ -174,7 +207,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardIt.shardId()));
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
internalRequest.request().internalShardId = shardRouting.shardId();
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.termvectors;
import com.carrotsearch.hppc.IntArrayList;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -47,6 +48,11 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
requests = new ArrayList<>();
}
@Override
public ActionRequestValidationException validate() {
return super.validateNonNullIndex();
}
public int shardId() {
return this.shardId;
}

View File

@ -167,6 +167,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
this.version = other.version();
this.versionType = VersionType.fromValue(other.versionType().getValue());
this.startTime = other.startTime();
this.filterSettings = other.filterSettings();
}
public TermVectorsRequest(MultiGetRequest.Item item) {
@ -472,7 +473,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = super.validateNonNullIndex();
if (type == null) {
validationException = ValidateActions.addValidationError("type is missing", validationException);
}

View File

@ -63,7 +63,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
return false;
}

View File

@ -65,7 +65,7 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(TermVectorsRequest request) {
return true;
}

View File

@ -420,7 +420,10 @@ public class MetaDataCreateIndexService extends AbstractComponent {
.put(indexMetaData, false)
.build();
logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}", request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());
String maybeShadowIndicator = IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) ? "s" : "";
logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}{}], mappings {}",
request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(),
indexMetaData.numberOfReplicas(), maybeShadowIndicator, mappings.keySet());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
if (!request.blocks().isEmpty()) {

View File

@ -83,10 +83,10 @@ public class AllocationId implements ToXContent {
/**
* Creates a new allocation id representing a cancelled relocation.
*
* <p/>
* Note that this is expected to be called on the allocation id
* of the *source* shard
* */
*/
public static AllocationId cancelRelocation(AllocationId allocationId) {
assert allocationId.getRelocationId() != null;
return new AllocationId(allocationId.getId(), null);
@ -94,7 +94,7 @@ public class AllocationId implements ToXContent {
/**
* Creates a new allocation id finalizing a relocation.
*
* <p/>
* Note that this is expected to be called on the allocation id
* of the *target* shard and thus it only needs to clear the relocating id.
*/
@ -120,9 +120,16 @@ public class AllocationId implements ToXContent {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (this == o) {
return true;
}
if (o == null) {
return false;
}
AllocationId that = (AllocationId) o;
if (!id.equals(that.id)) return false;
if (!id.equals(that.id)) {
return false;
}
return !(relocationId != null ? !relocationId.equals(that.relocationId) : that.relocationId != null);
}

View File

@ -88,7 +88,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
void add(ShardRouting shard) {
// TODO use Set with ShardIds for faster lookup.
for (ShardRouting shardRouting : shards) {
if (shardRouting.shardId().equals(shard.shardId())) {
if (shardRouting.isSameShard(shard)) {
throw new IllegalStateException("Trying to add a shard [" + shard.shardId().index().name() + "][" + shard.shardId().id() + "] to a node [" + nodeId + "] where it already exists");
}
}

View File

@ -420,7 +420,7 @@ public final class ShardRouting implements Streamable, ToXContent {
void relocate(String relocatingNodeId) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.STARTED : this;
assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this;
state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId;
this.allocationId = AllocationId.newRelocation(allocationId);
@ -467,7 +467,7 @@ public final class ShardRouting implements Streamable, ToXContent {
restoreSource = null;
unassignedInfo = null; // we keep the unassigned data until the shard is started
if (allocationId.getRelocationId() != null) {
// target relocation
// relocation target
allocationId = AllocationId.finishRelocation(allocationId);
}
state = ShardRoutingState.STARTED;
@ -498,6 +498,106 @@ public final class ShardRouting implements Streamable, ToXContent {
primary = false;
}
/** returns true if this routing has the same shardId as another */
public boolean isSameShard(ShardRouting other) {
return index.equals(other.index) && shardId == other.shardId;
}
/**
* returns true if this routing has the same allocation ID as another.
* <p/>
* Note: if both shard routing has a null as their {@link #allocationId()}, this method returns false as the routing describe
* no allocation at all..
**/
public boolean isSameAllocation(ShardRouting other) {
boolean b = this.allocationId != null && other.allocationId != null && this.allocationId.getId().equals(other.allocationId.getId());
assert b == false || this.currentNodeId.equals(other.currentNodeId) : "ShardRoutings have the same allocation id but not the same node. This [" + this + "], other [" + other + "]";
return b;
}
/** returns true if the routing is the relocation target of the given routing */
public boolean isRelocationTargetOf(ShardRouting other) {
boolean b = this.allocationId != null && other.allocationId != null && this.state == ShardRoutingState.INITIALIZING &&
this.allocationId.getId().equals(other.allocationId.getRelocationId());
assert b == false || other.state == ShardRoutingState.RELOCATING :
"ShardRouting is a relocation target but the source shard state isn't relocating. This [" + this + "], other [" + other + "]";
assert b == false || other.allocationId.getId().equals(this.allocationId.getRelocationId()) :
"ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId. This [" + this + "], other [" + other + "]";
assert b == false || other.currentNodeId().equals(this.relocatingNodeId) :
"ShardRouting is a relocation target but source current node id isn't equal to target relocating node. This [" + this + "], other [" + other + "]";
assert b == false || this.currentNodeId().equals(other.relocatingNodeId) :
"ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]";
assert b == false || isSameShard(other) :
"ShardRouting is a relocation target but both routings are not of the same shard. This [" + this + "], other [" + other + "]";
assert b == false || this.primary == other.primary :
"ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]";
return b;
}
/** returns true if the routing is the relocation source for the given routing */
public boolean isRelocationSourceOf(ShardRouting other) {
boolean b = this.allocationId != null && other.allocationId != null && other.state == ShardRoutingState.INITIALIZING &&
other.allocationId.getId().equals(this.allocationId.getRelocationId());
assert b == false || this.state == ShardRoutingState.RELOCATING :
"ShardRouting is a relocation source but shard state isn't relocating. This [" + this + "], other [" + other + "]";
assert b == false || this.allocationId.getId().equals(other.allocationId.getRelocationId()) :
"ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId. This [" + this + "], other [" + other + "]";
assert b == false || this.currentNodeId().equals(other.relocatingNodeId) :
"ShardRouting is a relocation source but current node isn't equal to other's relocating node. This [" + this + "], other [" + other + "]";
assert b == false || other.currentNodeId().equals(this.relocatingNodeId) :
"ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]";
assert b == false || isSameShard(other) :
"ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]";
assert b == false || this.primary == other.primary :
"ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]";
return b;
}
/** returns true if the current routing is identical to the other routing in all but meta fields, i.e., version and unassigned info */
public boolean equalsIgnoringMetaData(ShardRouting other) {
if (primary != other.primary) {
return false;
}
if (shardId != other.shardId) {
return false;
}
if (currentNodeId != null ? !currentNodeId.equals(other.currentNodeId) : other.currentNodeId != null) {
return false;
}
if (index != null ? !index.equals(other.index) : other.index != null) {
return false;
}
if (relocatingNodeId != null ? !relocatingNodeId.equals(other.relocatingNodeId) : other.relocatingNodeId != null) {
return false;
}
if (allocationId != null ? !allocationId.equals(other.allocationId) : other.allocationId != null) {
return false;
}
if (state != other.state) {
return false;
}
if (restoreSource != null ? !restoreSource.equals(other.restoreSource) : other.restoreSource != null) {
return false;
}
return true;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -508,32 +608,8 @@ public final class ShardRouting implements Streamable, ToXContent {
return false;
}
ShardRouting that = (ShardRouting) o;
if (primary != that.primary) {
return false;
}
if (shardId != that.shardId) {
return false;
}
if (currentNodeId != null ? !currentNodeId.equals(that.currentNodeId) : that.currentNodeId != null) {
return false;
}
if (index != null ? !index.equals(that.index) : that.index != null) {
return false;
}
if (relocatingNodeId != null ? !relocatingNodeId.equals(that.relocatingNodeId) : that.relocatingNodeId != null) {
return false;
}
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
return false;
}
if (state != that.state) {
return false;
}
if (restoreSource != null ? !restoreSource.equals(that.restoreSource) : that.restoreSource != null) {
return false;
}
return true;
// TODO: add version + unassigned info check. see #12387
return equalsIgnoringMetaData(that);
}
private long hashVersion = version - 1;

View File

@ -95,7 +95,11 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
/**
* When a shard moves from started back to initializing, for example, during shadow replica
*/
REINITIALIZED;
REINITIALIZED,
/**
* A better replica location is identified and causes the existing replica allocation to be cancelled.
*/
REALLOCATED_REPLICA;
}
private final Reason reason;

View File

@ -338,7 +338,7 @@ public class AllocationService extends AbstractComponent {
}
for (ShardRouting shard : currentRoutingNode) {
if (shard.allocationId().getId().equals(startedShard.allocationId().getId())) {
if (shard.isSameAllocation(startedShard)) {
if (shard.active()) {
logger.trace("{} shard is already started, ignoring (routing: {})", startedShard.shardId(), startedShard);
} else {
@ -363,8 +363,7 @@ public class AllocationService extends AbstractComponent {
if (sourceRoutingNode != null) {
while (sourceRoutingNode.hasNext()) {
ShardRouting shard = sourceRoutingNode.next();
if (shard.allocationId().getId().equals(startedShard.allocationId().getRelocationId())) {
assert shard.relocating() : "source shard for relocation is not marked as relocating. source " + shard + ", started target " + startedShard;
if (shard.isRelocationSourceOf(startedShard)) {
dirty = true;
sourceRoutingNode.remove();
break;
@ -397,7 +396,7 @@ public class AllocationService extends AbstractComponent {
boolean matchedShard = false;
while (matchedNode.hasNext()) {
ShardRouting routing = matchedNode.next();
if (routing.allocationId().getId().equals(failedShard.allocationId().getId())) {
if (routing.isSameAllocation(failedShard)) {
matchedShard = true;
logger.debug("{} failed shard {} found in routingNodes, failing it ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
break;
@ -428,7 +427,7 @@ public class AllocationService extends AbstractComponent {
RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId());
if (relocatingFromNode != null) {
for (ShardRouting shardRouting : relocatingFromNode) {
if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) {
if (shardRouting.isRelocationSourceOf(failedShard)) {
logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), shardRouting, unassignedInfo.shortSummary());
routingNodes.cancelRelocation(shardRouting);
break;
@ -441,7 +440,7 @@ public class AllocationService extends AbstractComponent {
// and the shard copy needs to be marked as unassigned
if (failedShard.relocatingNodeId() != null) {
// handle relocation source shards. we need to find the target initializing shard that is recovering from, and remove it...
// handle relocation source shards. we need to find the target initializing shard that is recovering, and remove it...
assert failedShard.initializing() == false; // should have been dealt with and returned
assert failedShard.relocating();
@ -449,10 +448,7 @@ public class AllocationService extends AbstractComponent {
if (initializingNode != null) {
while (initializingNode.hasNext()) {
ShardRouting shardRouting = initializingNode.next();
if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) {
assert shardRouting.initializing() : shardRouting;
assert failedShard.allocationId().getId().equals(shardRouting.allocationId().getRelocationId())
: "found target shard's allocation relocation id is different than source";
if (shardRouting.isRelocationTargetOf(failedShard)) {
logger.trace("{} is removed due to the failure of the source shard", shardRouting);
initializingNode.remove();
}

View File

@ -177,7 +177,7 @@ public class CancelAllocationCommand implements AllocationCommand {
RoutingNode relocatingFromNode = allocation.routingNodes().node(shardRouting.relocatingNodeId());
if (relocatingFromNode != null) {
for (ShardRouting fromShardRouting : relocatingFromNode) {
if (fromShardRouting.shardId().equals(shardRouting.shardId()) && fromShardRouting.state() == RELOCATING) {
if (fromShardRouting.isSameShard(shardRouting) && fromShardRouting.state() == RELOCATING) {
allocation.routingNodes().cancelRelocation(fromShardRouting);
break;
}
@ -201,7 +201,7 @@ public class CancelAllocationCommand implements AllocationCommand {
if (initializingNode != null) {
while (initializingNode.hasNext()) {
ShardRouting initializingShardRouting = initializingNode.next();
if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.initializing()) {
if (initializingShardRouting.isRelocationTargetOf(shardRouting)) {
initializingNode.remove();
}
}

View File

@ -38,7 +38,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
* node. The default is <tt>4</tt></li>
* <p/>
* <li><tt>cluster.routing.allocation.node_concurrent_recoveries</tt> -
* restricts the number of concurrent recovery operations on a single node. The
* restricts the number of total concurrent shards initializing on a single node. The
* default is <tt>2</tt></li>
* </ul>
* <p/>
@ -106,7 +106,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
int currentRecoveries = 0;
for (ShardRouting shard : node) {
if (shard.initializing() || shard.relocating()) {
if (shard.initializing()) {
currentRecoveries++;
}
}

View File

@ -126,6 +126,7 @@ public class GatewayAllocator extends AbstractComponent {
}); // sort for priority ordering
changed |= primaryShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.processExistingRecoveries(allocation);
changed |= replicaShardAllocator.allocateUnassigned(allocation);
return changed;
}

View File

@ -25,11 +25,11 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
@ -40,7 +40,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import java.util.Iterator;
import java.util.Map;
/**
@ -51,6 +50,62 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
super(settings);
}
/**
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
* match. Today, a better match is one that has full sync id match compared to not having one in
* the previous recovery.
*/
public boolean processExistingRecoveries(RoutingAllocation allocation) {
boolean changed = false;
for (RoutingNodes.RoutingNodesIterator nodes = allocation.routingNodes().nodes(); nodes.hasNext(); ) {
nodes.next();
for (RoutingNodes.RoutingNodeIterator it = nodes.nodeShards(); it.hasNext(); ) {
ShardRouting shard = it.next();
if (shard.primary() == true) {
continue;
}
if (shard.initializing() == false) {
continue;
}
if (shard.relocatingNodeId() != null) {
continue;
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
if (shardStores.hasData() == false) {
logger.trace("{}: fetching new stores for initializing shard", shard);
continue; // still fetching
}
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard);
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
if (primaryStore == null || primaryStore.allocated() == false) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
continue;
}
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
if (currentNode.equals(nodeWithHighestMatch) == false
&& matchingNodes.isNodeMatchBySyncID(currentNode) == false
&& matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch) == true) {
// we found a better match that has a full sync id match, the existing allocation is not fully synced
// so we found a better one, cancel this one
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]"));
changed = true;
}
}
}
}
return changed;
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
@ -236,7 +291,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
highestMatchNode = cursor.key;
}
}
nodeWithHighestMatch = highestMatchNode;
this.nodeWithHighestMatch = highestMatchNode;
}
/**
@ -248,6 +303,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
return this.nodeWithHighestMatch;
}
public boolean isNodeMatchBySyncID(DiscoveryNode node) {
return nodesToSize.get(node) == Long.MAX_VALUE;
}
/**
* Did we manage to find any data, regardless how well they matched or not.
*/

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.engine;
import com.google.common.base.Preconditions;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -45,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
@ -57,11 +55,7 @@ import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
@ -288,7 +282,7 @@ public abstract class Engine implements Closeable {
try {
final Searcher retVal = newSearcher(source, searcher, manager);
success = true;
return retVal;
return config().getWrappingService().wrap(engineConfig, retVal);
} finally {
if (!success) {
manager.release(searcher);

View File

@ -22,7 +22,6 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.similarities.Similarity;
@ -35,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
@ -77,6 +77,7 @@ public final class EngineConfig {
private final boolean forceNewTranslog;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
private final IndexSearcherWrappingService wrappingService;
/**
* Index setting for index concurrency / number of threadstates in the indexwriter.
@ -143,7 +144,7 @@ public final class EngineConfig {
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, IndexSearcherWrappingService wrappingService, TranslogConfig translogConfig) {
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
@ -157,6 +158,7 @@ public final class EngineConfig {
this.similarity = similarity;
this.codecService = codecService;
this.failedEngineListener = failedEngineListener;
this.wrappingService = wrappingService;
this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false);
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
@ -421,6 +423,10 @@ public final class EngineConfig {
return queryCachingPolicy;
}
public IndexSearcherWrappingService getWrappingService() {
return wrappingService;
}
/**
* Returns the translog config for this engine
*/

View File

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
/**
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
* and {@link IndexSearcher} managed by the {@link Engine}.
*/
public interface IndexSearcherWrapper {
/**
* @param reader The provided directory reader to be wrapped to add custom functionality
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
* the provided directory reader
*/
DirectoryReader wrap(DirectoryReader reader);
/**
* @param searcher The provided index searcher to be wrapped to add custom functionality
* @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
* the provided index searcher
*/
IndexSearcher wrap(IndexSearcher searcher) throws EngineException;
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.engine.Engine.Searcher;
import java.util.Set;
/**
* Service responsible for wrapping the {@link DirectoryReader} and {@link IndexSearcher} of a {@link Searcher} via the
* configured {@link IndexSearcherWrapper} instance. This allows custom functionally to be added the {@link Searcher}
* before being used to do an operation (search, get, field stats etc.)
*/
// TODO: This needs extension point is a bit hacky now, because the IndexSearch from the engine can only be wrapped once,
// if we allowed the IndexSearcher to be wrapped multiple times then a custom IndexSearcherWrapper needs have good
// control over its location in the wrapping chain
public final class IndexSearcherWrappingService {
private final IndexSearcherWrapper wrapper;
// for unit tests:
IndexSearcherWrappingService() {
this.wrapper = null;
}
@Inject
// Use a Set parameter here, because constructor parameter can't be optional
// and I prefer to keep the `wrapper` field final.
public IndexSearcherWrappingService(Set<IndexSearcherWrapper> wrappers) {
if (wrappers.size() > 1) {
throw new IllegalStateException("wrapping of the index searcher by more than one wrappers is forbidden, found the following wrappers [" + wrappers + "]");
}
if (wrappers.isEmpty()) {
this.wrapper = null;
} else {
this.wrapper = wrappers.iterator().next();
}
}
/**
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
* gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned.
*
* This is invoked each time a {@link Searcher} is requested to do an operation. (for example search)
*/
public Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException {
if (wrapper == null) {
return engineSearcher;
}
DirectoryReader reader = wrapper.wrap((DirectoryReader) engineSearcher.reader());
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
// TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
IndexSearcher indexSearcher = wrapper.wrap(innerIndexSearcher);
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
@Override
public void close() throws ElasticsearchException {
engineSearcher.close();
}
};
}
}
}

View File

@ -30,7 +30,6 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.FieldDataType;
@ -186,6 +185,7 @@ public abstract class MappedFieldType extends FieldType {
fieldDataType = new FieldDataType(typeName());
}
@Override
public abstract MappedFieldType clone();
@Override
@ -449,7 +449,7 @@ public abstract class MappedFieldType extends FieldType {
return new TermsQuery(names.indexName(), bytesRefs);
}
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return new TermRangeQuery(names().indexName(),
lowerTerm == null ? null : indexedValueForSearch(lowerTerm),
upperTerm == null ? null : indexedValueForSearch(upperTerm),

View File

@ -30,7 +30,6 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Fuzziness;
@ -42,8 +41,6 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.QueryParseContext;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@ -163,7 +160,7 @@ public class ByteFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return NumericRangeQuery.newIntRange(names().indexName(), numericPrecisionStep(),
lowerTerm == null ? null : (int)parseValue(lowerTerm),
upperTerm == null ? null : (int)parseValue(upperTerm),

View File

@ -51,12 +51,15 @@ import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.core.LongFieldMapper.CustomLongNumericField;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.internal.SearchContext;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@ -124,6 +127,7 @@ public class DateFieldMapper extends NumberFieldMapper {
return fieldMapper;
}
@Override
protected void setupFieldType(BuilderContext context) {
if (Version.indexCreated(context.indexSettings()).before(Version.V_2_0_0_beta1) &&
!fieldType().dateTimeFormatter().format().contains("epoch_")) {
@ -277,6 +281,7 @@ public class DateFieldMapper extends NumberFieldMapper {
this.dateMathParser = ref.dateMathParser;
}
@Override
public DateFieldType clone() {
return new DateFieldType(this);
}
@ -390,8 +395,8 @@ public class DateFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return rangeQuery(lowerTerm, upperTerm, includeLower, includeUpper, null, null, context);
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return rangeQuery(lowerTerm, upperTerm, includeLower, includeUpper, null, null);
}
@Override
@ -419,7 +424,7 @@ public class DateFieldMapper extends NumberFieldMapper {
);
}
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable DateTimeZone timeZone, @Nullable DateMathParser forcedDateParser, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable DateTimeZone timeZone, @Nullable DateMathParser forcedDateParser) {
return new LateParsingQuery(lowerTerm, upperTerm, includeLower, includeUpper, timeZone, forcedDateParser);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.mapper.core;
import com.carrotsearch.hppc.DoubleArrayList;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Field;
@ -32,7 +33,6 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Fuzziness;
@ -46,7 +46,6 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.QueryParseContext;
import java.io.IOException;
import java.util.Iterator;
@ -169,7 +168,7 @@ public class DoubleFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return NumericRangeQuery.newDoubleRange(names().indexName(), numericPrecisionStep(),
lowerTerm == null ? null : parseDoubleValue(lowerTerm),
upperTerm == null ? null : parseDoubleValue(upperTerm),

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.mapper.core;
import com.carrotsearch.hppc.FloatArrayList;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Field;
@ -32,7 +33,6 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -47,7 +47,6 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.QueryParseContext;
import java.io.IOException;
import java.util.Iterator;
@ -170,7 +169,7 @@ public class FloatFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return NumericRangeQuery.newFloatRange(names().indexName(), numericPrecisionStep(),
lowerTerm == null ? null : parseValue(lowerTerm),
upperTerm == null ? null : parseValue(upperTerm),

View File

@ -31,7 +31,6 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -44,8 +43,6 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.QueryParseContext;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@ -172,7 +169,7 @@ public class IntegerFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return NumericRangeQuery.newIntRange(names().indexName(), numericPrecisionStep(),
lowerTerm == null ? null : parseValue(lowerTerm),
upperTerm == null ? null : parseValue(upperTerm),

View File

@ -31,7 +31,6 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -44,8 +43,6 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.QueryParseContext;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@ -170,7 +167,7 @@ public class LongFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return NumericRangeQuery.newLongRange(names().indexName(), numericPrecisionStep(),
lowerTerm == null ? null : parseLongValue(lowerTerm),
upperTerm == null ? null : parseLongValue(upperTerm),

View File

@ -31,7 +31,6 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -44,8 +43,6 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.QueryParseContext;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@ -168,7 +165,7 @@ public class ShortFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return NumericRangeQuery.newIntRange(names().indexName(), numericPrecisionStep(),
lowerTerm == null ? null : (int)parseValue(lowerTerm),
upperTerm == null ? null : (int)parseValue(upperTerm),

View File

@ -29,7 +29,6 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -47,8 +46,6 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.index.mapper.core.LongFieldMapper.CustomLongNumericField;
import org.elasticsearch.index.mapper.core.NumberFieldMapper;
import org.elasticsearch.index.query.QueryParseContext;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@ -209,7 +206,7 @@ public class IpFieldMapper extends NumberFieldMapper {
}
@Override
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) {
return NumericRangeQuery.newLongRange(names().indexName(), numericPrecisionStep(),
lowerTerm == null ? null : parseValue(lowerTerm),
upperTerm == null ? null : parseValue(upperTerm),

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.FieldNamesFieldMapper;
import org.elasticsearch.index.mapper.object.ObjectMapper;
@ -111,7 +110,7 @@ public class ExistsQueryParser implements QueryParser {
}
// if _field_names are not indexed, we need to go the slow way
if (filter == null && fieldType != null) {
filter = fieldType.rangeQuery(null, null, true, true, parseContext);
filter = fieldType.rangeQuery(null, null, true, true);
}
if (filter == null) {
filter = new TermRangeQuery(field, null, null, true, true);

View File

@ -133,7 +133,7 @@ public class MissingQueryParser implements QueryParser {
}
// if _field_names are not indexed, we need to go the slow way
if (filter == null && fieldType != null) {
filter = fieldType.rangeQuery(null, null, true, true, parseContext);
filter = fieldType.rangeQuery(null, null, true, true);
}
if (filter == null) {
filter = new TermRangeQuery(field, null, null, true, true);

View File

@ -123,14 +123,14 @@ public class RangeQueryParser implements QueryParser {
MappedFieldType mapper = parseContext.fieldMapper(fieldName);
if (mapper != null) {
if (mapper instanceof DateFieldMapper.DateFieldType) {
query = ((DateFieldMapper.DateFieldType) mapper).rangeQuery(from, to, includeLower, includeUpper, timeZone, forcedDateParser, parseContext);
query = ((DateFieldMapper.DateFieldType) mapper).rangeQuery(from, to, includeLower, includeUpper, timeZone, forcedDateParser);
} else {
if (timeZone != null) {
throw new QueryParsingException(parseContext, "[range] time_zone can not be applied to non date field ["
+ fieldName + "]");
}
//LUCENE 4 UPGRADE Mapper#rangeQuery should use bytesref as well?
query = mapper.rangeQuery(from, to, includeLower, includeUpper, parseContext);
query = mapper.rangeQuery(from, to, includeLower, includeUpper);
}
}
if (query == null) {

View File

@ -45,16 +45,16 @@ public class IndexedGeoBoundingBoxQuery {
private static Query westGeoBoundingBoxFilter(GeoPoint topLeft, GeoPoint bottomRight, GeoPointFieldMapper.GeoPointFieldType fieldType) {
BooleanQuery filter = new BooleanQuery();
filter.setMinimumNumberShouldMatch(1);
filter.add(fieldType.lonFieldType().rangeQuery(null, bottomRight.lon(), true, true, null), Occur.SHOULD);
filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), null, true, true, null), Occur.SHOULD);
filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true, null), Occur.MUST);
filter.add(fieldType.lonFieldType().rangeQuery(null, bottomRight.lon(), true, true), Occur.SHOULD);
filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), null, true, true), Occur.SHOULD);
filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true), Occur.MUST);
return new ConstantScoreQuery(filter);
}
private static Query eastGeoBoundingBoxFilter(GeoPoint topLeft, GeoPoint bottomRight, GeoPointFieldMapper.GeoPointFieldType fieldType) {
BooleanQuery filter = new BooleanQuery();
filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), bottomRight.lon(), true, true, null), Occur.MUST);
filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true, null), Occur.MUST);
filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), bottomRight.lon(), true, true), Occur.MUST);
filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true), Occur.MUST);
return new ConstantScoreQuery(filter);
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.shard;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.search.QueryCachingPolicy;
@ -169,6 +168,7 @@ public class IndexShard extends AbstractIndexShardComponent {
protected volatile IndexShardState state;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
private final IndexSearcherWrappingService wrappingService;
@Nullable
private RecoveryState recoveryState;
@ -198,12 +198,13 @@ public class IndexShard extends AbstractIndexShardComponent {
IndicesQueryCache indicesQueryCache, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory,
ClusterService clusterService, ShardPath path, BigArrays bigArrays) {
ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
super(shardId, indexSettingsService.getSettings());
this.codecService = codecService;
this.warmer = warmer;
this.deletionPolicy = deletionPolicy;
this.similarityService = similarityService;
this.wrappingService = wrappingService;
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
this.engineFactory = factory;
@ -337,14 +338,16 @@ public class IndexShard extends AbstractIndexShardComponent {
if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
}
try {
if (currentRouting != null) {
assert newRouting.version() > currentRouting.version() : "expected: " + newRouting.version() + " > " + currentRouting.version();
if (!newRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
}
// if its the same routing, return
if (currentRouting.equals(newRouting)) {
// if its the same routing except for some metadata info, return
if (currentRouting.equalsIgnoringMetaData(newRouting)) {
this.shardRouting = newRouting; // might have a new version
return;
}
@ -723,12 +726,12 @@ public class IndexShard extends AbstractIndexShardComponent {
public org.apache.lucene.util.Version minimumCompatibleVersion() {
org.apache.lucene.util.Version luceneVersion = null;
for(Segment segment : engine().segments(false)) {
for (Segment segment : engine().segments(false)) {
if (luceneVersion == null || luceneVersion.onOrAfter(segment.getVersion())) {
luceneVersion = segment.getVersion();
}
}
return luceneVersion == null ? Version.indexCreated(indexSettings).luceneVersion : luceneVersion;
return luceneVersion == null ? Version.indexCreated(indexSettings).luceneVersion : luceneVersion;
}
public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
@ -1113,7 +1116,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
change = true;
@ -1360,7 +1363,7 @@ public class IndexShard extends AbstractIndexShardComponent {
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig);
}
private static class IndexShardOperationCounter extends AbstractRefCounted {

View File

@ -21,7 +21,10 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.IndexSearcherWrapper;
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
@ -73,6 +76,10 @@ public class IndexShardModule extends AbstractModule {
bind(StoreRecoveryService.class).asEagerSingleton();
bind(ShardPercolateService.class).asEagerSingleton();
bind(ShardTermVectorsService.class).asEagerSingleton();
bind(IndexSearcherWrappingService.class).asEagerSingleton();
// this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService
Multibinder<IndexSearcherWrapper> multibinder
= Multibinder.newSetBinder(binder(), IndexSearcherWrapper.class);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
@ -66,13 +67,13 @@ public final class ShadowIndexShard extends IndexShard {
IndexService indexService, @Nullable IndicesWarmer warmer,
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
EngineFactory factory, ClusterService clusterService,
ShardPath path, BigArrays bigArrays) throws IOException {
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indicesQueryCache, shardPercolateService, codecService,
termVectorsService, indexFieldDataService, indexService,
warmer, deletionPolicy, similarityService,
factory, clusterService, path, bigArrays);
factory, clusterService, path, bigArrays, wrappingService);
}
/**

View File

@ -63,7 +63,11 @@ public final class ShardUtils {
if (reader instanceof ElasticsearchLeafReader) {
return (ElasticsearchLeafReader) reader;
} else {
return getElasticsearchLeafReader(FilterLeafReader.unwrap(reader));
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
}
}
return null;

View File

@ -507,7 +507,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// for example: a shard that recovers from one node and now needs to recover to another node,
// or a replica allocated and then allocating a primary because the primary failed on another node
boolean shardHasBeenRemoved = false;
if (currentRoutingEntry.initializing() && shardRouting.initializing() && !currentRoutingEntry.equals(shardRouting)) {
if (currentRoutingEntry.isSameAllocation(shardRouting) == false) {
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
@ -526,22 +526,19 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
shardHasBeenRemoved = true;
}
}
if (shardHasBeenRemoved == false && (shardRouting.equals(indexShard.routingEntry()) == false || shardRouting.version() > indexShard.routingEntry().version())) {
if (shardRouting.primary() && indexShard.routingEntry().primary() == false && shardRouting.initializing() && indexShard.allowsPrimaryPromotion() == false) {
logger.debug("{} reinitialize shard on primary promotion", indexShard.shardId());
indexService.removeShard(shardId, "promoted to primary");
} else {
// if we happen to remove the shardRouting by id above we don't need to jump in here!
indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
}
if (shardHasBeenRemoved == false) {
// shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there.
assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() :
"shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry;
indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
}
}
if (shardRouting.initializing()) {
applyInitializingShard(event.state(),indexMetaData, shardRouting);
applyInitializingShard(event.state(), indexMetaData, shardRouting);
}
}
}

View File

@ -196,7 +196,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
private void purgeShards(List<IndexShard> shardsToPurge) {
for (IndexShard shardToPurge : shardsToPurge) {
Query query = shardToPurge.indexService().mapperService().smartNameFieldType(TTLFieldMapper.NAME).rangeQuery(null, System.currentTimeMillis(), false, true, null);
Query query = shardToPurge.indexService().mapperService().smartNameFieldType(TTLFieldMapper.NAME).rangeQuery(null, System.currentTimeMillis(), false, true);
Engine.Searcher searcher = shardToPurge.acquireSearcher("indices_ttl");
try {
logger.debug("[{}][{}] purging shard", shardToPurge.routingEntry().index(), shardToPurge.routingEntry().id());

View File

@ -180,7 +180,7 @@ public class InternalSettingsPreparer {
static Settings replacePromptPlaceholders(Settings settings, Terminal terminal) {
UnmodifiableIterator<Map.Entry<String, String>> iter = settings.getAsMap().entrySet().iterator();
Settings.Builder builder = Settings.builder();
Settings.Builder builder = Settings.builder().classLoader(settings.getClassLoaderIfSet());
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();

View File

@ -60,7 +60,6 @@ public class RestAnalyzeAction extends BaseRestHandler {
AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"));
analyzeRequest.text(texts);
analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard()));
analyzeRequest.analyzer(request.param("analyzer"));
analyzeRequest.field(request.param("field"));
analyzeRequest.tokenizer(request.param("tokenizer"));
@ -93,8 +92,6 @@ public class RestAnalyzeAction extends BaseRestHandler {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("prefer_local".equals(currentFieldName) && token == XContentParser.Token.VALUE_BOOLEAN) {
analyzeRequest.preferLocal(parser.booleanValue());
} else if ("text".equals(currentFieldName) && token == XContentParser.Token.VALUE_STRING) {
analyzeRequest.text(parser.text());
} else if ("text".equals(currentFieldName) && token == XContentParser.Token.START_ARRAY) {

View File

@ -50,7 +50,7 @@ public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceW
/**
* Defaults to 30 seconds
*/
MEDIUM(TimeValue.timeValueSeconds(25)),
MEDIUM(TimeValue.timeValueSeconds(30)),
/**
* Defaults to 60 seconds

View File

@ -302,8 +302,8 @@ public class TermVectorsUnitTests extends ElasticsearchTestCase {
request = new MultiTermVectorsRequest();
request.add(new TermVectorsRequest(), bytes);
checkParsedParameters(request);
}
void checkParsedParameters(MultiTermVectorsRequest request) {
Set<String> ids = new HashSet<>();
ids.add("1");
@ -325,4 +325,30 @@ public class TermVectorsUnitTests extends ElasticsearchTestCase {
}
}
@Test // issue #12311
public void testMultiParserFilter() throws Exception {
byte[] data = Streams.copyToBytesFromClasspath("/org/elasticsearch/action/termvectors/multiRequest3.json");
BytesReference bytes = new BytesArray(data);
MultiTermVectorsRequest request = new MultiTermVectorsRequest();
request.add(new TermVectorsRequest(), bytes);
checkParsedFilterParameters(request);
}
void checkParsedFilterParameters(MultiTermVectorsRequest multiRequest) {
int id = 1;
for (TermVectorsRequest request : multiRequest.requests) {
assertThat(request.index(), equalTo("testidx"));
assertThat(request.type(), equalTo("test"));
assertThat(request.id(), equalTo(id+""));
assertNotNull(request.filterSettings());
assertThat(request.filterSettings().maxNumTerms, equalTo(20));
assertThat(request.filterSettings().minTermFreq, equalTo(1));
assertThat(request.filterSettings().maxTermFreq, equalTo(20));
assertThat(request.filterSettings().minDocFreq, equalTo(1));
assertThat(request.filterSettings().maxDocFreq, equalTo(20));
assertThat(request.filterSettings().minWordLength, equalTo(1));
assertThat(request.filterSettings().maxWordLength, equalTo(20));
id++;
}
}
}

View File

@ -0,0 +1,16 @@
{
"ids": ["1","2"],
"parameters": {
"_index": "testidx",
"_type": "test",
"filter": {
"max_num_terms": 20,
"min_term_freq": 1,
"max_term_freq": 20,
"min_doc_freq": 1,
"max_doc_freq": 20,
"min_word_length": 1,
"max_word_length": 20
}
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -48,6 +49,177 @@ public class ShardRoutingTests extends ElasticsearchTestCase {
}
}
public void testIsSameAllocation() {
ShardRouting unassignedShard0 = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED, 1);
ShardRouting unassignedShard1 = TestShardRouting.newShardRouting("test", 1, null, false, ShardRoutingState.UNASSIGNED, 1);
ShardRouting initializingShard0 = TestShardRouting.newShardRouting("test", 0, "1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
ShardRouting initializingShard1 = TestShardRouting.newShardRouting("test", 1, "1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
ShardRouting startedShard0 = new ShardRouting(initializingShard0);
startedShard0.moveToStarted();
ShardRouting startedShard1 = new ShardRouting(initializingShard1);
startedShard1.moveToStarted();
// test identity
assertTrue(initializingShard0.isSameAllocation(initializingShard0));
// test same allocation different state
assertTrue(initializingShard0.isSameAllocation(startedShard0));
// test unassigned is false even to itself
assertFalse(unassignedShard0.isSameAllocation(unassignedShard0));
// test different shards/nodes/state
assertFalse(unassignedShard0.isSameAllocation(unassignedShard1));
assertFalse(unassignedShard0.isSameAllocation(initializingShard0));
assertFalse(unassignedShard0.isSameAllocation(initializingShard1));
assertFalse(unassignedShard0.isSameAllocation(startedShard1));
}
public void testIsSameShard() {
ShardRouting index1Shard0a = randomShardRouting("index1", 0);
ShardRouting index1Shard0b = randomShardRouting("index1", 0);
ShardRouting index1Shard1 = randomShardRouting("index1", 1);
ShardRouting index2Shard0 = randomShardRouting("index2", 0);
ShardRouting index2Shard1 = randomShardRouting("index2", 1);
assertTrue(index1Shard0a.isSameShard(index1Shard0a));
assertTrue(index1Shard0a.isSameShard(index1Shard0b));
assertFalse(index1Shard0a.isSameShard(index1Shard1));
assertFalse(index1Shard0a.isSameShard(index2Shard0));
assertFalse(index1Shard0a.isSameShard(index2Shard1));
}
private ShardRouting randomShardRouting(String index, int shard) {
ShardRoutingState state = randomFrom(ShardRoutingState.values());
return TestShardRouting.newShardRouting(index, shard, state == ShardRoutingState.UNASSIGNED ? null : "1", state != ShardRoutingState.UNASSIGNED && randomBoolean(), state, randomInt(5));
}
public void testIsSourceTargetRelocation() {
ShardRouting unassignedShard0 = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED, 1);
ShardRouting initializingShard0 = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
ShardRouting initializingShard1 = TestShardRouting.newShardRouting("test", 1, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
ShardRouting startedShard0 = new ShardRouting(initializingShard0);
startedShard0.moveToStarted();
ShardRouting startedShard1 = new ShardRouting(initializingShard1);
startedShard1.moveToStarted();
ShardRouting sourceShard0a = new ShardRouting(startedShard0);
sourceShard0a.relocate("node2");
ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard();
ShardRouting sourceShard0b = new ShardRouting(startedShard0);
sourceShard0b.relocate("node2");
ShardRouting sourceShard1 = new ShardRouting(startedShard1);
sourceShard1.relocate("node2");
// test true scenarios
assertTrue(targetShard0a.isRelocationTargetOf(sourceShard0a));
assertTrue(sourceShard0a.isRelocationSourceOf(targetShard0a));
// test two shards are not mixed
assertFalse(targetShard0a.isRelocationTargetOf(sourceShard1));
assertFalse(sourceShard1.isRelocationSourceOf(targetShard0a));
// test two allocations are not mixed
assertFalse(targetShard0a.isRelocationTargetOf(sourceShard0b));
assertFalse(sourceShard0b.isRelocationSourceOf(targetShard0a));
// test different shard states
assertFalse(targetShard0a.isRelocationTargetOf(unassignedShard0));
assertFalse(sourceShard0a.isRelocationTargetOf(unassignedShard0));
assertFalse(unassignedShard0.isRelocationSourceOf(targetShard0a));
assertFalse(unassignedShard0.isRelocationSourceOf(sourceShard0a));
assertFalse(targetShard0a.isRelocationTargetOf(initializingShard0));
assertFalse(sourceShard0a.isRelocationTargetOf(initializingShard0));
assertFalse(initializingShard0.isRelocationSourceOf(targetShard0a));
assertFalse(initializingShard0.isRelocationSourceOf(sourceShard0a));
assertFalse(targetShard0a.isRelocationTargetOf(startedShard0));
assertFalse(sourceShard0a.isRelocationTargetOf(startedShard0));
assertFalse(startedShard0.isRelocationSourceOf(targetShard0a));
assertFalse(startedShard0.isRelocationSourceOf(sourceShard0a));
}
public void testEqualsIgnoringVersion() {
ShardRouting routing = randomShardRouting("test", 0);
ShardRouting otherRouting = new ShardRouting(routing);
assertTrue("expected equality\nthis " + routing + ",\nother " + otherRouting, routing.equalsIgnoringMetaData(otherRouting));
otherRouting = new ShardRouting(routing, 1);
assertTrue("expected equality\nthis " + routing + ",\nother " + otherRouting, routing.equalsIgnoringMetaData(otherRouting));
otherRouting = new ShardRouting(routing);
Integer[] changeIds = new Integer[]{0, 1, 2, 3, 4, 5, 6};
for (int changeId : randomSubsetOf(randomIntBetween(1, changeIds.length), changeIds)) {
switch (changeId) {
case 0:
// change index
otherRouting = TestShardRouting.newShardRouting(otherRouting.index() + "a", otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(),
otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo());
break;
case 1:
// change shard id
otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id() + 1, otherRouting.currentNodeId(), otherRouting.relocatingNodeId(),
otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo());
break;
case 2:
// change current node
otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId() == null ? "1" : otherRouting.currentNodeId() + "_1", otherRouting.relocatingNodeId(),
otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo());
break;
case 3:
// change relocating node
otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(),
otherRouting.relocatingNodeId() == null ? "1" : otherRouting.relocatingNodeId() + "_1",
otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo());
break;
case 4:
// change restore source
otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(),
otherRouting.restoreSource() == null ? new RestoreSource(new SnapshotId("test", "s1"), Version.CURRENT, "test") :
new RestoreSource(otherRouting.restoreSource().snapshotId(), Version.CURRENT, otherRouting.index() + "_1"),
otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo());
break;
case 5:
// change primary flag
otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(),
otherRouting.restoreSource(), otherRouting.primary() == false, otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo());
break;
case 6:
// change state
ShardRoutingState newState;
do {
newState = randomFrom(ShardRoutingState.values());
} while (newState == otherRouting.state());
UnassignedInfo unassignedInfo = otherRouting.unassignedInfo();
if (unassignedInfo == null && (newState == ShardRoutingState.UNASSIGNED || newState == ShardRoutingState.INITIALIZING)) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test");
}
otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(),
otherRouting.restoreSource(), otherRouting.primary(), newState, otherRouting.version(), unassignedInfo);
break;
}
if (randomBoolean()) {
// change version
otherRouting = new ShardRouting(otherRouting, otherRouting.version() + 1);
}
if (randomBoolean()) {
// change unassigned info
otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(),
otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(),
otherRouting.unassignedInfo() == null ? new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") :
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, otherRouting.unassignedInfo().getMessage() + "_1"));
}
logger.debug("comparing\nthis {} to\nother {}", routing, otherRouting);
assertFalse("expected non-equality\nthis " + routing + ",\nother " + otherRouting, routing.equalsIgnoringMetaData(otherRouting));
}
}
public void testFrozenOnRoutingTable() {
MetaData metaData = MetaData.builder()

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.test.ElasticsearchTestCase;
/**
* A helper that allows to create shard routing instances within tests, while not requiring to expose
* different simplified constructors on the ShardRouting itself.
@ -26,19 +28,19 @@ package org.elasticsearch.cluster.routing;
public class TestShardRouting {
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, null, buildAllocationId(state), true);
return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, buildAllocationId(state), true);
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, allocationId, true);
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, buildAllocationId(state), true);
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId,
@ -61,4 +63,17 @@ public class TestShardRouting {
throw new IllegalStateException("illegal state");
}
}
private static UnassignedInfo buildUnassignedInfo(ShardRoutingState state) {
switch (state) {
case UNASSIGNED:
case INITIALIZING:
return new UnassignedInfo(ElasticsearchTestCase.randomFrom(UnassignedInfo.Reason.values()), "auto generated for test");
case STARTED:
case RELOCATING:
return null;
default:
throw new IllegalStateException("illegal state");
}
}
}

View File

@ -60,7 +60,8 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase {
UnassignedInfo.Reason.ALLOCATION_FAILED,
UnassignedInfo.Reason.NODE_LEFT,
UnassignedInfo.Reason.REROUTE_CANCELLED,
UnassignedInfo.Reason.REINITIALIZED};
UnassignedInfo.Reason.REINITIALIZED,
UnassignedInfo.Reason.REALLOCATED_REPLICA};
for (int i = 0; i < order.length; i++) {
assertThat(order[i].ordinal(), equalTo(i));
}

View File

@ -219,6 +219,39 @@ public class ReplicaShardAllocatorTests extends ElasticsearchAllocationTestCase
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
}
@Test
public void testCancelRecoveryBetterSyncId() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, false, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node3, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId));
}
@Test
public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node3, false, randomBoolean() ? "MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
}
@Test
public void testNotCancellingRecovery() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
}
private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.INDEX_CREATED);
}
@ -242,6 +275,25 @@ public class ReplicaShardAllocatorTests extends ElasticsearchAllocationTestCase
return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null);
}
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10))
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node2.id(), false, ShardRoutingState.INITIALIZING, 10))
.build())
)
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null);
}
class TestAllocator extends ReplicaShardAllocator {
private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> data = null;

View File

@ -236,15 +236,15 @@ public class InternalEngineTests extends ElasticsearchTestCase {
}
protected InternalEngine createEngine(Store store, Path translogPath) {
return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy());
protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... wrappers) {
return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers);
}
protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy), false);
protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) {
return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy, wrappers), false);
}
public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
@ -255,7 +255,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}
}, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
}, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig);
return config;
}
@ -493,6 +493,32 @@ public class InternalEngineTests extends ElasticsearchTestCase {
;
}
@Test
public void testIndexSearcherWrapper() throws Exception {
final AtomicInteger counter = new AtomicInteger();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) {
counter.incrementAndGet();
return reader;
}
@Override
public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
counter.incrementAndGet();
return searcher;
}
};
Store store = createStore();
Path translog = createTempDir("translog-test");
InternalEngine engine = createEngine(store, translog, wrapper);
Engine.Searcher searcher = engine.acquireSearcher("test");
assertThat(counter.get(), equalTo(2));
searcher.close();
IOUtils.close(store, engine);
}
@Test
public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = engine.acquireSearcher("test");
@ -1985,7 +2011,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig);
try {
new InternalEngine(brokenConfig, false);

View File

@ -47,7 +47,6 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
@ -226,7 +225,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig);
return config;
}

View File

@ -242,7 +242,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest {
NumericRangeQuery<Long> rangeQuery;
try {
SearchContext.setCurrent(new TestSearchContext());
rangeQuery = (NumericRangeQuery<Long>) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("10:00:00", "11:00:00", true, true, null).rewrite(null);
rangeQuery = (NumericRangeQuery<Long>) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("10:00:00", "11:00:00", true, true).rewrite(null);
} finally {
SearchContext.removeCurrent();
}
@ -268,7 +268,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest {
NumericRangeQuery<Long> rangeQuery;
try {
SearchContext.setCurrent(new TestSearchContext());
rangeQuery = (NumericRangeQuery<Long>) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("Jan 02 10:00:00", "Jan 02 11:00:00", true, true, null).rewrite(null);
rangeQuery = (NumericRangeQuery<Long>) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("Jan 02 10:00:00", "Jan 02 11:00:00", true, true).rewrite(null);
} finally {
SearchContext.removeCurrent();
}

View File

@ -205,7 +205,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
routing = TestShardRouting.newShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), null, null, routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
routing = TestShardRouting.newShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), null, routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.allocationId(), shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shard.deleteShardState();

View File

@ -29,6 +29,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -220,4 +222,19 @@ public class InternalSettingsPreparerTests extends ElasticsearchTestCase {
assertThat(settings.get("name"), is("prompted name 0"));
assertThat(settings.get("node.name"), is("prompted name 0"));
}
@Test
public void testPreserveSettingsClassloader() {
final ClassLoader classLoader = URLClassLoader.newInstance(new URL[0]);
Settings settings = settingsBuilder()
.put("foo", "bar")
.put("path.home", createTempDir())
.classLoader(classLoader)
.build();
Tuple<Settings, Environment> tuple = InternalSettingsPreparer.prepareSettings(settings, randomBoolean());
Settings preparedSettings = tuple.v1();
assertThat(preparedSettings.getClassLoaderIfSet(), is(classLoader));
}
}

View File

@ -54,6 +54,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
import org.elasticsearch.test.junit.listeners.AssertionErrorThreadDumpPrinter;
import org.elasticsearch.test.junit.listeners.LoggingListener;
import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
import org.elasticsearch.test.search.MockSearchService;
@ -78,7 +79,8 @@ import static com.google.common.collect.Lists.newArrayList;
*/
@Listeners({
ReproduceInfoPrinter.class,
LoggingListener.class
LoggingListener.class,
AssertionErrorThreadDumpPrinter.class
})
// remove this entire annotation on upgrade to 5.3!
@ThreadLeakFilters(defaultFilters = true, filters = {
@ -550,44 +552,7 @@ public abstract class ElasticsearchTestCase extends LuceneTestCase {
protected static final void printStackDump(ESLogger logger) {
// print stack traces if we can't create any native thread anymore
Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
logger.error(formatThreadStacks(allStackTraces));
}
/** Dump threads and their current stack trace. */
private static String formatThreadStacks(Map<Thread, StackTraceElement[]> threads) {
StringBuilder message = new StringBuilder();
int cnt = 1;
final Formatter f = new Formatter(message, Locale.ENGLISH);
for (Map.Entry<Thread, StackTraceElement[]> e : threads.entrySet()) {
if (e.getKey().isAlive()) {
f.format(Locale.ENGLISH, "\n %2d) %s", cnt++, threadName(e.getKey())).flush();
}
if (e.getValue().length == 0) {
message.append("\n at (empty stack)");
} else {
for (StackTraceElement ste : e.getValue()) {
message.append("\n at ").append(ste);
}
}
}
return message.toString();
}
private static String threadName(Thread t) {
return "Thread[" +
"id=" + t.getId() +
", name=" + t.getName() +
", state=" + t.getState() +
", group=" + groupName(t.getThreadGroup()) +
"]";
}
private static String groupName(ThreadGroup threadGroup) {
if (threadGroup == null) {
return "{null group}";
} else {
return threadGroup.getName();
}
logger.error(StackTraces.formatThreadStacks(allStackTraces));
}
/**

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import java.util.Formatter;
import java.util.Locale;
import java.util.Map;
public class StackTraces {
/** Dump threads and their current stack trace. */
public static String formatThreadStacks(Map<Thread, StackTraceElement[]> threads) {
StringBuilder message = new StringBuilder();
int cnt = 1;
final Formatter f = new Formatter(message, Locale.ENGLISH);
for (Map.Entry<Thread, StackTraceElement[]> e : threads.entrySet()) {
if (e.getKey().isAlive()) {
f.format(Locale.ENGLISH, "\n %2d) %s", cnt++, threadName(e.getKey())).flush();
}
if (e.getValue().length == 0) {
message.append("\n at (empty stack)");
} else {
for (StackTraceElement ste : e.getValue()) {
message.append("\n at ").append(ste);
}
}
}
return message.toString();
}
private static String groupName(ThreadGroup threadGroup) {
if (threadGroup == null) {
return "{null group}";
} else {
return threadGroup.getName();
}
}
private static String threadName(Thread t) {
return "Thread[" +
"id=" + t.getId() +
", name=" + t.getName() +
", state=" + t.getState() +
", group=" + groupName(t.getThreadGroup()) +
"]";
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.junit.listeners;
import org.elasticsearch.test.StackTraces;
import org.junit.runner.notification.Failure;
import org.junit.runner.notification.RunListener;
import java.util.Map;
public class AssertionErrorThreadDumpPrinter extends RunListener {
@Override
public void testFailure(Failure failure) throws Exception {
if (failure.getException() instanceof AssertionError) {
Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
String threadStacks = StackTraces.formatThreadStacks(allStackTraces);
System.err.println(threadStacks);
}
}
}

View File

@ -825,3 +825,9 @@ For the record, official plugins which can use this new simplified form are:
Fields used in alias filters no longer have to exist in the mapping upon alias creation time. Alias filters are now
parsed at request time and then the fields in filters are resolved from the mapping, whereas before alias filters were
parsed at alias creation time and the parsed form was kept around in memory.
=== _analyze API
The `prefer_local` has been removed from the _analyze api. The _analyze api is a light operation and the caller shouldn't
be concerned about whether it executes on the node that receives the request or another node.