Merge branch 'master' into feature/multi_cluster_search
This commit is contained in:
commit
19f9cb307a
|
@ -17,6 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import java.nio.file.Path
|
||||
import org.eclipse.jgit.lib.Repository
|
||||
import org.eclipse.jgit.lib.RepositoryBuilder
|
||||
import org.gradle.plugins.ide.eclipse.model.SourceFolder
|
||||
|
@ -29,8 +30,9 @@ subprojects {
|
|||
description = "Elasticsearch subproject ${project.path}"
|
||||
}
|
||||
|
||||
Path rootPath = rootDir.toPath()
|
||||
// setup pom license info, but only for artifacts that are part of elasticsearch
|
||||
configure(subprojects.findAll { it.path.startsWith(':x-plugins') == false }) {
|
||||
configure(subprojects.findAll { it.projectDir.toPath().startsWith(rootPath) }) {
|
||||
|
||||
// we only use maven publish to add tasks for pom generation
|
||||
plugins.withType(MavenPublishPlugin).whenPluginAdded {
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch;
|
|||
import org.elasticsearch.action.support.replication.ReplicationOperation;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
@ -36,13 +37,15 @@ import org.elasticsearch.transport.TcpTransport;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
|
||||
|
@ -56,19 +59,19 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
static final Version UNKNOWN_VERSION_ADDED = Version.fromId(0);
|
||||
|
||||
/**
|
||||
* Passed in the {@link Params} of {@link #toXContent(XContentBuilder, org.elasticsearch.common.xcontent.ToXContent.Params, Throwable)}
|
||||
* Passed in the {@link Params} of {@link #generateThrowableXContent(XContentBuilder, Params, Throwable)}
|
||||
* to control if the {@code caused_by} element should render. Unlike most parameters to {@code toXContent} methods this parameter is
|
||||
* internal only and not available as a URL parameter.
|
||||
*/
|
||||
public static final String REST_EXCEPTION_SKIP_CAUSE = "rest.exception.cause.skip";
|
||||
/**
|
||||
* Passed in the {@link Params} of {@link #toXContent(XContentBuilder, org.elasticsearch.common.xcontent.ToXContent.Params, Throwable)}
|
||||
* Passed in the {@link Params} of {@link #generateThrowableXContent(XContentBuilder, Params, Throwable)}
|
||||
* to control if the {@code stack_trace} element should render. Unlike most parameters to {@code toXContent} methods this parameter is
|
||||
* internal only and not available as a URL parameter. Use the {@code error_trace} parameter instead.
|
||||
*/
|
||||
public static final String REST_EXCEPTION_SKIP_STACK_TRACE = "rest.exception.stacktrace.skip";
|
||||
public static final boolean REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT = true;
|
||||
public static final boolean REST_EXCEPTION_SKIP_CAUSE_DEFAULT = false;
|
||||
private static final boolean REST_EXCEPTION_SKIP_CAUSE_DEFAULT = false;
|
||||
private static final String INDEX_HEADER_KEY = "es.index";
|
||||
private static final String INDEX_HEADER_KEY_UUID = "es.index_uuid";
|
||||
private static final String SHARD_HEADER_KEY = "es.shard";
|
||||
|
@ -160,6 +163,10 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
return headers.get(key);
|
||||
}
|
||||
|
||||
protected Map<String, List<String>> getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the rest status code associated with this exception.
|
||||
*/
|
||||
|
@ -257,64 +264,56 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
Throwable ex = ExceptionsHelper.unwrapCause(this);
|
||||
if (ex != this) {
|
||||
toXContent(builder, params, this);
|
||||
generateThrowableXContent(builder, params, this);
|
||||
} else {
|
||||
builder.field(TYPE, getExceptionName());
|
||||
builder.field(REASON, getMessage());
|
||||
for (String key : headers.keySet()) {
|
||||
if (key.startsWith("es.")) {
|
||||
List<String> values = headers.get(key);
|
||||
xContentHeader(builder, key.substring("es.".length()), values);
|
||||
}
|
||||
}
|
||||
innerToXContent(builder, params);
|
||||
renderHeader(builder, params);
|
||||
if (params.paramAsBoolean(REST_EXCEPTION_SKIP_STACK_TRACE, REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT) == false) {
|
||||
builder.field(STACK_TRACE, ExceptionsHelper.stackTrace(this));
|
||||
}
|
||||
innerToXContent(builder, params, this, getExceptionName(), getMessage(), headers, getCause());
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Renders additional per exception information into the xcontent
|
||||
*/
|
||||
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
causeToXContent(builder, params);
|
||||
}
|
||||
protected static void innerToXContent(XContentBuilder builder, Params params,
|
||||
Throwable throwable, String type, String message, Map<String, List<String>> headers,
|
||||
Throwable cause) throws IOException {
|
||||
builder.field(TYPE, type);
|
||||
builder.field(REASON, message);
|
||||
|
||||
/**
|
||||
* Renders a cause exception as xcontent
|
||||
*/
|
||||
protected void causeToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
final Throwable cause = getCause();
|
||||
if (cause != null && params.paramAsBoolean(REST_EXCEPTION_SKIP_CAUSE, REST_EXCEPTION_SKIP_CAUSE_DEFAULT) == false) {
|
||||
builder.field(CAUSED_BY);
|
||||
builder.startObject();
|
||||
toXContent(builder, params, cause);
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
|
||||
protected final void renderHeader(XContentBuilder builder, Params params) throws IOException {
|
||||
boolean hasHeader = false;
|
||||
Set<String> customHeaders = new HashSet<>();
|
||||
for (String key : headers.keySet()) {
|
||||
if (key.startsWith("es.")) {
|
||||
continue;
|
||||
headerToXContent(builder, key.substring("es.".length()), headers.get(key));
|
||||
} else {
|
||||
customHeaders.add(key);
|
||||
}
|
||||
if (hasHeader == false) {
|
||||
builder.startObject(HEADER);
|
||||
hasHeader = true;
|
||||
}
|
||||
List<String> values = headers.get(key);
|
||||
xContentHeader(builder, key, values);
|
||||
|
||||
if (throwable instanceof ElasticsearchException) {
|
||||
ElasticsearchException exception = (ElasticsearchException) throwable;
|
||||
exception.metadataToXContent(builder, params);
|
||||
}
|
||||
if (hasHeader) {
|
||||
|
||||
if (params.paramAsBoolean(REST_EXCEPTION_SKIP_CAUSE, REST_EXCEPTION_SKIP_CAUSE_DEFAULT) == false) {
|
||||
if (cause != null) {
|
||||
builder.field(CAUSED_BY);
|
||||
builder.startObject();
|
||||
generateThrowableXContent(builder, params, cause);
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
|
||||
private void xContentHeader(XContentBuilder builder, String key, List<String> values) throws IOException {
|
||||
if (customHeaders.isEmpty() == false) {
|
||||
builder.startObject(HEADER);
|
||||
for (String header : customHeaders) {
|
||||
headerToXContent(builder, header, headers.get(header));
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
if (params.paramAsBoolean(REST_EXCEPTION_SKIP_STACK_TRACE, REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT) == false) {
|
||||
builder.field(STACK_TRACE, ExceptionsHelper.stackTrace(throwable));
|
||||
}
|
||||
}
|
||||
|
||||
private static void headerToXContent(XContentBuilder builder, String key, List<String> values) throws IOException {
|
||||
if (values != null && values.isEmpty() == false) {
|
||||
if (values.size() == 1) {
|
||||
builder.field(key, values.get(0));
|
||||
|
@ -329,25 +328,73 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
}
|
||||
|
||||
/**
|
||||
* Static toXContent helper method that also renders non {@link org.elasticsearch.ElasticsearchException} instances as XContent.
|
||||
* Renders additional per exception information into the XContent
|
||||
*/
|
||||
public static void toXContent(XContentBuilder builder, Params params, Throwable ex) throws IOException {
|
||||
ex = ExceptionsHelper.unwrapCause(ex);
|
||||
if (ex instanceof ElasticsearchException) {
|
||||
((ElasticsearchException) ex).toXContent(builder, params);
|
||||
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Static toXContent helper method that renders {@link org.elasticsearch.ElasticsearchException} or {@link Throwable} instances
|
||||
* as XContent, delegating the rendering to {@link #toXContent(XContentBuilder, Params)}
|
||||
* or {@link #innerToXContent(XContentBuilder, Params, Throwable, String, String, Map, Throwable)}.
|
||||
*
|
||||
* This method is usually used when the {@link Throwable} is rendered as a part of another XContent object.
|
||||
*/
|
||||
public static void generateThrowableXContent(XContentBuilder builder, Params params, Throwable t) throws IOException {
|
||||
t = ExceptionsHelper.unwrapCause(t);
|
||||
|
||||
if (t instanceof ElasticsearchException) {
|
||||
((ElasticsearchException) t).toXContent(builder, params);
|
||||
} else {
|
||||
builder.field(TYPE, getExceptionName(ex));
|
||||
builder.field(REASON, ex.getMessage());
|
||||
if (ex.getCause() != null) {
|
||||
builder.field(CAUSED_BY);
|
||||
innerToXContent(builder, params, t, getExceptionName(t), t.getMessage(), emptyMap(), t.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Render any exception as a xcontent, encapsulated within a field or object named "error". The level of details that are rendered
|
||||
* depends on the value of the "detailed" parameter: when it's false only a simple message based on the type and message of the
|
||||
* exception is rendered. When it's true all detail are provided including guesses root causes, cause and potentially stack
|
||||
* trace.
|
||||
*
|
||||
* This method is usually used when the {@link Exception} is rendered as a full XContent object.
|
||||
*/
|
||||
public static void generateFailureXContent(XContentBuilder builder, Params params, @Nullable Exception e, boolean detailed)
|
||||
throws IOException {
|
||||
// No exception to render as an error
|
||||
if (e == null) {
|
||||
builder.field(ERROR, "unknown");
|
||||
return;
|
||||
}
|
||||
|
||||
// Render the exception with a simple message
|
||||
if (detailed == false) {
|
||||
String message = "No ElasticsearchException found";
|
||||
Throwable t = e;
|
||||
for (int counter = 0; counter < 10 && t != null; counter++) {
|
||||
if (t instanceof ElasticsearchException) {
|
||||
message = t.getClass().getSimpleName() + "[" + t.getMessage() + "]";
|
||||
break;
|
||||
}
|
||||
t = t.getCause();
|
||||
}
|
||||
builder.field(ERROR, message);
|
||||
return;
|
||||
}
|
||||
|
||||
// Render the exception with all details
|
||||
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(e);
|
||||
builder.startObject(ERROR);
|
||||
{
|
||||
builder.startArray(ROOT_CAUSE);
|
||||
for (ElasticsearchException rootCause : rootCauses) {
|
||||
builder.startObject();
|
||||
toXContent(builder, params, ex.getCause());
|
||||
rootCause.toXContent(builder, new DelegatingMapParams(singletonMap(REST_EXCEPTION_SKIP_CAUSE, "true"), params));
|
||||
builder.endObject();
|
||||
}
|
||||
if (params.paramAsBoolean(REST_EXCEPTION_SKIP_STACK_TRACE, REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT) == false) {
|
||||
builder.field(STACK_TRACE, ExceptionsHelper.stackTrace(ex));
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
generateThrowableXContent(builder, params, e);
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -877,22 +924,6 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
return null;
|
||||
}
|
||||
|
||||
public static void renderException(XContentBuilder builder, Params params, Exception e) throws IOException {
|
||||
builder.startObject(ERROR);
|
||||
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(e);
|
||||
builder.field(ROOT_CAUSE);
|
||||
builder.startArray();
|
||||
for (ElasticsearchException rootCause : rootCauses) {
|
||||
builder.startObject();
|
||||
rootCause.toXContent(builder, new ToXContent.DelegatingMapParams(
|
||||
Collections.singletonMap(ElasticsearchException.REST_EXCEPTION_SKIP_CAUSE, "true"), params));
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
ElasticsearchException.toXContent(builder, params, e);
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
// lower cases and adds underscores to transitions in a name
|
||||
private static String toUnderscoreCase(String value) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
|
|
@ -105,7 +105,7 @@ public final class TaskOperationFailure implements Writeable, ToXContent {
|
|||
if (reason != null) {
|
||||
builder.field("reason");
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, params, reason);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, reason);
|
||||
builder.endObject();
|
||||
}
|
||||
return builder;
|
||||
|
|
|
@ -19,21 +19,21 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.TaskOperationFailure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
|
@ -49,9 +49,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
|
@ -116,18 +114,15 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
@Override
|
||||
protected synchronized void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask,
|
||||
ActionListener<TaskInfo> listener) {
|
||||
final BanLock banLock = new BanLock(nodes -> removeBanOnNodes(cancellableTask, nodes));
|
||||
Set<String> childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
|
||||
if (childNodes != null) {
|
||||
if (childNodes.isEmpty()) {
|
||||
// The task has no child tasks, so we can return immediately
|
||||
logger.trace("cancelling task {} with no children", cancellableTask.getId());
|
||||
listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false));
|
||||
} else {
|
||||
// The task has some child tasks, we need to wait for until ban is set on all nodes
|
||||
logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes);
|
||||
DiscoveryNodes childNodes = clusterService.state().nodes();
|
||||
final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes));
|
||||
boolean canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
|
||||
if (canceled) {
|
||||
if (cancellableTask.shouldCancelChildrenOnCancellation()) {
|
||||
// /In case the task has some child tasks, we need to wait for until ban is set on all nodes
|
||||
logger.trace("cancelling task {} on child nodes", cancellableTask.getId());
|
||||
String nodeId = clusterService.localNode().getId();
|
||||
AtomicInteger responses = new AtomicInteger(childNodes.size());
|
||||
AtomicInteger responses = new AtomicInteger(childNodes.getSize());
|
||||
List<Exception> failures = new ArrayList<>();
|
||||
setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener<Void>() {
|
||||
@Override
|
||||
|
@ -157,7 +152,8 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
}
|
||||
}
|
||||
});
|
||||
|
||||
} else {
|
||||
logger.trace("task {} doesn't have any children that should be cancelled", cancellableTask.getId());
|
||||
}
|
||||
} else {
|
||||
logger.trace("task {} is already cancelled", cancellableTask.getId());
|
||||
|
@ -170,26 +166,22 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
return true;
|
||||
}
|
||||
|
||||
private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, ActionListener<Void> listener) {
|
||||
private void setBanOnNodes(String reason, CancellableTask task, DiscoveryNodes nodes, ActionListener<Void> listener) {
|
||||
sendSetBanRequest(nodes,
|
||||
BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason),
|
||||
listener);
|
||||
}
|
||||
|
||||
private void removeBanOnNodes(CancellableTask task, Set<String> nodes) {
|
||||
private void removeBanOnNodes(CancellableTask task, DiscoveryNodes nodes) {
|
||||
sendRemoveBanRequest(nodes,
|
||||
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId())));
|
||||
}
|
||||
|
||||
private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, ActionListener<Void> listener) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
for (String node : nodes) {
|
||||
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
|
||||
if (discoveryNode != null) {
|
||||
// Check if node still in the cluster
|
||||
logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node,
|
||||
private void sendSetBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request, ActionListener<Void> listener) {
|
||||
for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getNodes()) {
|
||||
logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node.key,
|
||||
request.ban);
|
||||
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request,
|
||||
transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request,
|
||||
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
|
@ -198,42 +190,30 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key);
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster",
|
||||
request.parentTaskId, node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sendRemoveBanRequest(Set<String> nodes, BanParentTaskRequest request) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
for (String node : nodes) {
|
||||
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
|
||||
if (discoveryNode != null) {
|
||||
// Check if node still in the cluster
|
||||
logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
|
||||
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler
|
||||
private void sendRemoveBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request) {
|
||||
for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getNodes()) {
|
||||
logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key);
|
||||
transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler
|
||||
.INSTANCE_SAME);
|
||||
} else {
|
||||
logger.debug("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the node no longer in " +
|
||||
"the cluster", request.parentTaskId, node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class BanLock {
|
||||
private final Consumer<Set<String>> finish;
|
||||
private final Runnable finish;
|
||||
private final AtomicInteger counter;
|
||||
private final AtomicReference<Set<String>> nodes = new AtomicReference<>();
|
||||
private final int nodesSize;
|
||||
|
||||
public BanLock(Consumer<Set<String>> finish) {
|
||||
public BanLock(int nodesSize, Runnable finish) {
|
||||
counter = new AtomicInteger(0);
|
||||
this.finish = finish;
|
||||
this.nodesSize = nodesSize;
|
||||
}
|
||||
|
||||
public void onBanSet() {
|
||||
|
@ -242,15 +222,14 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
}
|
||||
}
|
||||
|
||||
public void onTaskFinished(Set<String> nodes) {
|
||||
this.nodes.set(nodes);
|
||||
if (counter.addAndGet(nodes.size()) == 0) {
|
||||
public void onTaskFinished() {
|
||||
if (counter.addAndGet(nodesSize) == 0) {
|
||||
finish();
|
||||
}
|
||||
}
|
||||
|
||||
public void finish() {
|
||||
finish.accept(nodes.get());
|
||||
finish.run();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -322,5 +301,4 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -120,7 +120,6 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
|
|||
return;
|
||||
}
|
||||
GetTaskRequest nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId());
|
||||
taskManager.registerChildTask(thisTask, node.getId());
|
||||
transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
|
||||
new TransportResponseHandler<GetTaskResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -207,7 +207,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
builder.field(Fields.ALLOCATED, allocationStatus.value());
|
||||
if (storeException != null) {
|
||||
builder.startObject(Fields.STORE_EXCEPTION);
|
||||
ElasticsearchException.toXContent(builder, params, storeException);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, storeException);
|
||||
builder.endObject();
|
||||
}
|
||||
return builder;
|
||||
|
|
|
@ -131,6 +131,9 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
|
|||
}
|
||||
|
||||
}
|
||||
if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
|
||||
throw new IllegalArgumentException("cannot provide a routing partition size value when shrinking an index");
|
||||
}
|
||||
targetIndex.cause("shrink_index");
|
||||
Settings.Builder settingsBuilder = Settings.builder().put(targetIndexSettings);
|
||||
settingsBuilder.put("index.number_of_shards", numShards);
|
||||
|
|
|
@ -63,7 +63,7 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
|
|||
builder.field(Fields._ID, failure.getId());
|
||||
builder.field(Fields.STATUS, failure.getStatus().getStatus());
|
||||
builder.startObject(Fields.ERROR);
|
||||
ElasticsearchException.toXContent(builder, params, failure.getCause());
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, failure.getCause());
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
|
@ -173,7 +173,7 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
|
|||
builder.field(ID_FIELD, id);
|
||||
}
|
||||
builder.startObject(CAUSE_FIELD);
|
||||
ElasticsearchException.toXContent(builder, params, cause);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, cause);
|
||||
builder.endObject();
|
||||
builder.field(STATUS_FIELD, status.getStatus());
|
||||
return builder;
|
||||
|
|
|
@ -137,7 +137,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
|
|||
builder.field(Fields._INDEX, failure.getIndex());
|
||||
builder.field(Fields._TYPE, failure.getType());
|
||||
builder.field(Fields._ID, failure.getId());
|
||||
ElasticsearchException.renderException(builder, params, failure.getFailure());
|
||||
ElasticsearchException.generateFailureXContent(builder, params, failure.getFailure(), true);
|
||||
builder.endObject();
|
||||
} else {
|
||||
GetResponse getResponse = response.getResponse();
|
||||
|
|
|
@ -84,7 +84,7 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
|
|||
if (failure == null) {
|
||||
ingestDocument.toXContent(builder, params);
|
||||
} else {
|
||||
ElasticsearchException.renderException(builder, params, failure);
|
||||
ElasticsearchException.generateFailureXContent(builder, params, failure, true);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
@ -99,10 +98,10 @@ class SimulateProcessorResult implements Writeable, ToXContent {
|
|||
|
||||
if (failure != null && ingestDocument != null) {
|
||||
builder.startObject("ignored_error");
|
||||
ElasticsearchException.renderException(builder, params, failure);
|
||||
ElasticsearchException.generateFailureXContent(builder, params, failure, true);
|
||||
builder.endObject();
|
||||
} else if (failure != null) {
|
||||
ElasticsearchException.renderException(builder, params, failure);
|
||||
ElasticsearchException.generateFailureXContent(builder, params, failure, true);
|
||||
}
|
||||
|
||||
if (ingestDocument != null) {
|
||||
|
|
|
@ -156,7 +156,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
|
|||
for (Item item : items) {
|
||||
builder.startObject();
|
||||
if (item.isFailure()) {
|
||||
ElasticsearchException.renderException(builder, params, item.getFailure());
|
||||
ElasticsearchException.generateFailureXContent(builder, params, item.getFailure(), true);
|
||||
builder.field(Fields.STATUS, ExceptionsHelper.status(item.getFailure()).getStatus());
|
||||
} else {
|
||||
item.getResponse().innerToXContent(builder, params);
|
||||
|
|
|
@ -103,6 +103,7 @@ public class SearchPhaseExecutionException extends ElasticsearchException {
|
|||
return shardFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Throwable getCause() {
|
||||
Throwable cause = super.getCause();
|
||||
if (cause == null) {
|
||||
|
@ -131,7 +132,7 @@ public class SearchPhaseExecutionException extends ElasticsearchException {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("phase", phaseName);
|
||||
final boolean group = params.paramAsBoolean("group_shard_failures", true); // we group by default
|
||||
builder.field("grouped", group); // notify that it's grouped
|
||||
|
@ -144,15 +145,20 @@ public class SearchPhaseExecutionException extends ElasticsearchException {
|
|||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
super.innerToXContent(builder, params);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void causeToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
if (super.getCause() != null) {
|
||||
// if the cause is null we inject a guessed root cause that will then be rendered twice so wi disable it manually
|
||||
super.causeToXContent(builder, params);
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
Throwable ex = ExceptionsHelper.unwrapCause(this);
|
||||
if (ex != this) {
|
||||
generateThrowableXContent(builder, params, this);
|
||||
} else {
|
||||
// We don't have a cause when all shards failed, but we do have shards failures so we can "guess" a cause
|
||||
// (see {@link #getCause()}). Here, we use super.getCause() because we don't want the guessed exception to
|
||||
// be rendered twice (one in the "cause" field, one in "failed_shards")
|
||||
innerToXContent(builder, params, this, getExceptionName(), getMessage(), getHeaders(), super.getCause());
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,4 +31,9 @@ public class SearchTask extends CancellableTask {
|
|||
super(id, type, action, description, parentTaskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -161,7 +161,7 @@ public class ShardSearchFailure implements ShardOperationFailedException {
|
|||
if (cause != null) {
|
||||
builder.field("reason");
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, params, cause);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, cause);
|
||||
builder.endObject();
|
||||
}
|
||||
return builder;
|
||||
|
|
|
@ -125,7 +125,7 @@ public class DefaultShardOperationFailedException implements ShardOperationFaile
|
|||
if (reason != null) {
|
||||
builder.field("reason");
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, params, reason);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, reason);
|
||||
builder.endObject();
|
||||
}
|
||||
return builder;
|
||||
|
|
|
@ -175,7 +175,6 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
|||
// no node connected, act as failure
|
||||
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
|
||||
} else {
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler<ShardResponse>() {
|
||||
@Override
|
||||
public ShardResponse newInstance() {
|
||||
|
|
|
@ -318,7 +318,6 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|||
NodeRequest nodeRequest = new NodeRequest(node.getId(), request, shards);
|
||||
if (task != null) {
|
||||
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
}
|
||||
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler<NodeResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -160,7 +160,6 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|||
}
|
||||
}
|
||||
};
|
||||
taskManager.registerChildTask(task, nodes.getLocalNodeId());
|
||||
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
|
@ -173,7 +172,6 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|||
logger.debug("no known master node, scheduling a retry");
|
||||
retry(null, masterChangePredicate);
|
||||
} else {
|
||||
taskManager.registerChildTask(task, nodes.getMasterNode().getId());
|
||||
transportService.sendRequest(nodes.getMasterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener, TransportMasterNodeAction.this::newResponse) {
|
||||
@Override
|
||||
public void handleException(final TransportException exp) {
|
||||
|
|
|
@ -199,7 +199,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
TransportRequest nodeRequest = newNodeRequest(nodeId, request);
|
||||
if (task != null) {
|
||||
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
}
|
||||
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -375,7 +374,7 @@ public class ReplicationResponse extends ActionResponse {
|
|||
builder.field(_NODE, nodeId);
|
||||
builder.field(REASON);
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, params, cause);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, cause);
|
||||
builder.endObject();
|
||||
builder.field(STATUS, status);
|
||||
builder.field(PRIMARY, primary);
|
||||
|
|
|
@ -119,7 +119,6 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
|||
protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
|
||||
ShardRequest shardRequest = newShardRequest(request, shardId);
|
||||
shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, clusterService.localNode().getId());
|
||||
replicatedBroadcastShardAction.execute(shardRequest, shardActionListener);
|
||||
}
|
||||
|
||||
|
|
|
@ -664,7 +664,6 @@ public abstract class TransportReplicationAction<
|
|||
return;
|
||||
}
|
||||
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
|
||||
performLocalAction(state, primary, node);
|
||||
} else {
|
||||
|
|
|
@ -278,7 +278,6 @@ public abstract class TransportTasksAction<
|
|||
} else {
|
||||
NodeTaskRequest nodeRequest = new NodeTaskRequest(request);
|
||||
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
|
||||
new TransportResponseHandler<NodeTasksResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -133,7 +133,7 @@ public class MultiTermVectorsResponse extends ActionResponse implements Iterable
|
|||
builder.field(Fields._INDEX, failure.getIndex());
|
||||
builder.field(Fields._TYPE, failure.getType());
|
||||
builder.field(Fields._ID, failure.getId());
|
||||
ElasticsearchException.renderException(builder, params, failure.getCause());
|
||||
ElasticsearchException.generateFailureXContent(builder, params, failure.getCause(), true);
|
||||
builder.endObject();
|
||||
} else {
|
||||
TermVectorsResponse getResponse = response.getResponse();
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.carrotsearch.hppc.LongArrayList;
|
|||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
|
@ -70,6 +71,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
|
||||
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
|
||||
|
@ -192,6 +194,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
public static final Setting<Boolean> INDEX_SHADOW_REPLICAS_SETTING =
|
||||
Setting.boolSetting(SETTING_SHADOW_REPLICAS, false, Property.IndexScope);
|
||||
|
||||
public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size";
|
||||
public static final Setting<Integer> INDEX_ROUTING_PARTITION_SIZE_SETTING =
|
||||
Setting.intSetting(SETTING_ROUTING_PARTITION_SIZE, 1, 1, Property.IndexScope);
|
||||
|
||||
public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";
|
||||
public static final Setting<Boolean> INDEX_SHARED_FILESYSTEM_SETTING =
|
||||
Setting.boolSetting(SETTING_SHARED_FILESYSTEM, false, Property.IndexScope);
|
||||
|
@ -242,11 +248,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
public static final String INDEX_ROUTING_INCLUDE_GROUP_PREFIX = "index.routing.allocation.include";
|
||||
public static final String INDEX_ROUTING_EXCLUDE_GROUP_PREFIX = "index.routing.allocation.exclude";
|
||||
public static final Setting<Settings> INDEX_ROUTING_REQUIRE_GROUP_SETTING =
|
||||
Setting.groupSetting(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope);
|
||||
Setting.groupSetting(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.IndexScope);
|
||||
public static final Setting<Settings> INDEX_ROUTING_INCLUDE_GROUP_SETTING =
|
||||
Setting.groupSetting(INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope);
|
||||
Setting.groupSetting(INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.IndexScope);
|
||||
public static final Setting<Settings> INDEX_ROUTING_EXCLUDE_GROUP_SETTING =
|
||||
Setting.groupSetting(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope);
|
||||
Setting.groupSetting(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.IndexScope);
|
||||
public static final Setting<Settings> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
|
||||
Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only setable internally not a registered setting!!
|
||||
|
||||
|
@ -272,6 +278,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
public static final String INDEX_STATE_FILE_PREFIX = "state-";
|
||||
private final int routingNumShards;
|
||||
private final int routingFactor;
|
||||
private final int routingPartitionSize;
|
||||
|
||||
private final int numberOfShards;
|
||||
private final int numberOfReplicas;
|
||||
|
@ -310,7 +317,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> inSyncAllocationIds,
|
||||
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
|
||||
Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion,
|
||||
int routingNumShards, ActiveShardCount waitForActiveShards) {
|
||||
int routingNumShards, int routingPartitionSize, ActiveShardCount waitForActiveShards) {
|
||||
|
||||
this.index = index;
|
||||
this.version = version;
|
||||
|
@ -334,6 +341,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion;
|
||||
this.routingNumShards = routingNumShards;
|
||||
this.routingFactor = routingNumShards / numberOfShards;
|
||||
this.routingPartitionSize = routingPartitionSize;
|
||||
this.waitForActiveShards = waitForActiveShards;
|
||||
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
|
||||
}
|
||||
|
@ -413,6 +421,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
return numberOfReplicas;
|
||||
}
|
||||
|
||||
public int getRoutingPartitionSize() {
|
||||
return routingPartitionSize;
|
||||
}
|
||||
|
||||
public boolean isRoutingPartitionedIndex() {
|
||||
return routingPartitionSize != 1;
|
||||
}
|
||||
|
||||
public int getTotalNumberOfShards() {
|
||||
return totalNumberOfShards;
|
||||
}
|
||||
|
@ -809,6 +825,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
return routingNumShards == null ? numberOfShards() : routingNumShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of shards.
|
||||
*
|
||||
* @return the provided value or -1 if it has not been set.
|
||||
*/
|
||||
public int numberOfShards() {
|
||||
return settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1);
|
||||
}
|
||||
|
@ -818,10 +839,29 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of replicas.
|
||||
*
|
||||
* @return the provided value or -1 if it has not been set.
|
||||
*/
|
||||
public int numberOfReplicas() {
|
||||
return settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1);
|
||||
}
|
||||
|
||||
public Builder routingPartitionSize(int routingPartitionSize) {
|
||||
settings = Settings.builder().put(settings).put(SETTING_ROUTING_PARTITION_SIZE, routingPartitionSize).build();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the routing partition size.
|
||||
*
|
||||
* @return the provided value or -1 if it has not been set.
|
||||
*/
|
||||
public int routingPartitionSize() {
|
||||
return settings.getAsInt(SETTING_ROUTING_PARTITION_SIZE, -1);
|
||||
}
|
||||
|
||||
public Builder creationDate(long creationDate) {
|
||||
settings = Settings.builder().put(settings).put(SETTING_CREATION_DATE, creationDate).build();
|
||||
return this;
|
||||
|
@ -885,7 +925,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
}
|
||||
|
||||
public Builder putInSyncAllocationIds(int shardId, Set<String> allocationIds) {
|
||||
inSyncAllocationIds.put(shardId, new HashSet(allocationIds));
|
||||
inSyncAllocationIds.put(shardId, new HashSet<>(allocationIds));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -964,6 +1004,12 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]");
|
||||
}
|
||||
|
||||
int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings);
|
||||
if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) {
|
||||
throw new IllegalArgumentException("routing partition size [" + routingPartitionSize + "] should be a positive number"
|
||||
+ " less than the number of shards [" + getRoutingNumShards() + "] for [" + index + "]");
|
||||
}
|
||||
|
||||
// fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable
|
||||
ImmutableOpenIntMap.Builder<Set<String>> filledInSyncAllocationIds = ImmutableOpenIntMap.builder();
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
|
@ -1032,7 +1078,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
|
||||
return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
|
||||
tmpAliases.build(), customs.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
|
||||
indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), waitForActiveShards);
|
||||
indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), routingPartitionSize, waitForActiveShards);
|
||||
}
|
||||
|
||||
public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata;
|
|||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
|
@ -592,6 +593,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
.put(IndexMetaData.SETTING_VERSION_CREATED, sourceMetaData.getCreationVersion())
|
||||
.put(IndexMetaData.SETTING_VERSION_UPGRADED, sourceMetaData.getUpgradedVersion())
|
||||
.put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate))
|
||||
.put(IndexMetaData.SETTING_ROUTING_PARTITION_SIZE, sourceMetaData.getRoutingPartitionSize())
|
||||
.put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName())
|
||||
.put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID());
|
||||
if (sourceMetaData.getMinimumCompatibleVersion() != null) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||
|
@ -197,12 +198,16 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
|
|||
Index createdIndex = null;
|
||||
final String temporaryIndexName = UUIDs.randomBase64UUID();
|
||||
try {
|
||||
// use the provided values, otherwise just pick valid dummy values
|
||||
int dummyPartitionSize = IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.get(request.settings);
|
||||
int dummyShards = request.settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS,
|
||||
dummyPartitionSize == 1 ? 1 : dummyPartitionSize + 1);
|
||||
|
||||
//create index service for parsing and validating "mappings"
|
||||
Settings dummySettings = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(request.settings)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, dummyShards)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
||||
.build();
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.cluster.node;
|
|||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -28,6 +29,7 @@ import org.elasticsearch.common.transport.TransportAddress;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class DiscoveryNodeFilters {
|
||||
|
||||
|
@ -36,6 +38,25 @@ public class DiscoveryNodeFilters {
|
|||
OR
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the IP addresses in a group of {@link Settings} by looking for the keys
|
||||
* "_ip", "_host_ip", and "_publish_ip" and ensuring each of their comma separated values
|
||||
* is a valid IP address.
|
||||
*/
|
||||
public static final Consumer<Settings> IP_VALIDATOR = (settings) -> {
|
||||
Map<String, String> settingsMap = settings.getAsMap();
|
||||
for (Map.Entry<String, String> entry : settingsMap.entrySet()) {
|
||||
String propertyKey = entry.getKey();
|
||||
if ("_ip".equals(propertyKey) || "_host_ip".equals(propertyKey) || "_publish_ip".equals(propertyKey)) {
|
||||
for (String value : Strings.tokenizeToStringArray(entry.getValue(), ",")) {
|
||||
if (InetAddresses.isInetAddress(value) == false) {
|
||||
throw new IllegalArgumentException("invalid IP address [" + value + "] for [" + propertyKey + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
public static DiscoveryNodeFilters buildFromSettings(OpType opType, String prefix, Settings settings) {
|
||||
return buildFromKeyValue(opType, settings.getByPrefix(prefix).getAsMap());
|
||||
}
|
||||
|
@ -43,7 +64,7 @@ public class DiscoveryNodeFilters {
|
|||
public static DiscoveryNodeFilters buildFromKeyValue(OpType opType, Map<String, String> filters) {
|
||||
Map<String, String[]> bFilters = new HashMap<>();
|
||||
for (Map.Entry<String, String> entry : filters.entrySet()) {
|
||||
String[] values = Strings.splitStringByCommaToArray(entry.getValue());
|
||||
String[] values = Strings.tokenizeToStringArray(entry.getValue(), ",");
|
||||
if (values.length > 0) {
|
||||
bFilters.put(entry.getKey(), values);
|
||||
}
|
||||
|
|
|
@ -92,13 +92,10 @@ public class OperationRouting extends AbstractComponent {
|
|||
final Set<String> effectiveRouting = routing.get(index);
|
||||
if (effectiveRouting != null) {
|
||||
for (String r : effectiveRouting) {
|
||||
int shardId = generateShardId(indexMetaData, null, r);
|
||||
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
|
||||
if (indexShard == null) {
|
||||
throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));
|
||||
final int routingPartitionSize = indexMetaData.getRoutingPartitionSize();
|
||||
for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
|
||||
set.add(shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset)));
|
||||
}
|
||||
// we might get duplicates, but that's ok, they will override one another
|
||||
set.add(indexShard);
|
||||
}
|
||||
} else {
|
||||
for (IndexShardRoutingTable indexShard : indexRouting) {
|
||||
|
@ -187,6 +184,14 @@ public class OperationRouting extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) {
|
||||
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
|
||||
if (indexShard == null) {
|
||||
throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));
|
||||
}
|
||||
return indexShard;
|
||||
}
|
||||
|
||||
protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) {
|
||||
IndexRoutingTable indexRouting = clusterState.routingTable().index(index);
|
||||
if (indexRouting == null) {
|
||||
|
@ -213,15 +218,33 @@ public class OperationRouting extends AbstractComponent {
|
|||
return new ShardId(indexMetaData.getIndex(), generateShardId(indexMetaData, id, routing));
|
||||
}
|
||||
|
||||
static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) {
|
||||
final int hash;
|
||||
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
|
||||
final String effectiveRouting;
|
||||
final int partitionOffset;
|
||||
|
||||
if (routing == null) {
|
||||
hash = Murmur3HashFunction.hash(id);
|
||||
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
|
||||
effectiveRouting = id;
|
||||
} else {
|
||||
hash = Murmur3HashFunction.hash(routing);
|
||||
effectiveRouting = routing;
|
||||
}
|
||||
|
||||
if (indexMetaData.isRoutingPartitionedIndex()) {
|
||||
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
|
||||
} else {
|
||||
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
|
||||
partitionOffset = 0;
|
||||
}
|
||||
|
||||
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
|
||||
}
|
||||
|
||||
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
|
||||
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
|
||||
|
||||
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
|
||||
// of original index to hash documents
|
||||
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -262,7 +262,7 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable<N
|
|||
}
|
||||
if (storeException != null) {
|
||||
builder.startObject("store_exception");
|
||||
ElasticsearchException.toXContent(builder, params, storeException);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, storeException);
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,12 +78,12 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|||
public static final String NAME = "awareness";
|
||||
|
||||
public static final Setting<String[]> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING =
|
||||
new Setting<>("cluster.routing.allocation.awareness.attributes", "", Strings::splitStringByCommaToArray , Property.Dynamic,
|
||||
new Setting<>("cluster.routing.allocation.awareness.attributes", "", s -> Strings.tokenizeToStringArray(s, ","), Property.Dynamic,
|
||||
Property.NodeScope);
|
||||
public static final Setting<Settings> CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING =
|
||||
Setting.groupSetting("cluster.routing.allocation.awareness.force.", Property.Dynamic, Property.NodeScope);
|
||||
|
||||
private String[] awarenessAttributes;
|
||||
private volatile String[] awarenessAttributes;
|
||||
|
||||
private volatile Map<String, String[]> forcedAwarenessAttributes;
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
|
||||
|
||||
|
@ -68,11 +69,11 @@ public class FilterAllocationDecider extends AllocationDecider {
|
|||
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
|
||||
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
|
||||
public static final Setting<Settings> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING =
|
||||
Setting.groupSetting(CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope);
|
||||
Setting.groupSetting(CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope);
|
||||
public static final Setting<Settings> CLUSTER_ROUTING_INCLUDE_GROUP_SETTING =
|
||||
Setting.groupSetting(CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope);
|
||||
Setting.groupSetting(CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope);
|
||||
public static final Setting<Settings> CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING =
|
||||
Setting.groupSetting(CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope);
|
||||
Setting.groupSetting(CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope);
|
||||
|
||||
private volatile DiscoveryNodeFilters clusterRequireFilters;
|
||||
private volatile DiscoveryNodeFilters clusterIncludeFilters;
|
||||
|
|
|
@ -95,12 +95,11 @@ public class ParsingException extends ElasticsearchException {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
if (lineNumber != UNKNOWN_POSITION) {
|
||||
builder.field("line", lineNumber);
|
||||
builder.field("col", columnNumber);
|
||||
}
|
||||
super.innerToXContent(builder, params);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -73,9 +73,8 @@ public class CircuitBreakingException extends ElasticsearchException {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("bytes_wanted", bytesWanted);
|
||||
builder.field("bytes_limit", byteLimit);
|
||||
super.innerToXContent(builder, params);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING,
|
||||
IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING,
|
||||
IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING,
|
||||
IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING,
|
||||
IndexMetaData.INDEX_SHADOW_REPLICAS_SETTING,
|
||||
IndexMetaData.INDEX_SHARED_FILESYSTEM_SETTING,
|
||||
IndexMetaData.INDEX_READ_ONLY_SETTING,
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper;
|
|||
|
||||
import com.carrotsearch.hppc.ObjectHashSet;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
|
||||
|
@ -384,6 +385,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
|
|||
MapperUtils.collect(newMapper.mapping().root(), objectMappers, fieldMappers);
|
||||
checkFieldUniqueness(newMapper.type(), objectMappers, fieldMappers, fullPathObjectMappers, fieldTypes);
|
||||
checkObjectsCompatibility(objectMappers, updateAllTypes, fullPathObjectMappers);
|
||||
checkPartitionedIndexConstraints(newMapper);
|
||||
|
||||
// update lookup data-structures
|
||||
// this will in particular make sure that the merged fields are compatible with other types
|
||||
|
@ -598,6 +600,20 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private void checkPartitionedIndexConstraints(DocumentMapper newMapper) {
|
||||
if (indexSettings.getIndexMetaData().isRoutingPartitionedIndex()) {
|
||||
if (newMapper.parentFieldMapper().active()) {
|
||||
throw new IllegalArgumentException("mapping type name [" + newMapper.type() + "] cannot have a "
|
||||
+ "_parent field for the partitioned index [" + indexSettings.getIndex().getName() + "]");
|
||||
}
|
||||
|
||||
if (!newMapper.routingFieldMapper().required()) {
|
||||
throw new IllegalArgumentException("mapping type [" + newMapper.type() + "] must have routing "
|
||||
+ "required for partitioned index [" + indexSettings.getIndex().getName() + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public DocumentMapper parse(String mappingType, CompressedXContent mappingSource, boolean applyDefault) throws MapperParsingException {
|
||||
return documentParser.parse(mappingType, mappingSource, applyDefault ? defaultMappingSource : null);
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.index.query;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
|
@ -60,11 +59,6 @@ public class QueryShardException extends ElasticsearchException {
|
|||
return RestStatus.BAD_REQUEST;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
super.innerToXContent(builder, params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
|
|
|
@ -181,7 +181,16 @@ public class Node implements Closeable {
|
|||
*/
|
||||
public static final Setting<Boolean> NODE_LOCAL_STORAGE_SETTING = Setting.boolSetting("node.local_storage", true, Property.NodeScope);
|
||||
public static final Setting<String> NODE_NAME_SETTING = Setting.simpleString("node.name", Property.NodeScope);
|
||||
public static final Setting<Settings> NODE_ATTRIBUTES = Setting.groupSetting("node.attr.", Property.NodeScope);
|
||||
public static final Setting<Settings> NODE_ATTRIBUTES = Setting.groupSetting("node.attr.", (settings) -> {
|
||||
Map<String, String> settingsMap = settings.getAsMap();
|
||||
for (Map.Entry<String, String> entry : settingsMap.entrySet()) {
|
||||
String value = entry.getValue();
|
||||
if (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(value.length() - 1))) {
|
||||
throw new IllegalArgumentException("node.attr." + entry.getKey() + " cannot have leading or trailing whitespace " +
|
||||
"[" + value + "]");
|
||||
}
|
||||
}
|
||||
}, Property.NodeScope);
|
||||
public static final Setting<String> BREAKER_TYPE_KEY = new Setting<>("indices.breaker.type", "hierarchy", (s) -> {
|
||||
switch (s) {
|
||||
case "hierarchy":
|
||||
|
|
|
@ -31,7 +31,10 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
|
||||
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT;
|
||||
|
||||
|
||||
public class BytesRestResponse extends RestResponse {
|
||||
|
@ -89,7 +92,7 @@ public class BytesRestResponse extends RestResponse {
|
|||
this.content = BytesArray.EMPTY;
|
||||
this.contentType = TEXT_CONTENT_TYPE;
|
||||
} else {
|
||||
try (final XContentBuilder builder = convert(channel, status, e)) {
|
||||
try (final XContentBuilder builder = build(channel, status, e)) {
|
||||
this.content = builder.bytes();
|
||||
this.contentType = builder.contentType().mediaType();
|
||||
}
|
||||
|
@ -116,49 +119,25 @@ public class BytesRestResponse extends RestResponse {
|
|||
|
||||
private static final Logger SUPPRESSED_ERROR_LOGGER = ESLoggerFactory.getLogger("rest.suppressed");
|
||||
|
||||
private static XContentBuilder convert(RestChannel channel, RestStatus status, Exception e) throws IOException {
|
||||
XContentBuilder builder = channel.newErrorBuilder().startObject();
|
||||
if (e == null) {
|
||||
builder.field("error", "unknown");
|
||||
} else if (channel.detailedErrorsEnabled()) {
|
||||
final ToXContent.Params params;
|
||||
if (channel.request().paramAsBoolean("error_trace", !ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT)) {
|
||||
params = new ToXContent.DelegatingMapParams(
|
||||
Collections.singletonMap(ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE, "false"), channel.request());
|
||||
} else {
|
||||
private static XContentBuilder build(RestChannel channel, RestStatus status, Exception e) throws IOException {
|
||||
ToXContent.Params params = channel.request();
|
||||
if (params.paramAsBoolean("error_trace", !REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT)) {
|
||||
params = new ToXContent.DelegatingMapParams(singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false"), params);
|
||||
} else if (e != null) {
|
||||
Supplier<?> messageSupplier = () -> new ParameterizedMessage("path: {}, params: {}",
|
||||
channel.request().rawPath(), channel.request().params());
|
||||
|
||||
if (status.getStatus() < 500) {
|
||||
SUPPRESSED_ERROR_LOGGER.debug(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("path: {}, params: {}",
|
||||
channel.request().rawPath(), channel.request().params()), e);
|
||||
SUPPRESSED_ERROR_LOGGER.debug(messageSupplier, e);
|
||||
} else {
|
||||
SUPPRESSED_ERROR_LOGGER.warn(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("path: {}, params: {}",
|
||||
channel.request().rawPath(), channel.request().params()), e);
|
||||
SUPPRESSED_ERROR_LOGGER.warn(messageSupplier, e);
|
||||
}
|
||||
params = channel.request();
|
||||
}
|
||||
ElasticsearchException.renderException(builder, params, e);
|
||||
} else {
|
||||
builder.field("error", simpleMessage(e));
|
||||
}
|
||||
|
||||
XContentBuilder builder = channel.newErrorBuilder().startObject();
|
||||
ElasticsearchException.generateFailureXContent(builder, params, e, channel.detailedErrorsEnabled());
|
||||
builder.field("status", status.getStatus());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
/*
|
||||
* Builds a simple error string from the message of the first ElasticsearchException
|
||||
*/
|
||||
private static String simpleMessage(Throwable t) throws IOException {
|
||||
int counter = 0;
|
||||
Throwable next = t;
|
||||
while (next != null && counter++ < 10) {
|
||||
if (t instanceof ElasticsearchException) {
|
||||
return next.getClass().getSimpleName() + "[" + next.getMessage() + "]";
|
||||
}
|
||||
next = next.getCause();
|
||||
}
|
||||
|
||||
return "No ElasticsearchException found";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,8 +87,7 @@ public class ScriptException extends ElasticsearchException {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
super.innerToXContent(builder, params);
|
||||
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("script_stack", scriptStack);
|
||||
builder.field("script", script);
|
||||
builder.field("lang", lang);
|
||||
|
|
|
@ -607,7 +607,7 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust
|
|||
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
|
||||
builder.prettyPrint();
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, ToXContent.EMPTY_PARAMS, e);
|
||||
ElasticsearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, e);
|
||||
builder.endObject();
|
||||
logger.warn("failed to load/compile script [{}]: {}", scriptNameExt.v1(), builder.string());
|
||||
} catch (IOException ioe) {
|
||||
|
|
|
@ -72,12 +72,11 @@ public class SearchParseException extends SearchContextException {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
if (lineNumber != UNKNOWN_POSITION) {
|
||||
builder.field("line", lineNumber);
|
||||
builder.field("col", columnNumber);
|
||||
}
|
||||
super.innerToXContent(builder, params);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
/**
|
||||
* A task that can be canceled
|
||||
*/
|
||||
public class CancellableTask extends Task {
|
||||
public abstract class CancellableTask extends Task {
|
||||
|
||||
private final AtomicReference<String> reason = new AtomicReference<>();
|
||||
|
||||
|
@ -51,6 +51,11 @@ public class CancellableTask extends Task {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this task should can potentially have children that needs to be cancelled when the parent is cancelled.
|
||||
*/
|
||||
public abstract boolean shouldCancelChildrenOnCancellation();
|
||||
|
||||
public boolean isCancelled() {
|
||||
return reason.get() != null;
|
||||
}
|
||||
|
|
|
@ -125,15 +125,18 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
/**
|
||||
* Cancels a task
|
||||
* <p>
|
||||
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
|
||||
* Returns true if cancellation was started successful, null otherwise.
|
||||
*
|
||||
* After starting cancellation on the parent task, the task manager tries to cancel all children tasks
|
||||
* of the current task. Once cancellation of the children tasks is done, the listener is triggered.
|
||||
*/
|
||||
public Set<String> cancel(CancellableTask task, String reason, Consumer<Set<String>> listener) {
|
||||
public boolean cancel(CancellableTask task, String reason, Runnable listener) {
|
||||
CancellableTaskHolder holder = cancellableTasks.get(task.getId());
|
||||
if (holder != null) {
|
||||
logger.trace("cancelling task with id {}", task.getId());
|
||||
return holder.cancel(reason, listener);
|
||||
}
|
||||
return null;
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -344,17 +347,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
}
|
||||
}
|
||||
|
||||
public void registerChildTask(Task task, String node) {
|
||||
if (task == null || task instanceof CancellableTask == false) {
|
||||
// We don't have a cancellable task - not much we can do here
|
||||
return;
|
||||
}
|
||||
CancellableTaskHolder holder = cancellableTasks.get(task.getId());
|
||||
if (holder != null) {
|
||||
holder.registerChildTaskNode(node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the calling thread, waiting for the task to vanish from the TaskManager.
|
||||
*/
|
||||
|
@ -378,11 +370,9 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
|
||||
private final CancellableTask task;
|
||||
|
||||
private final Set<String> nodesWithChildTasks = new HashSet<>();
|
||||
|
||||
private volatile String cancellationReason = null;
|
||||
|
||||
private volatile Consumer<Set<String>> cancellationListener = null;
|
||||
private volatile Runnable cancellationListener = null;
|
||||
|
||||
public CancellableTaskHolder(CancellableTask task) {
|
||||
this.task = task;
|
||||
|
@ -391,33 +381,33 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
/**
|
||||
* Marks task as cancelled.
|
||||
* <p>
|
||||
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
|
||||
* Returns true if cancellation was successful, false otherwise.
|
||||
*/
|
||||
public Set<String> cancel(String reason, Consumer<Set<String>> listener) {
|
||||
Set<String> nodes;
|
||||
public boolean cancel(String reason, Runnable listener) {
|
||||
final boolean cancelled;
|
||||
synchronized (this) {
|
||||
assert reason != null;
|
||||
if (cancellationReason == null) {
|
||||
cancellationReason = reason;
|
||||
cancellationListener = listener;
|
||||
nodes = Collections.unmodifiableSet(nodesWithChildTasks);
|
||||
cancelled = true;
|
||||
} else {
|
||||
// Already cancelled by somebody else
|
||||
nodes = null;
|
||||
cancelled = false;
|
||||
}
|
||||
}
|
||||
if (nodes != null) {
|
||||
if (cancelled) {
|
||||
task.cancel(reason);
|
||||
}
|
||||
return nodes;
|
||||
return cancelled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks task as cancelled.
|
||||
* <p>
|
||||
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
|
||||
* Returns true if cancellation was successful, false otherwise.
|
||||
*/
|
||||
public Set<String> cancel(String reason) {
|
||||
public boolean cancel(String reason) {
|
||||
return cancel(reason, null);
|
||||
}
|
||||
|
||||
|
@ -425,14 +415,12 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
* Marks task as finished.
|
||||
*/
|
||||
public void finish() {
|
||||
Consumer<Set<String>> listener = null;
|
||||
Set<String> nodes = null;
|
||||
Runnable listener = null;
|
||||
synchronized (this) {
|
||||
if (cancellationReason != null) {
|
||||
// The task was cancelled, we need to notify the listener
|
||||
if (cancellationListener != null) {
|
||||
listener = cancellationListener;
|
||||
nodes = Collections.unmodifiableSet(nodesWithChildTasks);
|
||||
cancellationListener = null;
|
||||
}
|
||||
} else {
|
||||
|
@ -442,7 +430,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
// We need to call the listener outside of the synchronised section to avoid potential bottle necks
|
||||
// in the listener synchronization
|
||||
if (listener != null) {
|
||||
listener.accept(nodes);
|
||||
listener.run();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -454,14 +442,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
public CancellableTask getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
public synchronized void registerChildTaskNode(String nodeId) {
|
||||
if (cancellationReason == null) {
|
||||
nodesWithChildTasks.add(nodeId);
|
||||
} else {
|
||||
throw new TaskCancelledException("cannot register child task request, the task is already cancelled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -230,7 +230,7 @@ public final class TaskResult implements Writeable, ToXContent {
|
|||
private static BytesReference toXContent(Exception error) throws IOException {
|
||||
try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, ToXContent.EMPTY_PARAMS, error);
|
||||
ElasticsearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, error);
|
||||
builder.endObject();
|
||||
return builder.bytes();
|
||||
}
|
||||
|
|
|
@ -515,7 +515,6 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
final TransportResponseHandler<T> handler) {
|
||||
request.setParentTask(localNode.getId(), parentTask.getId());
|
||||
try {
|
||||
taskManager.registerChildTask(parentTask, connection.getNode().getId());
|
||||
sendRequest(connection, action, request, options, handler);
|
||||
} catch (TaskCancelledException ex) {
|
||||
// The parent task is already cancelled - just fail the request
|
||||
|
|
|
@ -253,7 +253,7 @@ public class ESExceptionTests extends ESTestCase {
|
|||
}
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, PARAMS, ex);
|
||||
ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex);
|
||||
builder.endObject();
|
||||
|
||||
String expected = "{\"type\":\"file_not_found_exception\",\"reason\":\"foo not found\"}";
|
||||
|
@ -264,7 +264,7 @@ public class ESExceptionTests extends ESTestCase {
|
|||
ParsingException ex = new ParsingException(1, 2, "foobar", null);
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, PARAMS, ex);
|
||||
ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex);
|
||||
builder.endObject();
|
||||
String expected = "{\"type\":\"parsing_exception\",\"reason\":\"foobar\",\"line\":1,\"col\":2}";
|
||||
assertEquals(expected, builder.string());
|
||||
|
@ -274,7 +274,7 @@ public class ESExceptionTests extends ESTestCase {
|
|||
ElasticsearchException ex = new RemoteTransportException("foobar", new FileNotFoundException("foo not found"));
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, PARAMS, ex);
|
||||
ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex);
|
||||
builder.endObject();
|
||||
|
||||
XContentBuilder otherBuilder = XContentFactory.jsonBuilder();
|
||||
|
@ -292,7 +292,7 @@ public class ESExceptionTests extends ESTestCase {
|
|||
ex.addHeader("test_multi", "some value", "another value");
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, PARAMS, ex);
|
||||
ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex);
|
||||
builder.endObject();
|
||||
assertThat(builder.string(), Matchers.anyOf( // iteration order depends on platform
|
||||
equalTo("{\"type\":\"parsing_exception\",\"reason\":\"foobar\",\"line\":1,\"col\":2,\"header\":{\"test_multi\":[\"some value\",\"another value\"],\"test\":\"some value\"}}"),
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
|
@ -67,7 +68,9 @@ public class ElasticsearchExceptionTests extends ESTestCase {
|
|||
// in the JSON. Since the stack can be large, it only checks the beginning of the JSON.
|
||||
assertExceptionAsJson(e, true, startsWith("{\"type\":\"exception\",\"reason\":\"foo\"," +
|
||||
"\"caused_by\":{\"type\":\"illegal_state_exception\",\"reason\":\"bar\"," +
|
||||
"\"stack_trace\":\"java.lang.IllegalStateException: bar"));
|
||||
"\"stack_trace\":\"java.lang.IllegalStateException: bar" +
|
||||
(Constants.WINDOWS ? "\\r\\n" : "\\n") +
|
||||
"\\tat org.elasticsearch."));
|
||||
}
|
||||
|
||||
public void testToXContentWithHeaders() throws IOException {
|
||||
|
|
|
@ -91,7 +91,12 @@ public class CancellableTasksTests extends TaskManagerTestCase {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,7 +131,12 @@ public class CancellableTasksTests extends TaskManagerTestCase {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -83,6 +83,11 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
|
|||
super(id, type, action, description, parentTaskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isBlocked() {
|
||||
return blocked;
|
||||
}
|
||||
|
@ -242,7 +247,12 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
|
|||
import java.util.HashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
@ -307,4 +308,31 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|||
.build();
|
||||
assertFalse(client().admin().indices().prepareCreate("test-idx-3").setSettings(settings).setTimeout("100ms").get().isShardsAcked());
|
||||
}
|
||||
|
||||
public void testInvalidPartitionSize() {
|
||||
BiFunction<Integer, Integer, Boolean> createPartitionedIndex = (shards, partitionSize) -> {
|
||||
CreateIndexResponse response;
|
||||
|
||||
try {
|
||||
response = prepareCreate("test_" + shards + "_" + partitionSize)
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", shards)
|
||||
.put("index.routing_partition_size", partitionSize))
|
||||
.execute().actionGet();
|
||||
} catch (IllegalStateException | IllegalArgumentException e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return response.isAcknowledged();
|
||||
};
|
||||
|
||||
assertFalse(createPartitionedIndex.apply(3, 6));
|
||||
assertFalse(createPartitionedIndex.apply(3, 0));
|
||||
assertFalse(createPartitionedIndex.apply(3, 3));
|
||||
|
||||
assertTrue(createPartitionedIndex.apply(3, 1));
|
||||
assertTrue(createPartitionedIndex.apply(3, 2));
|
||||
|
||||
assertTrue(createPartitionedIndex.apply(1, 1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1163,7 +1163,8 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public void execute() throws Exception {
|
||||
this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<>(null, new Response()));
|
||||
// Using the diamond operator (<>) prevents Eclipse from being able to compile this code
|
||||
this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<Request, Response>(null, new Response()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,8 +24,10 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
@ -144,5 +146,15 @@ public class FilteringAllocationIT extends ESIntegTestCase {
|
|||
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
|
||||
assertThat(clusterState.routingTable().index("test").numberOfNodesShardsAreAllocatedOn(), equalTo(2));
|
||||
}
|
||||
|
||||
public void testInvalidIPFilterClusterSettings() {
|
||||
String ipKey = randomFrom("_ip", "_host_ip", "_publish_ip");
|
||||
Setting<Settings> filterSetting = randomFrom(FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING,
|
||||
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING);
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder().put(filterSetting.getKey() + ipKey, "192.168.1.1."))
|
||||
.execute().actionGet());
|
||||
assertEquals("invalid IP address [192.168.1.1.] for [" + ipKey + "]", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -245,6 +245,17 @@ public class DiscoveryNodeFiltersTests extends ESTestCase {
|
|||
assertThat(filters.match(node), equalTo(true));
|
||||
}
|
||||
|
||||
public void testCommaSeparatedValuesTrimmed() {
|
||||
DiscoveryNode node = new DiscoveryNode("", "", "", "", "192.1.1.54", localAddress, singletonMap("tag", "B"), emptySet(), null);
|
||||
|
||||
Settings settings = shuffleSettings(Settings.builder()
|
||||
.put("xxx." + randomFrom("_ip", "_host_ip", "_publish_ip"), "192.1.1.1, 192.1.1.54")
|
||||
.put("xxx.tag", "A, B")
|
||||
.build());
|
||||
DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings);
|
||||
assertTrue(filters.match(node));
|
||||
}
|
||||
|
||||
private Settings shuffleSettings(Settings source) {
|
||||
Settings.Builder settings = Settings.builder();
|
||||
List<String> keys = new ArrayList<>(source.getAsMap().keySet());
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -35,6 +34,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -43,8 +43,8 @@ import java.util.TreeMap;
|
|||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.object.HasToString.hasToString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.object.HasToString.hasToString;
|
||||
|
||||
public class OperationRoutingTests extends ESTestCase{
|
||||
|
||||
|
@ -75,6 +75,142 @@ public class OperationRoutingTests extends ESTestCase{
|
|||
}
|
||||
}
|
||||
|
||||
public void testPartitionedIndex() {
|
||||
// make sure the same routing value always has each _id fall within the configured partition size
|
||||
for (int shards = 1; shards < 5; shards++) {
|
||||
for (int partitionSize = 1; partitionSize == 1 || partitionSize < shards; partitionSize++) {
|
||||
IndexMetaData metaData = IndexMetaData.builder("test")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(shards)
|
||||
.routingPartitionSize(partitionSize)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
|
||||
for (int i = 0; i < 20; i++) {
|
||||
String routing = randomUnicodeOfLengthBetween(1, 50);
|
||||
Set<Integer> shardSet = new HashSet<>();
|
||||
|
||||
for (int k = 0; k < 150; k++) {
|
||||
String id = randomUnicodeOfLengthBetween(1, 50);
|
||||
|
||||
shardSet.add(OperationRouting.generateShardId(metaData, id, routing));
|
||||
}
|
||||
|
||||
assertEquals(partitionSize, shardSet.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testPartitionedIndexShrunk() {
|
||||
Map<String, Map<String, Integer>> routingIdToShard = new HashMap<>();
|
||||
|
||||
Map<String, Integer> routingA = new HashMap<>();
|
||||
routingA.put("a_0", 1);
|
||||
routingA.put("a_1", 2);
|
||||
routingA.put("a_2", 2);
|
||||
routingA.put("a_3", 2);
|
||||
routingA.put("a_4", 1);
|
||||
routingA.put("a_5", 2);
|
||||
routingIdToShard.put("a", routingA);
|
||||
|
||||
Map<String, Integer> routingB = new HashMap<>();
|
||||
routingB.put("b_0", 0);
|
||||
routingB.put("b_1", 0);
|
||||
routingB.put("b_2", 0);
|
||||
routingB.put("b_3", 0);
|
||||
routingB.put("b_4", 3);
|
||||
routingB.put("b_5", 3);
|
||||
routingIdToShard.put("b", routingB);
|
||||
|
||||
Map<String, Integer> routingC = new HashMap<>();
|
||||
routingC.put("c_0", 1);
|
||||
routingC.put("c_1", 1);
|
||||
routingC.put("c_2", 0);
|
||||
routingC.put("c_3", 0);
|
||||
routingC.put("c_4", 0);
|
||||
routingC.put("c_5", 1);
|
||||
routingIdToShard.put("c", routingC);
|
||||
|
||||
Map<String, Integer> routingD = new HashMap<>();
|
||||
routingD.put("d_0", 2);
|
||||
routingD.put("d_1", 2);
|
||||
routingD.put("d_2", 3);
|
||||
routingD.put("d_3", 3);
|
||||
routingD.put("d_4", 3);
|
||||
routingD.put("d_5", 3);
|
||||
routingIdToShard.put("d", routingD);
|
||||
|
||||
IndexMetaData metaData = IndexMetaData.builder("test")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.setRoutingNumShards(8)
|
||||
.numberOfShards(4)
|
||||
.routingPartitionSize(3)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
|
||||
for (Map.Entry<String, Map<String, Integer>> routingIdEntry : routingIdToShard.entrySet()) {
|
||||
String routing = routingIdEntry.getKey();
|
||||
|
||||
for (Map.Entry<String, Integer> idEntry : routingIdEntry.getValue().entrySet()) {
|
||||
String id = idEntry.getKey();
|
||||
int shard = idEntry.getValue();
|
||||
|
||||
assertEquals(shard, OperationRouting.generateShardId(metaData, id, routing));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testPartitionedIndexBWC() {
|
||||
Map<String, Map<String, Integer>> routingIdToShard = new HashMap<>();
|
||||
|
||||
Map<String, Integer> routingA = new HashMap<>();
|
||||
routingA.put("a_0", 3);
|
||||
routingA.put("a_1", 2);
|
||||
routingA.put("a_2", 2);
|
||||
routingA.put("a_3", 3);
|
||||
routingIdToShard.put("a", routingA);
|
||||
|
||||
Map<String, Integer> routingB = new HashMap<>();
|
||||
routingB.put("b_0", 5);
|
||||
routingB.put("b_1", 0);
|
||||
routingB.put("b_2", 0);
|
||||
routingB.put("b_3", 0);
|
||||
routingIdToShard.put("b", routingB);
|
||||
|
||||
Map<String, Integer> routingC = new HashMap<>();
|
||||
routingC.put("c_0", 4);
|
||||
routingC.put("c_1", 4);
|
||||
routingC.put("c_2", 3);
|
||||
routingC.put("c_3", 4);
|
||||
routingIdToShard.put("c", routingC);
|
||||
|
||||
Map<String, Integer> routingD = new HashMap<>();
|
||||
routingD.put("d_0", 3);
|
||||
routingD.put("d_1", 4);
|
||||
routingD.put("d_2", 4);
|
||||
routingD.put("d_3", 4);
|
||||
routingIdToShard.put("d", routingD);
|
||||
|
||||
IndexMetaData metaData = IndexMetaData.builder("test")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(6)
|
||||
.routingPartitionSize(2)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
|
||||
for (Map.Entry<String, Map<String, Integer>> routingIdEntry : routingIdToShard.entrySet()) {
|
||||
String routing = routingIdEntry.getKey();
|
||||
|
||||
for (Map.Entry<String, Integer> idEntry : routingIdEntry.getValue().entrySet()) {
|
||||
String id = idEntry.getKey();
|
||||
int shard = idEntry.getValue();
|
||||
|
||||
assertEquals(shard, OperationRouting.generateShardId(metaData, id, routing));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures that all changes to the hash-function / shard selection are BWC
|
||||
*/
|
||||
|
|
|
@ -32,10 +32,14 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||
|
@ -803,4 +807,50 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(4)); // +1 for relocating shard.
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); // Still 1 unassigned.
|
||||
}
|
||||
|
||||
public void testMultipleAwarenessAttributes() {
|
||||
AllocationService strategy = createAllocationService(Settings.builder()
|
||||
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone, rack")
|
||||
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a, b")
|
||||
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "rack.values", "c, d")
|
||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||
.build());
|
||||
|
||||
logger.info("Building initial routing table for 'testUnbalancedZones'");
|
||||
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||
.build();
|
||||
|
||||
RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(
|
||||
org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)
|
||||
).metaData(metaData).routingTable(initialRoutingTable).build();
|
||||
|
||||
logger.info("--> adding two nodes in different zones and do rerouting");
|
||||
Map<String, String> nodeAAttributes = new HashMap<>();
|
||||
nodeAAttributes.put("zone", "a");
|
||||
nodeAAttributes.put("rack", "c");
|
||||
Map<String, String> nodeBAttributes = new HashMap<>();
|
||||
nodeBAttributes.put("zone", "b");
|
||||
nodeBAttributes.put("rack", "d");
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("A-0", nodeAAttributes))
|
||||
.add(newNode("B-0", nodeBAttributes))
|
||||
).build();
|
||||
clusterState = strategy.reroute(clusterState, "reroute");
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0));
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||
|
||||
logger.info("--> start the shards (primaries)");
|
||||
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||
|
||||
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
logger.info("--> all replicas are allocated and started since we have one node in each zone and rack");
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDeci
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
@ -191,4 +193,16 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
|
|||
.build();
|
||||
return service.reroute(clusterState, "reroute", false);
|
||||
}
|
||||
|
||||
public void testInvalidIPFilter() {
|
||||
String ipKey = randomFrom("_ip", "_host_ip", "_publish_ip");
|
||||
Setting<Settings> filterSetting = randomFrom(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING,
|
||||
IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING);
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
|
||||
IndexScopedSettings indexScopedSettings = new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
|
||||
indexScopedSettings.updateDynamicSettings(Settings.builder().put(filterSetting.getKey() + ipKey, "192..168.1.1").build(),
|
||||
Settings.builder().put(Settings.EMPTY), Settings.builder(), "test ip validation");
|
||||
});
|
||||
assertEquals("invalid IP address [192..168.1.1] for [" + ipKey + "]", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -584,6 +584,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
}
|
||||
ensureGreen("test");
|
||||
|
||||
// make sure all nodes have the updated cluster state with the latest routing table
|
||||
final long clusterStateVersionOnMaster = internalCluster().clusterService(internalCluster().getMasterName())
|
||||
.state().getVersion();
|
||||
assertBusy(() -> {
|
||||
for (String node : nodes) {
|
||||
assertThat(internalCluster().clusterService(node).state().getVersion(),
|
||||
greaterThanOrEqualTo(clusterStateVersionOnMaster));
|
||||
}
|
||||
});
|
||||
|
||||
logger.info("validating successful docs");
|
||||
for (String node : nodes) {
|
||||
try {
|
||||
|
|
|
@ -234,4 +234,37 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
|
|||
MergeReason.MAPPING_UPDATE, random().nextBoolean()));
|
||||
assertThat(e.getMessage(), containsString("[_all] is disabled in 6.0"));
|
||||
}
|
||||
|
||||
public void testPartitionedConstraints() {
|
||||
// partitioned index must have routing
|
||||
IllegalArgumentException noRoutingException = expectThrows(IllegalArgumentException.class, () -> {
|
||||
client().admin().indices().prepareCreate("test-index")
|
||||
.addMapping("type", "{\"type\":{}}")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", 4)
|
||||
.put("index.routing_partition_size", 2))
|
||||
.execute().actionGet();
|
||||
});
|
||||
assertTrue(noRoutingException.getMessage(), noRoutingException.getMessage().contains("must have routing"));
|
||||
|
||||
// partitioned index cannot have parent/child relationships
|
||||
IllegalArgumentException parentException = expectThrows(IllegalArgumentException.class, () -> {
|
||||
client().admin().indices().prepareCreate("test-index")
|
||||
.addMapping("parent", "{\"parent\":{\"_routing\":{\"required\":true}}}")
|
||||
.addMapping("child", "{\"child\": {\"_routing\":{\"required\":true}, \"_parent\": {\"type\": \"parent\"}}}")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", 4)
|
||||
.put("index.routing_partition_size", 2))
|
||||
.execute().actionGet();
|
||||
});
|
||||
assertTrue(parentException.getMessage(), parentException.getMessage().contains("cannot have a _parent field"));
|
||||
|
||||
// valid partitioned index
|
||||
assertTrue(client().admin().indices().prepareCreate("test-index")
|
||||
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", 4)
|
||||
.put("index.routing_partition_size", 2))
|
||||
.execute().actionGet().isAcknowledged());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.elasticsearch.index.query.QueryBuilders;
|
|||
import org.elasticsearch.indices.InvalidAliasNameException;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import org.junit.After;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -306,7 +305,6 @@ public class SimpleIndexTemplateIT extends ESIntegTestCase {
|
|||
"get template with " + Arrays.toString(names));
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/8802")
|
||||
public void testBrokenMapping() throws Exception {
|
||||
// clean all templates setup by the framework.
|
||||
client().admin().indices().prepareDeleteTemplate("*").get();
|
||||
|
@ -764,4 +762,63 @@ public class SimpleIndexTemplateIT extends ESIntegTestCase {
|
|||
assertEquals("value1", searchResponse.getHits().getAt(0).field("field1").value().toString());
|
||||
assertNull(searchResponse.getHits().getAt(0).field("field2"));
|
||||
}
|
||||
|
||||
public void testPartitionedTemplate() throws Exception {
|
||||
// clean all templates setup by the framework.
|
||||
client().admin().indices().prepareDeleteTemplate("*").get();
|
||||
|
||||
// check get all templates on an empty index.
|
||||
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates().get();
|
||||
assertThat(response.getIndexTemplates(), empty());
|
||||
|
||||
// provide more partitions than shards
|
||||
IllegalArgumentException eBadSettings = expectThrows(IllegalArgumentException.class,
|
||||
() -> client().admin().indices().preparePutTemplate("template_1")
|
||||
.setPatterns(Collections.singletonList("te*"))
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", "5")
|
||||
.put("index.routing_partition_size", "6"))
|
||||
.get());
|
||||
assertThat(eBadSettings.getMessage(), containsString("partition size [6] should be a positive number "
|
||||
+ "less than the number of shards [5]"));
|
||||
|
||||
// provide an invalid mapping for a partitioned index
|
||||
IllegalArgumentException eBadMapping = expectThrows(IllegalArgumentException.class,
|
||||
() -> client().admin().indices().preparePutTemplate("template_2")
|
||||
.setPatterns(Collections.singletonList("te*"))
|
||||
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":false}}}")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", "6")
|
||||
.put("index.routing_partition_size", "3"))
|
||||
.get());
|
||||
assertThat(eBadMapping.getMessage(), containsString("must have routing required for partitioned index"));
|
||||
|
||||
// no templates yet
|
||||
response = client().admin().indices().prepareGetTemplates().get();
|
||||
assertEquals(0, response.getIndexTemplates().size());
|
||||
|
||||
// a valid configuration that only provides the partition size
|
||||
assertAcked(client().admin().indices().preparePutTemplate("just_partitions")
|
||||
.setPatterns(Collections.singletonList("te*"))
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing_partition_size", "6"))
|
||||
.get());
|
||||
|
||||
// create an index with too few shards
|
||||
IllegalArgumentException eBadIndex = expectThrows(IllegalArgumentException.class,
|
||||
() -> prepareCreate("test_bad", Settings.builder()
|
||||
.put("index.number_of_shards", 5))
|
||||
.get());
|
||||
|
||||
assertThat(eBadIndex.getMessage(), containsString("partition size [6] should be a positive number "
|
||||
+ "less than the number of shards [5]"));
|
||||
|
||||
// finally, create a valid index
|
||||
prepareCreate("test_good", Settings.builder()
|
||||
.put("index.number_of_shards", 7))
|
||||
.get();
|
||||
|
||||
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test_good").get();
|
||||
assertEquals("6", getSettingsResponse.getIndexToSettings().get("test_good").getAsMap().get("index.routing_partition_size"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.routing;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.mockito.internal.util.collections.Sets;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class PartitionedRoutingIT extends ESIntegTestCase {
|
||||
|
||||
public void testVariousPartitionSizes() throws Exception {
|
||||
for (int shards = 1; shards <= 4; shards++) {
|
||||
for (int partitionSize = 1; partitionSize < shards; partitionSize++) {
|
||||
String index = "index_" + shards + "_" + partitionSize;
|
||||
|
||||
client().admin().indices().prepareCreate(index)
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", shards)
|
||||
.put("index.routing_partition_size", partitionSize))
|
||||
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}")
|
||||
.execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
Map<String, Set<String>> routingToDocumentIds = generateRoutedDocumentIds(index);
|
||||
|
||||
verifyGets(index, routingToDocumentIds);
|
||||
verifyBroadSearches(index, routingToDocumentIds, shards);
|
||||
verifyRoutedSearches(index, routingToDocumentIds, Sets.newSet(partitionSize));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testShrinking() throws Exception {
|
||||
// creates random routing groups and repeatedly halves the index until it is down to 1 shard
|
||||
// verifying that the count is correct for each shrunken index
|
||||
final int partitionSize = 3;
|
||||
final int originalShards = 8;
|
||||
int currentShards = originalShards;
|
||||
String index = "index_" + currentShards;
|
||||
|
||||
client().admin().indices().prepareCreate(index)
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", currentShards)
|
||||
.put("index.routing_partition_size", partitionSize))
|
||||
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}")
|
||||
.execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
Map<String, Set<String>> routingToDocumentIds = generateRoutedDocumentIds(index);
|
||||
|
||||
while (true) {
|
||||
int factor = originalShards / currentShards;
|
||||
|
||||
verifyGets(index, routingToDocumentIds);
|
||||
verifyBroadSearches(index, routingToDocumentIds, currentShards);
|
||||
|
||||
// we need the floor and ceiling of the routing_partition_size / factor since the partition size of the shrunken
|
||||
// index will be one of those, depending on the routing value
|
||||
verifyRoutedSearches(index, routingToDocumentIds,
|
||||
Math.floorDiv(partitionSize, factor) == 0 ?
|
||||
Sets.newSet(1, 2) :
|
||||
Sets.newSet(Math.floorDiv(partitionSize, factor), -Math.floorDiv(-partitionSize, factor)));
|
||||
|
||||
client().admin().indices().prepareUpdateSettings(index)
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", client().admin().cluster().prepareState().get().getState().nodes()
|
||||
.getDataNodes().values().toArray(DiscoveryNode.class)[0].getName())
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
|
||||
currentShards = Math.floorDiv(currentShards, 2);
|
||||
|
||||
if (currentShards == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
String previousIndex = index;
|
||||
index = "index_" + currentShards;
|
||||
|
||||
logger.info("--> shrinking index [" + previousIndex + "] to [" + index + "]");
|
||||
client().admin().indices().prepareShrinkIndex(previousIndex, index)
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", currentShards)
|
||||
.build()).get();
|
||||
ensureGreen();
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyRoutedSearches(String index, Map<String, Set<String>> routingToDocumentIds, Set<Integer> expectedShards) {
|
||||
for (Map.Entry<String, Set<String>> routingEntry : routingToDocumentIds.entrySet()) {
|
||||
String routing = routingEntry.getKey();
|
||||
int expectedDocuments = routingEntry.getValue().size();
|
||||
|
||||
SearchResponse response = client().prepareSearch()
|
||||
.setQuery(QueryBuilders.termQuery("_routing", routing))
|
||||
.setRouting(routing)
|
||||
.setIndices(index)
|
||||
.setSize(100)
|
||||
.execute().actionGet();
|
||||
|
||||
logger.info("--> routed search on index [" + index + "] visited [" + response.getTotalShards()
|
||||
+ "] shards for routing [" + routing + "] and got hits [" + response.getHits().totalHits() + "]");
|
||||
|
||||
assertTrue(response.getTotalShards() + " was not in " + expectedShards + " for " + index,
|
||||
expectedShards.contains(response.getTotalShards()));
|
||||
assertEquals(expectedDocuments, response.getHits().totalHits());
|
||||
|
||||
Set<String> found = new HashSet<>();
|
||||
response.getHits().forEach(h -> found.add(h.getId()));
|
||||
|
||||
assertEquals(routingEntry.getValue(), found);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyBroadSearches(String index, Map<String, Set<String>> routingToDocumentIds, int expectedShards) {
|
||||
for (Map.Entry<String, Set<String>> routingEntry : routingToDocumentIds.entrySet()) {
|
||||
String routing = routingEntry.getKey();
|
||||
int expectedDocuments = routingEntry.getValue().size();
|
||||
|
||||
SearchResponse response = client().prepareSearch()
|
||||
.setQuery(QueryBuilders.termQuery("_routing", routing))
|
||||
.setIndices(index)
|
||||
.setSize(100)
|
||||
.execute().actionGet();
|
||||
|
||||
assertEquals(expectedShards, response.getTotalShards());
|
||||
assertEquals(expectedDocuments, response.getHits().totalHits());
|
||||
|
||||
Set<String> found = new HashSet<>();
|
||||
response.getHits().forEach(h -> found.add(h.getId()));
|
||||
|
||||
assertEquals(routingEntry.getValue(), found);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyGets(String index, Map<String, Set<String>> routingToDocumentIds) {
|
||||
for (Map.Entry<String, Set<String>> routingEntry : routingToDocumentIds.entrySet()) {
|
||||
String routing = routingEntry.getKey();
|
||||
|
||||
for (String id : routingEntry.getValue()) {
|
||||
assertTrue(client().prepareGet(index, "type", id).setRouting(routing).execute().actionGet().isExists());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Set<String>> generateRoutedDocumentIds(String index) {
|
||||
Map<String, Set<String>> routingToDocumentIds = new HashMap<>();
|
||||
int numRoutingValues = randomIntBetween(5, 15);
|
||||
|
||||
for (int i = 0; i < numRoutingValues; i++) {
|
||||
String routingValue = String.valueOf(i);
|
||||
int numDocuments = randomIntBetween(10, 100);
|
||||
routingToDocumentIds.put(String.valueOf(routingValue), new HashSet<>());
|
||||
|
||||
for (int k = 0; k < numDocuments; k++) {
|
||||
String id = routingValue + "_" + String.valueOf(k);
|
||||
routingToDocumentIds.get(routingValue).add(id);
|
||||
|
||||
client().prepareIndex(index, "type", id)
|
||||
.setRouting(routingValue)
|
||||
.setSource("foo", "bar")
|
||||
.get();
|
||||
}
|
||||
}
|
||||
|
||||
client().admin().indices().prepareRefresh(index).get();
|
||||
|
||||
return routingToDocumentIds;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
|
|||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.compress.NotXContentException;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
|
@ -51,6 +52,7 @@ import org.elasticsearch.env.Environment;
|
|||
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
|
@ -162,6 +164,33 @@ public class MockRepository extends FsRepository {
|
|||
blockOnDataFiles = blocked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
final int numIterations = 5;
|
||||
int count = 0;
|
||||
NotXContentException ex = null;
|
||||
RepositoryData repositoryData = null;
|
||||
while (count < numIterations) {
|
||||
try {
|
||||
repositoryData = super.getRepositoryData();
|
||||
} catch (NotXContentException e) {
|
||||
ex = e;
|
||||
}
|
||||
if (repositoryData != null) {
|
||||
break;
|
||||
}
|
||||
count++;
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
if (ex != null) {
|
||||
throw ex;
|
||||
}
|
||||
return repositoryData;
|
||||
}
|
||||
|
||||
public void blockOnControlFiles(boolean blocked) {
|
||||
blockOnControlFiles = blocked;
|
||||
}
|
||||
|
|
|
@ -216,6 +216,7 @@ And the response:
|
|||
epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
|
||||
1475247709 17:01:49 elasticsearch green 1 1 0 0 0 0 0 0 - 100.0%
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/0 0/0 [01]/]
|
||||
// TESTRESPONSE[s/1475247709 17:01:49 elasticsearch/\\d+ \\d+:\\d+:\\d+ docs_integTest/ _cat]
|
||||
|
||||
We can see that our cluster named "elasticsearch" is up with a green status.
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
[partintro]
|
||||
--
|
||||
Elasticsearch ships with defaults which are intended to give a good out of
|
||||
the box experience. Full text search, highlighting, aggregations, indexing
|
||||
the box experience. Full text search, highlighting, aggregations, and indexing
|
||||
should all just work without the user having to change anything.
|
||||
|
||||
Once you better understand how you want to use Elasticsearch, however,
|
||||
|
|
|
@ -17,17 +17,17 @@ it is advisable to avoid going beyond a couple tens of megabytes per request
|
|||
even if larger requests seem to perform better.
|
||||
|
||||
[float]
|
||||
=== Use multiple workers/threads to send data to elasticsearch
|
||||
=== Use multiple workers/threads to send data to Elasticsearch
|
||||
|
||||
A single thread sending bulk requests is unlikely to be able to max out the
|
||||
indexing capacity of an elasticsearch cluster. In order to use all resources
|
||||
indexing capacity of an Elasticsearch cluster. In order to use all resources
|
||||
of the cluster, you should send data from multiple threads or processes. In
|
||||
addition to making better use of the resources of the cluster, this should
|
||||
help reduce the cost of each fsync.
|
||||
|
||||
Make sure to watch for `TOO_MANY_REQUESTS (429)` response codes
|
||||
(`EsRejectedExecutionException` with the Java client), which is the way that
|
||||
elasticsearch tells you that it cannot keep up with the current indexing rate.
|
||||
Elasticsearch tells you that it cannot keep up with the current indexing rate.
|
||||
When it happens, you should pause indexing a bit before trying again, ideally
|
||||
with randomized exponential backoff.
|
||||
|
||||
|
@ -39,7 +39,7 @@ number of workers until either I/O or CPU is saturated on the cluster.
|
|||
=== Increase the refresh interval
|
||||
|
||||
The default <<dynamic-index-settings,`index.refresh_interval`>> is `1s`, which
|
||||
forces elasticsearch to create a new segment every second.
|
||||
forces Elasticsearch to create a new segment every second.
|
||||
Increasing this value (to say, `30s`) will allow larger segments to flush and
|
||||
decreases future merge pressure.
|
||||
|
||||
|
|
|
@ -169,9 +169,9 @@ the query need to be matched exactly while other parts should still take
|
|||
stemming into account?
|
||||
|
||||
Fortunately, the `query_string` and `simple_query_string` queries have a feature
|
||||
that allows to solve exactly this problem: `quote_field_suffix`. It allows to
|
||||
tell Elasticsearch that words that appear in between quotes should be redirected
|
||||
to a different field, see below:
|
||||
that solve this exact problem: `quote_field_suffix`. This tell Elasticsearch
|
||||
that the words that appear in between quotes are to be redirected to a different
|
||||
field, see below:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
|
@ -218,7 +218,7 @@ GET index/_search
|
|||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"took": 2,/"took": "$body.took",/]
|
||||
|
||||
In that case, since `ski` was in-between quotes, it was searched on the
|
||||
In the above case, since `ski` was in-between quotes, it was searched on the
|
||||
`body.exact` field due to the `quote_field_suffix` parameter, so only document
|
||||
`1` matched. This allows users to mix exact search with stemmed search as they
|
||||
like.
|
||||
|
|
|
@ -79,6 +79,13 @@ Checking shards may take a lot of time on large indices.
|
|||
which uses https://en.wikipedia.org/wiki/DEFLATE[DEFLATE] for a higher
|
||||
compression ratio, at the expense of slower stored fields performance.
|
||||
|
||||
[[routing-partition-size]] `index.routing_partition_size`::
|
||||
|
||||
The number of shards a custom <<mapping-routing-field,routing>> value can go to.
|
||||
Defaults to 1 and can only be set at index creation time. This value must be less
|
||||
than the `index.number_of_shards` unless the `index.number_of_shards` value is also 1.
|
||||
See <<routing-index-partition>> for more details about how this setting is used.
|
||||
|
||||
[float]
|
||||
[[dynamic-index-settings]]
|
||||
=== Dynamic index settings
|
||||
|
|
|
@ -109,3 +109,29 @@ documents with the same `_id` might end up on different shards if indexed with
|
|||
different `_routing` values.
|
||||
|
||||
It is up to the user to ensure that IDs are unique across the index.
|
||||
|
||||
[[routing-index-partition]]
|
||||
==== Routing to an index partition
|
||||
|
||||
An index can be configured such that custom routing values will go to a subset of the shards rather
|
||||
than a single shard. This helps mitigate the risk of ending up with an imbalanced cluster while still
|
||||
reducing the impact of searches.
|
||||
|
||||
This is done by providing the index level setting <<routing-partition-size,`index.routing_partition_size`>> at index creation.
|
||||
As the partition size increases, the more evenly distributed the data will become at the
|
||||
expense of having to search more shards per request.
|
||||
|
||||
When this setting is present, the formula for calculating the shard becomes:
|
||||
|
||||
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
|
||||
|
||||
That is, the `_routing` field is used to calculate a set of shards within the index and then the
|
||||
`_id` is used to pick a shard within that set.
|
||||
|
||||
To enable this feature, the `index.routing_partition_size` should have a value greater than 1 and
|
||||
less than `index.number_of_shards`.
|
||||
|
||||
Once enabled, the partitioned index will have the following limitations:
|
||||
|
||||
* Mappings with parent-child relationships cannot be created within it.
|
||||
* All mappings within the index must have the `_routing` field marked as required.
|
|
@ -4,3 +4,4 @@
|
|||
The `index` option controls whether field values are indexed. It accepts `true`
|
||||
or `false`. Fields that are not indexed are not queryable.
|
||||
|
||||
NOTE: For the legacy mapping type <<string,`string`>> the `index` option only accepts legacy values `analyzed` (default, treat as full-text field), `not_analyzed` (treat as keyword field) and `no`.
|
||||
|
|
|
@ -12,7 +12,7 @@ to use `text` or `keyword`.
|
|||
|
||||
Indexes imported from 2.x *only* support `string` and not `text` or `keyword`.
|
||||
To ease the migration from 2.x Elasticsearch will downgrade `text` and `keyword`
|
||||
mappings applied to indexes imported to 2.x into `string`. While long lived
|
||||
mappings applied to indexes imported from 2.x into `string`. While long lived
|
||||
indexes will eventually need to be recreated against 5.x before eventually
|
||||
upgrading to 6.x, this downgrading smooths the process before you find time for
|
||||
it.
|
||||
|
|
|
@ -152,7 +152,7 @@ public class MultiSearchTemplateResponse extends ActionResponse implements Itera
|
|||
for (Item item : items) {
|
||||
if (item.isFailure()) {
|
||||
builder.startObject();
|
||||
ElasticsearchException.renderException(builder, params, item.getFailure());
|
||||
ElasticsearchException.generateFailureXContent(builder, params, item.getFailure(), true);
|
||||
builder.endObject();
|
||||
} else {
|
||||
item.getResponse().toXContent(builder, params);
|
||||
|
|
|
@ -71,6 +71,11 @@ public abstract class BulkByScrollTask extends CancellableTask {
|
|||
*/
|
||||
public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List<TaskInfo> sliceInfo);
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class Status implements Task.Status, SuccessfullyProcessed {
|
||||
public static final String NAME = "bulk-by-scroll";
|
||||
|
||||
|
@ -486,7 +491,7 @@ public abstract class BulkByScrollTask extends CancellableTask {
|
|||
status.toXContent(builder, params);
|
||||
} else {
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, params, exception);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, exception);
|
||||
builder.endObject();
|
||||
}
|
||||
return builder;
|
||||
|
|
|
@ -47,9 +47,6 @@ public class ReindexParallelizationHelper {
|
|||
r -> task.onSliceResponse(listener, slice.source().slice().getId(), r),
|
||||
e -> task.onSliceFailure(listener, slice.source().slice().getId(), e));
|
||||
client.execute(action, requestForSlice, sliceListener);
|
||||
/* Explicitly tell the task manager that we're running child tasks on the local node so it will cancel them when the parent is
|
||||
* cancelled. */
|
||||
taskManager.registerChildTask(task, localNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -348,7 +348,7 @@ public abstract class ScrollableHitSource implements Closeable {
|
|||
builder.field("reason");
|
||||
{
|
||||
builder.startObject();
|
||||
ElasticsearchException.toXContent(builder, params, reason);
|
||||
ElasticsearchException.generateThrowableXContent(builder, params, reason);
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
|
|
|
@ -604,7 +604,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
* that good stuff.
|
||||
*/
|
||||
if (delay.nanos() > 0) {
|
||||
generic().execute(() -> taskManager.cancel(testTask, reason, (Set<String> s) -> {}));
|
||||
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
|
||||
}
|
||||
return super.schedule(delay, name, command);
|
||||
}
|
||||
|
@ -637,7 +637,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
action.setScroll(scrollId());
|
||||
}
|
||||
String reason = randomSimpleString(random());
|
||||
taskManager.cancel(testTask, reason, (Set<String> s) -> {});
|
||||
taskManager.cancel(testTask, reason, () -> {});
|
||||
testMe.accept(action);
|
||||
assertEquals(reason, listener.get().getReasonCancelled());
|
||||
if (previousScrollSet) {
|
||||
|
|
|
@ -406,14 +406,14 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
if (SETTING_CORS_ALLOW_CREDENTIALS.get(settings)) {
|
||||
builder.allowCredentials();
|
||||
}
|
||||
String[] strMethods = Strings.splitStringByCommaToArray(SETTING_CORS_ALLOW_METHODS.get(settings));
|
||||
String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ",");
|
||||
HttpMethod[] methods = Arrays.asList(strMethods)
|
||||
.stream()
|
||||
.map(HttpMethod::valueOf)
|
||||
.toArray(size -> new HttpMethod[size]);
|
||||
return builder.allowedRequestMethods(methods)
|
||||
.maxAge(SETTING_CORS_MAX_AGE.get(settings))
|
||||
.allowedRequestHeaders(Strings.splitStringByCommaToArray(SETTING_CORS_ALLOW_HEADERS.get(settings)))
|
||||
.allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ","))
|
||||
.shortCircuit()
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import java.util.HashSet;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.Strings.collectionToDelimitedString;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS;
|
||||
|
@ -89,11 +90,12 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
|
|||
public void testCorsConfig() {
|
||||
final Set<String> methods = new HashSet<>(Arrays.asList("get", "options", "post"));
|
||||
final Set<String> headers = new HashSet<>(Arrays.asList("Content-Type", "Content-Length"));
|
||||
final String suffix = randomBoolean() ? " " : ""; // sometimes have a leading whitespace between comma delimited elements
|
||||
final Settings settings = Settings.builder()
|
||||
.put(SETTING_CORS_ENABLED.getKey(), true)
|
||||
.put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "*")
|
||||
.put(SETTING_CORS_ALLOW_METHODS.getKey(), Strings.collectionToCommaDelimitedString(methods))
|
||||
.put(SETTING_CORS_ALLOW_HEADERS.getKey(), Strings.collectionToCommaDelimitedString(headers))
|
||||
.put(SETTING_CORS_ALLOW_METHODS.getKey(), collectionToDelimitedString(methods, ",", suffix, ""))
|
||||
.put(SETTING_CORS_ALLOW_HEADERS.getKey(), collectionToDelimitedString(headers, ",", suffix, ""))
|
||||
.put(SETTING_CORS_ALLOW_CREDENTIALS.getKey(), true)
|
||||
.build();
|
||||
final Netty4CorsConfig corsConfig = Netty4HttpServerTransport.buildCorsConfig(settings);
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
{
|
||||
"description": "Parameters that are accepted by all API endpoints.",
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html",
|
||||
"params": {
|
||||
"pretty": {
|
||||
"type": "boolean",
|
||||
"description": "Pretty format the returned JSON response.",
|
||||
"default": false
|
||||
},
|
||||
"human": {
|
||||
"type": "boolean",
|
||||
"description": "Return human readable values for statistics.",
|
||||
"default": true
|
||||
},
|
||||
"error_trace": {
|
||||
"type": "boolean",
|
||||
"description": "Include the stack trace of returned errors.",
|
||||
"default": false
|
||||
},
|
||||
"source": {
|
||||
"type": "string",
|
||||
"description": "The URL-encoded request definition. Useful for libraries that do not accept a request body for non-POST requests."
|
||||
},
|
||||
"filter_path": {
|
||||
"type": "string",
|
||||
"description": "A comma-separated list of filters used to reduce the respone."
|
||||
}
|
||||
}
|
||||
}
|
|
@ -138,4 +138,41 @@ public class NodeTests extends ESTestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testNodeAttributes() throws IOException {
|
||||
String attr = randomAsciiOfLength(5);
|
||||
Settings.Builder settings = baseSettings().put(Node.NODE_ATTRIBUTES.getKey() + "test_attr", attr);
|
||||
try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) {
|
||||
final Settings nodeSettings = randomBoolean() ? node.settings() : node.getEnvironment().settings();
|
||||
assertEquals(attr, Node.NODE_ATTRIBUTES.get(nodeSettings).getAsMap().get("test_attr"));
|
||||
}
|
||||
|
||||
// leading whitespace not allowed
|
||||
attr = " leading";
|
||||
settings = baseSettings().put(Node.NODE_ATTRIBUTES.getKey() + "test_attr", attr);
|
||||
try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) {
|
||||
fail("should not allow a node attribute with leading whitespace");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("node.attr.test_attr cannot have leading or trailing whitespace [ leading]", e.getMessage());
|
||||
}
|
||||
|
||||
// trailing whitespace not allowed
|
||||
attr = "trailing ";
|
||||
settings = baseSettings().put(Node.NODE_ATTRIBUTES.getKey() + "test_attr", attr);
|
||||
try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) {
|
||||
fail("should not allow a node attribute with trailing whitespace");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("node.attr.test_attr cannot have leading or trailing whitespace [trailing ]", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private static Settings.Builder baseSettings() {
|
||||
final Path tempDir = createTempDir();
|
||||
return Settings.builder()
|
||||
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", randomLong()))
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
|
||||
.put("transport.type", "mock-socket-network")
|
||||
.put(Node.NODE_DATA_SETTING.getKey(), true);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.elasticsearch.client.ResponseException;
|
|||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi;
|
||||
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestPath;
|
||||
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec;
|
||||
|
@ -44,7 +43,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Used by {@link ESClientYamlSuiteTestCase} to execute REST requests according to the tests written in yaml suite files. Wraps a
|
||||
|
@ -53,11 +51,6 @@ import java.util.Set;
|
|||
*/
|
||||
public class ClientYamlTestClient {
|
||||
private static final Logger logger = Loggers.getLogger(ClientYamlTestClient.class);
|
||||
/**
|
||||
* Query params that don't need to be declared in the spec, they are supported by default.
|
||||
*/
|
||||
private static final Set<String> ALWAYS_ACCEPTED_QUERY_STRING_PARAMS = Sets.newHashSet(
|
||||
"ignore", "error_trace", "human", "filter_path", "pretty", "source");
|
||||
|
||||
private final ClientYamlSuiteRestSpec restSpec;
|
||||
private final RestClient restClient;
|
||||
|
@ -108,7 +101,8 @@ public class ClientYamlTestClient {
|
|||
if (restApi.getPathParts().contains(entry.getKey())) {
|
||||
pathParts.put(entry.getKey(), entry.getValue());
|
||||
} else {
|
||||
if (restApi.getParams().contains(entry.getKey()) || ALWAYS_ACCEPTED_QUERY_STRING_PARAMS.contains(entry.getKey())) {
|
||||
if (restApi.getParams().contains(entry.getKey()) || restSpec.isGlobalParameter(entry.getKey())
|
||||
|| restSpec.isClientParameter(entry.getKey())) {
|
||||
queryStringParams.put(entry.getKey(), entry.getValue());
|
||||
} else {
|
||||
throw new IllegalArgumentException("param [" + entry.getKey() + "] not supported in ["
|
||||
|
|
|
@ -30,18 +30,21 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Holds the specification used to turn {@code do} actions in the YAML suite into REST api calls.
|
||||
*/
|
||||
public class ClientYamlSuiteRestSpec {
|
||||
Map<String, ClientYamlSuiteRestApi> restApiMap = new HashMap<>();
|
||||
private final Set<String> globalParameters = new HashSet<>();
|
||||
private final Map<String, ClientYamlSuiteRestApi> restApiMap = new HashMap<>();
|
||||
|
||||
private ClientYamlSuiteRestSpec() {
|
||||
}
|
||||
|
||||
void addApi(ClientYamlSuiteRestApi restApi) {
|
||||
private void addApi(ClientYamlSuiteRestApi restApi) {
|
||||
ClientYamlSuiteRestApi previous = restApiMap.putIfAbsent(restApi.getName(), restApi);
|
||||
if (previous != null) {
|
||||
throw new IllegalArgumentException("cannot register api [" + restApi.getName() + "] found in [" + restApi.getLocation() + "]. "
|
||||
|
@ -57,6 +60,21 @@ public class ClientYamlSuiteRestSpec {
|
|||
return restApiMap.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the provided parameter is one of those parameters that are supported by all Elasticsearch api
|
||||
*/
|
||||
public boolean isGlobalParameter(String param) {
|
||||
return globalParameters.contains(param);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the provided parameter is one of those parameters that are supported by the Elasticsearch language clients, meaning
|
||||
* that they influence the client behaviour and don't get sent to Elasticsearch
|
||||
*/
|
||||
public boolean isClientParameter(String name) {
|
||||
return "ignore".equals(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the complete set of REST spec available under the provided directories
|
||||
*/
|
||||
|
@ -66,16 +84,40 @@ public class ClientYamlSuiteRestSpec {
|
|||
for (String path : paths) {
|
||||
for (Path jsonFile : FileUtils.findJsonSpec(fileSystem, optionalPathPrefix, path)) {
|
||||
try (InputStream stream = Files.newInputStream(jsonFile)) {
|
||||
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, stream)) {
|
||||
ClientYamlSuiteRestApi restApi = restApiParser.parse(jsonFile.toString(), parser);
|
||||
String filename = jsonFile.getFileName().toString();
|
||||
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, stream)) {
|
||||
if (filename.equals("_common.json")) {
|
||||
String currentFieldName = null;
|
||||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (parser.currentToken() == XContentParser.Token.START_OBJECT
|
||||
&& "params".equals(currentFieldName)) {
|
||||
while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
|
||||
String param = parser.currentName();
|
||||
if (restSpec.globalParameters.contains(param)) {
|
||||
throw new IllegalArgumentException("Found duplicate global param [" + param + "]");
|
||||
}
|
||||
restSpec.globalParameters.add(param);
|
||||
parser.nextToken();
|
||||
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
|
||||
throw new IllegalArgumentException("Expected params field in rest api definition to " +
|
||||
"contain an object");
|
||||
}
|
||||
parser.skipChildren();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ClientYamlSuiteRestApi restApi = restApiParser.parse(jsonFile.toString(), parser);
|
||||
String expectedApiName = filename.substring(0, filename.lastIndexOf('.'));
|
||||
if (restApi.getName().equals(expectedApiName) == false) {
|
||||
throw new IllegalArgumentException("found api [" + restApi.getName() + "] in [" + jsonFile.toString() + "]. " +
|
||||
"Each api is expected to have the same name as the file that defines it.");
|
||||
throw new IllegalArgumentException("found api [" + restApi.getName() + "] in [" + jsonFile.toString() +
|
||||
"]. " + "Each api is expected to have the same name as the file that defines it.");
|
||||
}
|
||||
restSpec.addApi(restApi);
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
throw new IOException("Can't parse rest spec file: [" + jsonFile + "]", ex);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue