diff --git a/build.gradle b/build.gradle index 1159352cd5d..fd97470ec6c 100644 --- a/build.gradle +++ b/build.gradle @@ -17,6 +17,7 @@ * under the License. */ +import java.nio.file.Path import org.eclipse.jgit.lib.Repository import org.eclipse.jgit.lib.RepositoryBuilder import org.gradle.plugins.ide.eclipse.model.SourceFolder @@ -29,8 +30,9 @@ subprojects { description = "Elasticsearch subproject ${project.path}" } +Path rootPath = rootDir.toPath() // setup pom license info, but only for artifacts that are part of elasticsearch -configure(subprojects.findAll { it.path.startsWith(':x-plugins') == false }) { +configure(subprojects.findAll { it.projectDir.toPath().startsWith(rootPath) }) { // we only use maven publish to add tasks for pom generation plugins.withType(MavenPublishPlugin).whenPluginAdded { diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index ecf6f01f3ff..c62ffe5e4c7 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -22,6 +22,7 @@ package org.elasticsearch; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -36,13 +37,15 @@ import org.elasticsearch.transport.TcpTransport; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; import static 
java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_UUID_NA_VALUE; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -56,19 +59,19 @@ public class ElasticsearchException extends RuntimeException implements ToXConte static final Version UNKNOWN_VERSION_ADDED = Version.fromId(0); /** - * Passed in the {@link Params} of {@link #toXContent(XContentBuilder, org.elasticsearch.common.xcontent.ToXContent.Params, Throwable)} + * Passed in the {@link Params} of {@link #generateThrowableXContent(XContentBuilder, Params, Throwable)} * to control if the {@code caused_by} element should render. Unlike most parameters to {@code toXContent} methods this parameter is * internal only and not available as a URL parameter. */ public static final String REST_EXCEPTION_SKIP_CAUSE = "rest.exception.cause.skip"; /** - * Passed in the {@link Params} of {@link #toXContent(XContentBuilder, org.elasticsearch.common.xcontent.ToXContent.Params, Throwable)} + * Passed in the {@link Params} of {@link #generateThrowableXContent(XContentBuilder, Params, Throwable)} * to control if the {@code stack_trace} element should render. Unlike most parameters to {@code toXContent} methods this parameter is * internal only and not available as a URL parameter. Use the {@code error_trace} parameter instead. 
*/ public static final String REST_EXCEPTION_SKIP_STACK_TRACE = "rest.exception.stacktrace.skip"; public static final boolean REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT = true; - public static final boolean REST_EXCEPTION_SKIP_CAUSE_DEFAULT = false; + private static final boolean REST_EXCEPTION_SKIP_CAUSE_DEFAULT = false; private static final String INDEX_HEADER_KEY = "es.index"; private static final String INDEX_HEADER_KEY_UUID = "es.index_uuid"; private static final String SHARD_HEADER_KEY = "es.shard"; @@ -160,6 +163,10 @@ public class ElasticsearchException extends RuntimeException implements ToXConte return headers.get(key); } + protected Map> getHeaders() { + return headers; + } + /** * Returns the rest status code associated with this exception. */ @@ -257,64 +264,56 @@ public class ElasticsearchException extends RuntimeException implements ToXConte public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { Throwable ex = ExceptionsHelper.unwrapCause(this); if (ex != this) { - toXContent(builder, params, this); + generateThrowableXContent(builder, params, this); } else { - builder.field(TYPE, getExceptionName()); - builder.field(REASON, getMessage()); - for (String key : headers.keySet()) { - if (key.startsWith("es.")) { - List values = headers.get(key); - xContentHeader(builder, key.substring("es.".length()), values); - } - } - innerToXContent(builder, params); - renderHeader(builder, params); - if (params.paramAsBoolean(REST_EXCEPTION_SKIP_STACK_TRACE, REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT) == false) { - builder.field(STACK_TRACE, ExceptionsHelper.stackTrace(this)); - } + innerToXContent(builder, params, this, getExceptionName(), getMessage(), headers, getCause()); } return builder; } - /** - * Renders additional per exception information into the xcontent - */ - protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { - causeToXContent(builder, params); - } + protected static void 
innerToXContent(XContentBuilder builder, Params params, + Throwable throwable, String type, String message, Map> headers, + Throwable cause) throws IOException { + builder.field(TYPE, type); + builder.field(REASON, message); - /** - * Renders a cause exception as xcontent - */ - protected void causeToXContent(XContentBuilder builder, Params params) throws IOException { - final Throwable cause = getCause(); - if (cause != null && params.paramAsBoolean(REST_EXCEPTION_SKIP_CAUSE, REST_EXCEPTION_SKIP_CAUSE_DEFAULT) == false) { - builder.field(CAUSED_BY); - builder.startObject(); - toXContent(builder, params, cause); - builder.endObject(); - } - } - - protected final void renderHeader(XContentBuilder builder, Params params) throws IOException { - boolean hasHeader = false; + Set customHeaders = new HashSet<>(); for (String key : headers.keySet()) { if (key.startsWith("es.")) { - continue; + headerToXContent(builder, key.substring("es.".length()), headers.get(key)); + } else { + customHeaders.add(key); } - if (hasHeader == false) { - builder.startObject(HEADER); - hasHeader = true; - } - List values = headers.get(key); - xContentHeader(builder, key, values); } - if (hasHeader) { + + if (throwable instanceof ElasticsearchException) { + ElasticsearchException exception = (ElasticsearchException) throwable; + exception.metadataToXContent(builder, params); + } + + if (params.paramAsBoolean(REST_EXCEPTION_SKIP_CAUSE, REST_EXCEPTION_SKIP_CAUSE_DEFAULT) == false) { + if (cause != null) { + builder.field(CAUSED_BY); + builder.startObject(); + generateThrowableXContent(builder, params, cause); + builder.endObject(); + } + } + + if (customHeaders.isEmpty() == false) { + builder.startObject(HEADER); + for (String header : customHeaders) { + headerToXContent(builder, header, headers.get(header)); + } builder.endObject(); } + + if (params.paramAsBoolean(REST_EXCEPTION_SKIP_STACK_TRACE, REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT) == false) { + builder.field(STACK_TRACE, 
ExceptionsHelper.stackTrace(throwable)); + } } - private void xContentHeader(XContentBuilder builder, String key, List values) throws IOException { + private static void headerToXContent(XContentBuilder builder, String key, List values) throws IOException { if (values != null && values.isEmpty() == false) { if (values.size() == 1) { builder.field(key, values.get(0)); @@ -329,25 +328,73 @@ public class ElasticsearchException extends RuntimeException implements ToXConte } /** - * Static toXContent helper method that also renders non {@link org.elasticsearch.ElasticsearchException} instances as XContent. + * Renders additional per exception information into the XContent */ - public static void toXContent(XContentBuilder builder, Params params, Throwable ex) throws IOException { - ex = ExceptionsHelper.unwrapCause(ex); - if (ex instanceof ElasticsearchException) { - ((ElasticsearchException) ex).toXContent(builder, params); + protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException { + } + + /** + * Static toXContent helper method that renders {@link org.elasticsearch.ElasticsearchException} or {@link Throwable} instances + * as XContent, delegating the rendering to {@link #toXContent(XContentBuilder, Params)} + * or {@link #innerToXContent(XContentBuilder, Params, Throwable, String, String, Map, Throwable)}. + * + * This method is usually used when the {@link Throwable} is rendered as a part of another XContent object. 
+ */ + public static void generateThrowableXContent(XContentBuilder builder, Params params, Throwable t) throws IOException { + t = ExceptionsHelper.unwrapCause(t); + + if (t instanceof ElasticsearchException) { + ((ElasticsearchException) t).toXContent(builder, params); } else { - builder.field(TYPE, getExceptionName(ex)); - builder.field(REASON, ex.getMessage()); - if (ex.getCause() != null) { - builder.field(CAUSED_BY); + innerToXContent(builder, params, t, getExceptionName(t), t.getMessage(), emptyMap(), t.getCause()); + } + } + + /** + * Render any exception as a xcontent, encapsulated within a field or object named "error". The level of details that are rendered + * depends on the value of the "detailed" parameter: when it's false only a simple message based on the type and message of the + * exception is rendered. When it's true all detail are provided including guesses root causes, cause and potentially stack + * trace. + * + * This method is usually used when the {@link Exception} is rendered as a full XContent object. 
+ */ + public static void generateFailureXContent(XContentBuilder builder, Params params, @Nullable Exception e, boolean detailed) + throws IOException { + // No exception to render as an error + if (e == null) { + builder.field(ERROR, "unknown"); + return; + } + + // Render the exception with a simple message + if (detailed == false) { + String message = "No ElasticsearchException found"; + Throwable t = e; + for (int counter = 0; counter < 10 && t != null; counter++) { + if (t instanceof ElasticsearchException) { + message = t.getClass().getSimpleName() + "[" + t.getMessage() + "]"; + break; + } + t = t.getCause(); + } + builder.field(ERROR, message); + return; + } + + // Render the exception with all details + final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(e); + builder.startObject(ERROR); + { + builder.startArray(ROOT_CAUSE); + for (ElasticsearchException rootCause : rootCauses) { builder.startObject(); - toXContent(builder, params, ex.getCause()); + rootCause.toXContent(builder, new DelegatingMapParams(singletonMap(REST_EXCEPTION_SKIP_CAUSE, "true"), params)); builder.endObject(); } - if (params.paramAsBoolean(REST_EXCEPTION_SKIP_STACK_TRACE, REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT) == false) { - builder.field(STACK_TRACE, ExceptionsHelper.stackTrace(ex)); - } + builder.endArray(); } + generateThrowableXContent(builder, params, e); + builder.endObject(); } /** @@ -877,22 +924,6 @@ public class ElasticsearchException extends RuntimeException implements ToXConte return null; } - public static void renderException(XContentBuilder builder, Params params, Exception e) throws IOException { - builder.startObject(ERROR); - final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(e); - builder.field(ROOT_CAUSE); - builder.startArray(); - for (ElasticsearchException rootCause : rootCauses) { - builder.startObject(); - rootCause.toXContent(builder, new ToXContent.DelegatingMapParams( - 
Collections.singletonMap(ElasticsearchException.REST_EXCEPTION_SKIP_CAUSE, "true"), params)); - builder.endObject(); - } - builder.endArray(); - ElasticsearchException.toXContent(builder, params, e); - builder.endObject(); - } - // lower cases and adds underscores to transitions in a name private static String toUnderscoreCase(String value) { StringBuilder sb = new StringBuilder(); diff --git a/core/src/main/java/org/elasticsearch/action/TaskOperationFailure.java b/core/src/main/java/org/elasticsearch/action/TaskOperationFailure.java index 6704f610ec0..519adc77b84 100644 --- a/core/src/main/java/org/elasticsearch/action/TaskOperationFailure.java +++ b/core/src/main/java/org/elasticsearch/action/TaskOperationFailure.java @@ -105,7 +105,7 @@ public final class TaskOperationFailure implements Writeable, ToXContent { if (reason != null) { builder.field("reason"); builder.startObject(); - ElasticsearchException.toXContent(builder, params, reason); + ElasticsearchException.generateThrowableXContent(builder, params, reason); builder.endObject(); } return builder; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index ce5d92753a8..06093e1ead0 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -19,21 +19,21 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import 
org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; @@ -49,9 +49,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** @@ -116,18 +114,15 @@ public class TransportCancelTasksAction extends TransportTasksAction listener) { - final BanLock banLock = new BanLock(nodes -> removeBanOnNodes(cancellableTask, nodes)); - Set childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished); - if (childNodes != null) { - if (childNodes.isEmpty()) { - // The task has no child tasks, so we can return immediately - logger.trace("cancelling task {} with no children", cancellableTask.getId()); - listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false)); - } else { - // The task has some child tasks, we need to wait for until ban is set on all nodes - logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes); + DiscoveryNodes childNodes = clusterService.state().nodes(); + final BanLock banLock = new BanLock(childNodes.getSize(), () -> 
removeBanOnNodes(cancellableTask, childNodes)); + boolean canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished); + if (canceled) { + if (cancellableTask.shouldCancelChildrenOnCancellation()) { + // /In case the task has some child tasks, we need to wait for until ban is set on all nodes + logger.trace("cancelling task {} on child nodes", cancellableTask.getId()); String nodeId = clusterService.localNode().getId(); - AtomicInteger responses = new AtomicInteger(childNodes.size()); + AtomicInteger responses = new AtomicInteger(childNodes.getSize()); List failures = new ArrayList<>(); setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener() { @Override @@ -157,7 +152,8 @@ public class TransportCancelTasksAction extends TransportTasksAction nodes, ActionListener listener) { + private void setBanOnNodes(String reason, CancellableTask task, DiscoveryNodes nodes, ActionListener listener) { sendSetBanRequest(nodes, BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason), listener); } - private void removeBanOnNodes(CancellableTask task, Set nodes) { + private void removeBanOnNodes(CancellableTask task, DiscoveryNodes nodes) { sendRemoveBanRequest(nodes, BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()))); } - private void sendSetBanRequest(Set nodes, BanParentTaskRequest request, ActionListener listener) { - ClusterState clusterState = clusterService.state(); - for (String node : nodes) { - DiscoveryNode discoveryNode = clusterState.getNodes().get(node); - if (discoveryNode != null) { - // Check if node still in the cluster - logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node, - request.ban); - transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, - new 
EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleResponse(TransportResponse.Empty response) { - listener.onResponse(null); - } + private void sendSetBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request, ActionListener listener) { + for (ObjectObjectCursor node : nodes.getNodes()) { + logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node.key, + request.ban); + transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request, + new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty response) { + listener.onResponse(null); + } - @Override - public void handleException(TransportException exp) { - logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node); - listener.onFailure(exp); - } - }); - } else { - listener.onResponse(null); - logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster", - request.parentTaskId, node); - } + @Override + public void handleException(TransportException exp) { + logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key); + listener.onFailure(exp); + } + }); } } - private void sendRemoveBanRequest(Set nodes, BanParentTaskRequest request) { - ClusterState clusterState = clusterService.state(); - for (String node : nodes) { - DiscoveryNode discoveryNode = clusterState.getNodes().get(node); - if (discoveryNode != null) { - // Check if node still in the cluster - logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node); - transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler - .INSTANCE_SAME); - } else { - logger.debug("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the 
node no longer in " + - "the cluster", request.parentTaskId, node); - } + private void sendRemoveBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request) { + for (ObjectObjectCursor node : nodes.getNodes()) { + logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key); + transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler + .INSTANCE_SAME); } } private static class BanLock { - private final Consumer> finish; + private final Runnable finish; private final AtomicInteger counter; - private final AtomicReference> nodes = new AtomicReference<>(); + private final int nodesSize; - public BanLock(Consumer> finish) { + public BanLock(int nodesSize, Runnable finish) { counter = new AtomicInteger(0); this.finish = finish; + this.nodesSize = nodesSize; } public void onBanSet() { @@ -242,15 +222,14 @@ public class TransportCancelTasksAction extends TransportTasksAction nodes) { - this.nodes.set(nodes); - if (counter.addAndGet(nodes.size()) == 0) { + public void onTaskFinished() { + if (counter.addAndGet(nodesSize) == 0) { finish(); } } public void finish() { - finish.accept(nodes.get()); + finish.run(); } } @@ -322,5 +301,4 @@ public class TransportCancelTasksAction extends TransportTasksAction() { @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java index f1df6d53e18..3657e327265 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java @@ -207,7 +207,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon builder.field(Fields.ALLOCATED, allocationStatus.value()); if (storeException != null) { 
builder.startObject(Fields.STORE_EXCEPTION); - ElasticsearchException.toXContent(builder, params, storeException); + ElasticsearchException.generateThrowableXContent(builder, params, storeException); builder.endObject(); } return builder; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java index 6d27b03db63..8c482eac10c 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -131,6 +131,9 @@ public class TransportShrinkAction extends TransportMasterNodeAction() { @Override public ShardResponse newInstance() { diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 9f11b9b5a70..ceca57e0520 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -318,7 +318,6 @@ public abstract class TransportBroadcastByNodeAction() { @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index fbae9f7a12b..f2bc4da423d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -160,7 +160,6 @@ public abstract class TransportMasterNodeAction(listener, TransportMasterNodeAction.this::newResponse) { @Override public void handleException(final TransportException exp) { 
diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 6cc063d5af1..d8010f4381f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -199,7 +199,6 @@ public abstract class TransportNodesAction shardActionListener) { ShardRequest shardRequest = newShardRequest(request, shardId); shardRequest.setParentTask(clusterService.localNode().getId(), task.getId()); - taskManager.registerChildTask(task, clusterService.localNode().getId()); replicatedBroadcastShardAction.execute(shardRequest, shardActionListener); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index f03385e3829..d3646ac98e7 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -664,7 +664,6 @@ public abstract class TransportReplicationAction< return; } final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); - taskManager.registerChildTask(task, node.getId()); if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { performLocalAction(state, primary, node); } else { diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index ee384b819b0..09026960580 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -278,7 +278,6 @@ public abstract class 
TransportTasksAction< } else { NodeTaskRequest nodeRequest = new NodeTaskRequest(request); nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); - taskManager.registerChildTask(task, node.getId()); transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsResponse.java b/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsResponse.java index ed8bd935953..8508c834a9f 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsResponse.java @@ -133,7 +133,7 @@ public class MultiTermVectorsResponse extends ActionResponse implements Iterable builder.field(Fields._INDEX, failure.getIndex()); builder.field(Fields._TYPE, failure.getType()); builder.field(Fields._ID, failure.getId()); - ElasticsearchException.renderException(builder, params, failure.getCause()); + ElasticsearchException.generateFailureXContent(builder, params, failure.getCause(), true); builder.endObject(); } else { TermVectorsResponse getResponse = response.getResponse(); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 8c2dc3d47ed..aa26218ac4c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.LongArrayList; import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + import org.elasticsearch.Version; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.Diff; @@ -70,6 +71,7 @@ 
import java.util.Map; import java.util.Set; import java.util.function.Function; +import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; import static org.elasticsearch.common.settings.Settings.readSettingsFromStream; @@ -192,6 +194,10 @@ public class IndexMetaData implements Diffable, ToXContent { public static final Setting INDEX_SHADOW_REPLICAS_SETTING = Setting.boolSetting(SETTING_SHADOW_REPLICAS, false, Property.IndexScope); + public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size"; + public static final Setting INDEX_ROUTING_PARTITION_SIZE_SETTING = + Setting.intSetting(SETTING_ROUTING_PARTITION_SIZE, 1, 1, Property.IndexScope); + public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem"; public static final Setting INDEX_SHARED_FILESYSTEM_SETTING = Setting.boolSetting(SETTING_SHARED_FILESYSTEM, false, Property.IndexScope); @@ -242,11 +248,11 @@ public class IndexMetaData implements Diffable, ToXContent { public static final String INDEX_ROUTING_INCLUDE_GROUP_PREFIX = "index.routing.allocation.include"; public static final String INDEX_ROUTING_EXCLUDE_GROUP_PREFIX = "index.routing.allocation.exclude"; public static final Setting INDEX_ROUTING_REQUIRE_GROUP_SETTING = - Setting.groupSetting(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope); + Setting.groupSetting(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.IndexScope); public static final Setting INDEX_ROUTING_INCLUDE_GROUP_SETTING = - Setting.groupSetting(INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope); + Setting.groupSetting(INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.IndexScope); public static final Setting 
INDEX_ROUTING_EXCLUDE_GROUP_SETTING = - Setting.groupSetting(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope); + Setting.groupSetting(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.IndexScope); public static final Setting INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING = Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only setable internally not a registered setting!! @@ -272,6 +278,7 @@ public class IndexMetaData implements Diffable, ToXContent { public static final String INDEX_STATE_FILE_PREFIX = "state-"; private final int routingNumShards; private final int routingFactor; + private final int routingPartitionSize; private final int numberOfShards; private final int numberOfReplicas; @@ -310,7 +317,7 @@ public class IndexMetaData implements Diffable, ToXContent { ImmutableOpenMap customs, ImmutableOpenIntMap> inSyncAllocationIds, DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters, Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion, - int routingNumShards, ActiveShardCount waitForActiveShards) { + int routingNumShards, int routingPartitionSize, ActiveShardCount waitForActiveShards) { this.index = index; this.version = version; @@ -334,6 +341,7 @@ public class IndexMetaData implements Diffable, ToXContent { this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion; this.routingNumShards = routingNumShards; this.routingFactor = routingNumShards / numberOfShards; + this.routingPartitionSize = routingPartitionSize; this.waitForActiveShards = waitForActiveShards; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -413,6 +421,14 @@ public class IndexMetaData implements Diffable, ToXContent { return numberOfReplicas; } 
+ public int getRoutingPartitionSize() { + return routingPartitionSize; + } + + public boolean isRoutingPartitionedIndex() { + return routingPartitionSize != 1; + } + public int getTotalNumberOfShards() { return totalNumberOfShards; } @@ -809,6 +825,11 @@ public class IndexMetaData implements Diffable, ToXContent { return routingNumShards == null ? numberOfShards() : routingNumShards; } + /** + * Returns the number of shards. + * + * @return the provided value or -1 if it has not been set. + */ public int numberOfShards() { return settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1); } @@ -818,10 +839,29 @@ public class IndexMetaData implements Diffable, ToXContent { return this; } + /** + * Returns the number of replicas. + * + * @return the provided value or -1 if it has not been set. + */ public int numberOfReplicas() { return settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1); } + public Builder routingPartitionSize(int routingPartitionSize) { + settings = Settings.builder().put(settings).put(SETTING_ROUTING_PARTITION_SIZE, routingPartitionSize).build(); + return this; + } + + /** + * Returns the routing partition size. + * + * @return the provided value or -1 if it has not been set. 
+ */ + public int routingPartitionSize() { + return settings.getAsInt(SETTING_ROUTING_PARTITION_SIZE, -1); + } + public Builder creationDate(long creationDate) { settings = Settings.builder().put(settings).put(SETTING_CREATION_DATE, creationDate).build(); return this; @@ -885,7 +925,7 @@ public class IndexMetaData implements Diffable, ToXContent { } public Builder putInSyncAllocationIds(int shardId, Set allocationIds) { - inSyncAllocationIds.put(shardId, new HashSet(allocationIds)); + inSyncAllocationIds.put(shardId, new HashSet<>(allocationIds)); return this; } @@ -964,6 +1004,12 @@ public class IndexMetaData implements Diffable, ToXContent { throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]"); } + int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings); + if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) { + throw new IllegalArgumentException("routing partition size [" + routingPartitionSize + "] should be a positive number" + + " less than the number of shards [" + getRoutingNumShards() + "] for [" + index + "]"); + } + // fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable ImmutableOpenIntMap.Builder> filledInSyncAllocationIds = ImmutableOpenIntMap.builder(); for (int i = 0; i < numberOfShards; i++) { @@ -1032,7 +1078,7 @@ public class IndexMetaData implements Diffable, ToXContent { final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(), tmpAliases.build(), customs.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters, - indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), waitForActiveShards); + indexCreatedVersion, indexUpgradedVersion, 
minimumCompatibleLuceneVersion, getRoutingNumShards(), routingPartitionSize, waitForActiveShards); } public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 10cd0f60429..64f96c8e263 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.CollectionUtil; @@ -592,6 +593,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { .put(IndexMetaData.SETTING_VERSION_CREATED, sourceMetaData.getCreationVersion()) .put(IndexMetaData.SETTING_VERSION_UPGRADED, sourceMetaData.getUpgradedVersion()) .put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate)) + .put(IndexMetaData.SETTING_ROUTING_PARTITION_SIZE, sourceMetaData.getRoutingPartitionSize()) .put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName()) .put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID()); if (sourceMetaData.getMinimumCompatibleVersion() != null) { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java index b8c19fbeb46..e35fc7c837c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java +++ 
b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.support.master.MasterNodeRequest; @@ -197,12 +198,16 @@ public class MetaDataIndexTemplateService extends AbstractComponent { Index createdIndex = null; final String temporaryIndexName = UUIDs.randomBase64UUID(); try { + // use the provided values, otherwise just pick valid dummy values + int dummyPartitionSize = IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.get(request.settings); + int dummyShards = request.settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, + dummyPartitionSize == 1 ? 1 : dummyPartitionSize + 1); //create index service for parsing and validating "mappings" Settings dummySettings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(request.settings) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, dummyShards) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) .build(); diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java index fad86caa7cc..3fcfdc08722 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.node; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.regex.Regex; import 
org.elasticsearch.common.settings.Settings; @@ -28,6 +29,7 @@ import org.elasticsearch.common.transport.TransportAddress; import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; public class DiscoveryNodeFilters { @@ -36,6 +38,25 @@ public class DiscoveryNodeFilters { OR } + /** + * Validates the IP addresses in a group of {@link Settings} by looking for the keys + * "_ip", "_host_ip", and "_publish_ip" and ensuring each of their comma separated values + * is a valid IP address. + */ + public static final Consumer IP_VALIDATOR = (settings) -> { + Map settingsMap = settings.getAsMap(); + for (Map.Entry entry : settingsMap.entrySet()) { + String propertyKey = entry.getKey(); + if ("_ip".equals(propertyKey) || "_host_ip".equals(propertyKey) || "_publish_ip".equals(propertyKey)) { + for (String value : Strings.tokenizeToStringArray(entry.getValue(), ",")) { + if (InetAddresses.isInetAddress(value) == false) { + throw new IllegalArgumentException("invalid IP address [" + value + "] for [" + propertyKey + "]"); + } + } + } + } + }; + public static DiscoveryNodeFilters buildFromSettings(OpType opType, String prefix, Settings settings) { return buildFromKeyValue(opType, settings.getByPrefix(prefix).getAsMap()); } @@ -43,7 +64,7 @@ public class DiscoveryNodeFilters { public static DiscoveryNodeFilters buildFromKeyValue(OpType opType, Map filters) { Map bFilters = new HashMap<>(); for (Map.Entry entry : filters.entrySet()) { - String[] values = Strings.splitStringByCommaToArray(entry.getValue()); + String[] values = Strings.tokenizeToStringArray(entry.getValue(), ","); if (values.length > 0) { bFilters.put(entry.getKey(), values); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 94cb4b8c8e8..6881cc75657 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ 
b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -92,13 +92,10 @@ public class OperationRouting extends AbstractComponent { final Set effectiveRouting = routing.get(index); if (effectiveRouting != null) { for (String r : effectiveRouting) { - int shardId = generateShardId(indexMetaData, null, r); - IndexShardRoutingTable indexShard = indexRouting.shard(shardId); - if (indexShard == null) { - throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId)); + final int routingPartitionSize = indexMetaData.getRoutingPartitionSize(); + for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) { + set.add(shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset))); } - // we might get duplicates, but that's ok, they will override one another - set.add(indexShard); } } else { for (IndexShardRoutingTable indexShard : indexRouting) { @@ -187,6 +184,14 @@ public class OperationRouting extends AbstractComponent { } } + private IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) { + IndexShardRoutingTable indexShard = indexRouting.shard(shardId); + if (indexShard == null) { + throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId)); + } + return indexShard; + } + protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) { IndexRoutingTable indexRouting = clusterState.routingTable().index(index); if (indexRouting == null) { @@ -213,15 +218,33 @@ public class OperationRouting extends AbstractComponent { return new ShardId(indexMetaData.getIndex(), generateShardId(indexMetaData, id, routing)); } - static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) { - final int hash; + static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) { + final String effectiveRouting; + final int partitionOffset; + if 
(routing == null) { - hash = Murmur3HashFunction.hash(id); + assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index"; + effectiveRouting = id; } else { - hash = Murmur3HashFunction.hash(routing); + effectiveRouting = routing; } + + if (indexMetaData.isRoutingPartitionedIndex()) { + partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize()); + } else { + // we would have still got 0 above but this check just saves us an unnecessary hash calculation + partitionOffset = 0; + } + + return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset); + } + + private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) { + final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset; + // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size // of original index to hash documents return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor(); } + } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java index 701bb66f8d9..7140a02d90c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java @@ -262,7 +262,7 @@ public class NodeAllocationResult implements ToXContent, Writeable, Comparable CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING = - new Setting<>("cluster.routing.allocation.awareness.attributes", "", Strings::splitStringByCommaToArray , Property.Dynamic, + new Setting<>("cluster.routing.allocation.awareness.attributes", "", s -> Strings.tokenizeToStringArray(s, ","), Property.Dynamic, 
Property.NodeScope); public static final Setting CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING = Setting.groupSetting("cluster.routing.allocation.awareness.force.", Property.Dynamic, Property.NodeScope); - private String[] awarenessAttributes; + private volatile String[] awarenessAttributes; private volatile Map forcedAwarenessAttributes; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index 855c570a252..85069392eb6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; @@ -68,11 +69,11 @@ public class FilterAllocationDecider extends AllocationDecider { private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include"; private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude"; public static final Setting CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = - Setting.groupSetting(CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope); + Setting.groupSetting(CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope); public static final Setting CLUSTER_ROUTING_INCLUDE_GROUP_SETTING = - Setting.groupSetting(CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope); + 
Setting.groupSetting(CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope); public static final Setting CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING = - Setting.groupSetting(CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope); + Setting.groupSetting(CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope); private volatile DiscoveryNodeFilters clusterRequireFilters; private volatile DiscoveryNodeFilters clusterIncludeFilters; diff --git a/core/src/main/java/org/elasticsearch/common/ParsingException.java b/core/src/main/java/org/elasticsearch/common/ParsingException.java index 0519ab38339..5dc2c8a74e4 100644 --- a/core/src/main/java/org/elasticsearch/common/ParsingException.java +++ b/core/src/main/java/org/elasticsearch/common/ParsingException.java @@ -95,12 +95,11 @@ public class ParsingException extends ElasticsearchException { } @Override - protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { + protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException { if (lineNumber != UNKNOWN_POSITION) { builder.field("line", lineNumber); builder.field("col", columnNumber); } - super.innerToXContent(builder, params); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java index e700d301644..e01fe1beee2 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -73,9 +73,8 @@ public class CircuitBreakingException extends ElasticsearchException { } @Override - protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { + protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException { 
builder.field("bytes_wanted", bytesWanted); builder.field("bytes_limit", byteLimit); - super.innerToXContent(builder, params); } } diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 959fe1849ea..0c86152db78 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -69,6 +69,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING, IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING, IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING, + IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING, IndexMetaData.INDEX_SHADOW_REPLICAS_SETTING, IndexMetaData.INDEX_SHARED_FILESYSTEM_SETTING, IndexMetaData.INDEX_READ_ONLY_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 1d34d570a65..ee4b5fea15e 100755 --- a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper; import com.carrotsearch.hppc.ObjectHashSet; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.DelegatingAnalyzerWrapper; @@ -384,6 +385,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable { MapperUtils.collect(newMapper.mapping().root(), objectMappers, fieldMappers); checkFieldUniqueness(newMapper.type(), objectMappers, fieldMappers, fullPathObjectMappers, fieldTypes); checkObjectsCompatibility(objectMappers, updateAllTypes, fullPathObjectMappers); + 
checkPartitionedIndexConstraints(newMapper); // update lookup data-structures // this will in particular make sure that the merged fields are compatible with other types @@ -598,6 +600,20 @@ public class MapperService extends AbstractIndexComponent implements Closeable { } } + private void checkPartitionedIndexConstraints(DocumentMapper newMapper) { + if (indexSettings.getIndexMetaData().isRoutingPartitionedIndex()) { + if (newMapper.parentFieldMapper().active()) { + throw new IllegalArgumentException("mapping type name [" + newMapper.type() + "] cannot have a " + + "_parent field for the partitioned index [" + indexSettings.getIndex().getName() + "]"); + } + + if (!newMapper.routingFieldMapper().required()) { + throw new IllegalArgumentException("mapping type [" + newMapper.type() + "] must have routing " + + "required for partitioned index [" + indexSettings.getIndex().getName() + "]"); + } + } + } + public DocumentMapper parse(String mappingType, CompressedXContent mappingSource, boolean applyDefault) throws MapperParsingException { return documentParser.parse(mappingType, mappingSource, applyDefault ? 
defaultMappingSource : null); } diff --git a/core/src/main/java/org/elasticsearch/index/query/QueryShardException.java b/core/src/main/java/org/elasticsearch/index/query/QueryShardException.java index 1e31c7c50e1..9b6ce3a6e4b 100644 --- a/core/src/main/java/org/elasticsearch/index/query/QueryShardException.java +++ b/core/src/main/java/org/elasticsearch/index/query/QueryShardException.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.query; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; @@ -60,11 +59,6 @@ public class QueryShardException extends ElasticsearchException { return RestStatus.BAD_REQUEST; } - @Override - protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { - super.innerToXContent(builder, params); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index b7802f16bbc..e3fe68838d4 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -181,7 +181,16 @@ public class Node implements Closeable { */ public static final Setting NODE_LOCAL_STORAGE_SETTING = Setting.boolSetting("node.local_storage", true, Property.NodeScope); public static final Setting NODE_NAME_SETTING = Setting.simpleString("node.name", Property.NodeScope); - public static final Setting NODE_ATTRIBUTES = Setting.groupSetting("node.attr.", Property.NodeScope); + public static final Setting NODE_ATTRIBUTES = Setting.groupSetting("node.attr.", (settings) -> { + Map settingsMap = settings.getAsMap(); + for (Map.Entry entry : settingsMap.entrySet()) { + String value = 
entry.getValue(); + if (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(value.length() - 1))) { + throw new IllegalArgumentException("node.attr." + entry.getKey() + " cannot have leading or trailing whitespace " + + "[" + value + "]"); + } + } + }, Property.NodeScope); public static final Setting BREAKER_TYPE_KEY = new Setting<>("indices.breaker.type", "hierarchy", (s) -> { switch (s) { case "hierarchy": diff --git a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java index 33b73a6ff83..5abaf0d6c13 100644 --- a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java +++ b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java @@ -31,7 +31,10 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Collections; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; +import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT; public class BytesRestResponse extends RestResponse { @@ -89,7 +92,7 @@ public class BytesRestResponse extends RestResponse { this.content = BytesArray.EMPTY; this.contentType = TEXT_CONTENT_TYPE; } else { - try (final XContentBuilder builder = convert(channel, status, e)) { + try (final XContentBuilder builder = build(channel, status, e)) { this.content = builder.bytes(); this.contentType = builder.contentType().mediaType(); } @@ -116,49 +119,25 @@ public class BytesRestResponse extends RestResponse { private static final Logger SUPPRESSED_ERROR_LOGGER = ESLoggerFactory.getLogger("rest.suppressed"); - private static XContentBuilder convert(RestChannel channel, RestStatus status, Exception e) throws IOException { - XContentBuilder builder = channel.newErrorBuilder().startObject(); - if (e == 
null) { - builder.field("error", "unknown"); - } else if (channel.detailedErrorsEnabled()) { - final ToXContent.Params params; - if (channel.request().paramAsBoolean("error_trace", !ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT)) { - params = new ToXContent.DelegatingMapParams( - Collections.singletonMap(ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE, "false"), channel.request()); + private static XContentBuilder build(RestChannel channel, RestStatus status, Exception e) throws IOException { + ToXContent.Params params = channel.request(); + if (params.paramAsBoolean("error_trace", !REST_EXCEPTION_SKIP_STACK_TRACE_DEFAULT)) { + params = new ToXContent.DelegatingMapParams(singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false"), params); + } else if (e != null) { + Supplier messageSupplier = () -> new ParameterizedMessage("path: {}, params: {}", + channel.request().rawPath(), channel.request().params()); + + if (status.getStatus() < 500) { + SUPPRESSED_ERROR_LOGGER.debug(messageSupplier, e); } else { - if (status.getStatus() < 500) { - SUPPRESSED_ERROR_LOGGER.debug( - (Supplier) () -> new ParameterizedMessage("path: {}, params: {}", - channel.request().rawPath(), channel.request().params()), e); - } else { - SUPPRESSED_ERROR_LOGGER.warn( - (Supplier) () -> new ParameterizedMessage("path: {}, params: {}", - channel.request().rawPath(), channel.request().params()), e); - } - params = channel.request(); + SUPPRESSED_ERROR_LOGGER.warn(messageSupplier, e); } - ElasticsearchException.renderException(builder, params, e); - } else { - builder.field("error", simpleMessage(e)); } + + XContentBuilder builder = channel.newErrorBuilder().startObject(); + ElasticsearchException.generateFailureXContent(builder, params, e, channel.detailedErrorsEnabled()); builder.field("status", status.getStatus()); builder.endObject(); return builder; } - - /* - * Builds a simple error string from the message of the first ElasticsearchException - */ - private static String 
simpleMessage(Throwable t) throws IOException { - int counter = 0; - Throwable next = t; - while (next != null && counter++ < 10) { - if (t instanceof ElasticsearchException) { - return next.getClass().getSimpleName() + "[" + next.getMessage() + "]"; - } - next = next.getCause(); - } - - return "No ElasticsearchException found"; - } } diff --git a/core/src/main/java/org/elasticsearch/script/ScriptException.java b/core/src/main/java/org/elasticsearch/script/ScriptException.java index 475091f9f6d..91e6ad401fc 100644 --- a/core/src/main/java/org/elasticsearch/script/ScriptException.java +++ b/core/src/main/java/org/elasticsearch/script/ScriptException.java @@ -87,8 +87,7 @@ public class ScriptException extends ElasticsearchException { } @Override - protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { - super.innerToXContent(builder, params); + protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException { builder.field("script_stack", scriptStack); builder.field("script", script); builder.field("lang", lang); diff --git a/core/src/main/java/org/elasticsearch/script/ScriptService.java b/core/src/main/java/org/elasticsearch/script/ScriptService.java index 478aac6a55c..60cdb03e479 100644 --- a/core/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/core/src/main/java/org/elasticsearch/script/ScriptService.java @@ -607,7 +607,7 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.prettyPrint(); builder.startObject(); - ElasticsearchException.toXContent(builder, ToXContent.EMPTY_PARAMS, e); + ElasticsearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, e); builder.endObject(); logger.warn("failed to load/compile script [{}]: {}", scriptNameExt.v1(), builder.string()); } catch (IOException ioe) { diff --git 
a/core/src/main/java/org/elasticsearch/search/SearchParseException.java b/core/src/main/java/org/elasticsearch/search/SearchParseException.java index 223225af2a6..f8ae3c7f674 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchParseException.java +++ b/core/src/main/java/org/elasticsearch/search/SearchParseException.java @@ -72,12 +72,11 @@ public class SearchParseException extends SearchContextException { } @Override - protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { + protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException { if (lineNumber != UNKNOWN_POSITION) { builder.field("line", lineNumber); builder.field("col", columnNumber); } - super.innerToXContent(builder, params); } /** diff --git a/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java b/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java index eb084a57e10..b3c1a8929a6 100644 --- a/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java +++ b/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * A task that can be canceled */ -public class CancellableTask extends Task { +public abstract class CancellableTask extends Task { private final AtomicReference reason = new AtomicReference<>(); @@ -51,6 +51,11 @@ public class CancellableTask extends Task { return true; } + /** + * Returns true if this task should can potentially have children that needs to be cancelled when the parent is cancelled. 
+ */ + public abstract boolean shouldCancelChildrenOnCancellation(); + public boolean isCancelled() { return reason.get() != null; } diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java index 61c36f9015d..dce39cea7d7 100644 --- a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -125,15 +125,18 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie /** * Cancels a task *

- * Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise. + * Returns true if cancellation was started successful, null otherwise. + * + * After starting cancellation on the parent task, the task manager tries to cancel all children tasks + * of the current task. Once cancellation of the children tasks is done, the listener is triggered. */ - public Set cancel(CancellableTask task, String reason, Consumer> listener) { + public boolean cancel(CancellableTask task, String reason, Runnable listener) { CancellableTaskHolder holder = cancellableTasks.get(task.getId()); if (holder != null) { logger.trace("cancelling task with id {}", task.getId()); return holder.cancel(reason, listener); } - return null; + return false; } /** @@ -344,17 +347,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie } } - public void registerChildTask(Task task, String node) { - if (task == null || task instanceof CancellableTask == false) { - // We don't have a cancellable task - not much we can do here - return; - } - CancellableTaskHolder holder = cancellableTasks.get(task.getId()); - if (holder != null) { - holder.registerChildTaskNode(node); - } - } - /** * Blocks the calling thread, waiting for the task to vanish from the TaskManager. */ @@ -378,11 +370,9 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie private final CancellableTask task; - private final Set nodesWithChildTasks = new HashSet<>(); - private volatile String cancellationReason = null; - private volatile Consumer> cancellationListener = null; + private volatile Runnable cancellationListener = null; public CancellableTaskHolder(CancellableTask task) { this.task = task; @@ -391,33 +381,33 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie /** * Marks task as cancelled. *

- * Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise. + * Returns true if cancellation was successful, false otherwise. */ - public Set cancel(String reason, Consumer> listener) { - Set nodes; + public boolean cancel(String reason, Runnable listener) { + final boolean cancelled; synchronized (this) { assert reason != null; if (cancellationReason == null) { cancellationReason = reason; cancellationListener = listener; - nodes = Collections.unmodifiableSet(nodesWithChildTasks); + cancelled = true; } else { // Already cancelled by somebody else - nodes = null; + cancelled = false; } } - if (nodes != null) { + if (cancelled) { task.cancel(reason); } - return nodes; + return cancelled; } /** * Marks task as cancelled. *

- * Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise. + * Returns true if cancellation was successful, false otherwise. */ - public Set cancel(String reason) { + public boolean cancel(String reason) { return cancel(reason, null); } @@ -425,14 +415,12 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie * Marks task as finished. */ public void finish() { - Consumer> listener = null; - Set nodes = null; + Runnable listener = null; synchronized (this) { if (cancellationReason != null) { // The task was cancelled, we need to notify the listener if (cancellationListener != null) { listener = cancellationListener; - nodes = Collections.unmodifiableSet(nodesWithChildTasks); cancellationListener = null; } } else { @@ -442,7 +430,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie // We need to call the listener outside of the synchronised section to avoid potential bottle necks // in the listener synchronization if (listener != null) { - listener.accept(nodes); + listener.run(); } } @@ -454,14 +442,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie public CancellableTask getTask() { return task; } - - public synchronized void registerChildTaskNode(String nodeId) { - if (cancellationReason == null) { - nodesWithChildTasks.add(nodeId); - } else { - throw new TaskCancelledException("cannot register child task request, the task is already cancelled"); - } - } } } diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskResult.java b/core/src/main/java/org/elasticsearch/tasks/TaskResult.java index 5df43b7485c..87cce4a3d1f 100644 --- a/core/src/main/java/org/elasticsearch/tasks/TaskResult.java +++ b/core/src/main/java/org/elasticsearch/tasks/TaskResult.java @@ -230,7 +230,7 @@ public final class TaskResult implements Writeable, ToXContent { private static BytesReference toXContent(Exception error) throws 
IOException { try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { builder.startObject(); - ElasticsearchException.toXContent(builder, ToXContent.EMPTY_PARAMS, error); + ElasticsearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, error); builder.endObject(); return builder.bytes(); } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index a3915c5d89c..8c816d12be1 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -515,7 +515,6 @@ public class TransportService extends AbstractLifecycleComponent { final TransportResponseHandler handler) { request.setParentTask(localNode.getId(), parentTask.getId()); try { - taskManager.registerChildTask(parentTask, connection.getNode().getId()); sendRequest(connection, action, request, options, handler); } catch (TaskCancelledException ex) { // The parent task is already cancelled - just fail the request diff --git a/core/src/test/java/org/elasticsearch/ESExceptionTests.java b/core/src/test/java/org/elasticsearch/ESExceptionTests.java index b59c5625939..5f09cfe57c9 100644 --- a/core/src/test/java/org/elasticsearch/ESExceptionTests.java +++ b/core/src/test/java/org/elasticsearch/ESExceptionTests.java @@ -253,7 +253,7 @@ public class ESExceptionTests extends ESTestCase { } XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); - ElasticsearchException.toXContent(builder, PARAMS, ex); + ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex); builder.endObject(); String expected = "{\"type\":\"file_not_found_exception\",\"reason\":\"foo not found\"}"; @@ -264,7 +264,7 @@ public class ESExceptionTests extends ESTestCase { ParsingException ex = new ParsingException(1, 2, "foobar", null); XContentBuilder builder = 
XContentFactory.jsonBuilder(); builder.startObject(); - ElasticsearchException.toXContent(builder, PARAMS, ex); + ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex); builder.endObject(); String expected = "{\"type\":\"parsing_exception\",\"reason\":\"foobar\",\"line\":1,\"col\":2}"; assertEquals(expected, builder.string()); @@ -274,7 +274,7 @@ public class ESExceptionTests extends ESTestCase { ElasticsearchException ex = new RemoteTransportException("foobar", new FileNotFoundException("foo not found")); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); - ElasticsearchException.toXContent(builder, PARAMS, ex); + ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex); builder.endObject(); XContentBuilder otherBuilder = XContentFactory.jsonBuilder(); @@ -292,7 +292,7 @@ public class ESExceptionTests extends ESTestCase { ex.addHeader("test_multi", "some value", "another value"); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); - ElasticsearchException.toXContent(builder, PARAMS, ex); + ElasticsearchException.generateThrowableXContent(builder, PARAMS, ex); builder.endObject(); assertThat(builder.string(), Matchers.anyOf( // iteration order depends on platform equalTo("{\"type\":\"parsing_exception\",\"reason\":\"foobar\",\"line\":1,\"col\":2,\"header\":{\"test_multi\":[\"some value\",\"another value\"],\"test\":\"some value\"}}"), diff --git a/core/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java b/core/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java index c5d9d31c1eb..576b53ad0e7 100644 --- a/core/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java +++ b/core/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch; +import org.apache.lucene.util.Constants; import org.elasticsearch.action.RoutingMissingException; import 
org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -67,7 +68,9 @@ public class ElasticsearchExceptionTests extends ESTestCase { // in the JSON. Since the stack can be large, it only checks the beginning of the JSON. assertExceptionAsJson(e, true, startsWith("{\"type\":\"exception\",\"reason\":\"foo\"," + "\"caused_by\":{\"type\":\"illegal_state_exception\",\"reason\":\"bar\"," + - "\"stack_trace\":\"java.lang.IllegalStateException: bar")); + "\"stack_trace\":\"java.lang.IllegalStateException: bar" + + (Constants.WINDOWS ? "\\r\\n" : "\\n") + + "\\tat org.elasticsearch.")); } public void testToXContentWithHeaders() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index 6d0a0824490..da60b564cec 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -91,7 +91,12 @@ public class CancellableTasksTests extends TaskManagerTestCase { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId); + return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + }; } } @@ -126,7 +131,12 @@ public class CancellableTasksTests extends TaskManagerTestCase { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId); + return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + @Override + public boolean 
shouldCancelChildrenOnCancellation() { + return true; + } + }; } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 28aff0d3bda..99d058886a5 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -83,6 +83,11 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { super(id, type, action, description, parentTaskId); } + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + public boolean isBlocked() { return blocked; } @@ -242,7 +247,12 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId); + return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + }; } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 1f5c92e286f..f461be77e0c 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -41,6 +41,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS; import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -307,4 +308,31 @@ public class CreateIndexIT extends ESIntegTestCase { .build(); assertFalse(client().admin().indices().prepareCreate("test-idx-3").setSettings(settings).setTimeout("100ms").get().isShardsAcked()); } + + public void testInvalidPartitionSize() { + BiFunction createPartitionedIndex = (shards, partitionSize) -> { + CreateIndexResponse response; + + try { + response = prepareCreate("test_" + shards + "_" + partitionSize) + .setSettings(Settings.builder() + .put("index.number_of_shards", shards) + .put("index.routing_partition_size", partitionSize)) + .execute().actionGet(); + } catch (IllegalStateException | IllegalArgumentException e) { + return false; + } + + return response.isAcknowledged(); + }; + + assertFalse(createPartitionedIndex.apply(3, 6)); + assertFalse(createPartitionedIndex.apply(3, 0)); + assertFalse(createPartitionedIndex.apply(3, 3)); + + assertTrue(createPartitionedIndex.apply(3, 1)); + assertTrue(createPartitionedIndex.apply(3, 2)); + + assertTrue(createPartitionedIndex.apply(1, 1)); + } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 8e5950fe9f9..cdb3574236b 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1163,7 +1163,8 @@ public class TransportReplicationActionTests extends ESTestCase { @Override public void execute() throws Exception { - this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<>(null, new Response())); + // Using the diamond operator (<>) prevents Eclipse from being able to compile this code + this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult(null, new 
Response())); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java index 03866269cae..93771019ef4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java @@ -24,8 +24,10 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; @@ -144,5 +146,15 @@ public class FilteringAllocationIT extends ESIntegTestCase { clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); assertThat(clusterState.routingTable().index("test").numberOfNodesShardsAreAllocatedOn(), equalTo(2)); } + + public void testInvalidIPFilterClusterSettings() { + String ipKey = randomFrom("_ip", "_host_ip", "_publish_ip"); + Setting filterSetting = randomFrom(FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, + FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(filterSetting.getKey() + ipKey, "192.168.1.1.")) + .execute().actionGet()); + assertEquals("invalid IP 
address [192.168.1.1.] for [" + ipKey + "]", e.getMessage()); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeFiltersTests.java b/core/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeFiltersTests.java index 1f226427237..c78de758d96 100644 --- a/core/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeFiltersTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeFiltersTests.java @@ -245,6 +245,17 @@ public class DiscoveryNodeFiltersTests extends ESTestCase { assertThat(filters.match(node), equalTo(true)); } + public void testCommaSeparatedValuesTrimmed() { + DiscoveryNode node = new DiscoveryNode("", "", "", "", "192.1.1.54", localAddress, singletonMap("tag", "B"), emptySet(), null); + + Settings settings = shuffleSettings(Settings.builder() + .put("xxx." + randomFrom("_ip", "_host_ip", "_publish_ip"), "192.1.1.1, 192.1.1.54") + .put("xxx.tag", "A, B") + .build()); + DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings); + assertTrue(filters.match(node)); + } + private Settings shuffleSettings(Settings source) { Settings.Builder settings = Settings.builder(); List keys = new ArrayList<>(source.getAsMap().keySet()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index 3118692b341..c24a86176dc 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -22,7 +22,6 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import 
org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -35,6 +34,7 @@ import org.elasticsearch.threadpool.TestThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -43,8 +43,8 @@ import java.util.TreeMap; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.object.HasToString.hasToString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.object.HasToString.hasToString; public class OperationRoutingTests extends ESTestCase{ @@ -75,6 +75,142 @@ public class OperationRoutingTests extends ESTestCase{ } } + public void testPartitionedIndex() { + // make sure the same routing value always has each _id fall within the configured partition size + for (int shards = 1; shards < 5; shards++) { + for (int partitionSize = 1; partitionSize == 1 || partitionSize < shards; partitionSize++) { + IndexMetaData metaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(shards) + .routingPartitionSize(partitionSize) + .numberOfReplicas(1) + .build(); + + for (int i = 0; i < 20; i++) { + String routing = randomUnicodeOfLengthBetween(1, 50); + Set shardSet = new HashSet<>(); + + for (int k = 0; k < 150; k++) { + String id = randomUnicodeOfLengthBetween(1, 50); + + shardSet.add(OperationRouting.generateShardId(metaData, id, routing)); + } + + assertEquals(partitionSize, shardSet.size()); + } + } + } + } + + public void testPartitionedIndexShrunk() { + Map> routingIdToShard = new HashMap<>(); + + Map routingA = new HashMap<>(); + routingA.put("a_0", 1); + routingA.put("a_1", 2); + routingA.put("a_2", 2); + routingA.put("a_3", 2); + routingA.put("a_4", 1); + routingA.put("a_5", 2); + 
routingIdToShard.put("a", routingA); + + Map routingB = new HashMap<>(); + routingB.put("b_0", 0); + routingB.put("b_1", 0); + routingB.put("b_2", 0); + routingB.put("b_3", 0); + routingB.put("b_4", 3); + routingB.put("b_5", 3); + routingIdToShard.put("b", routingB); + + Map routingC = new HashMap<>(); + routingC.put("c_0", 1); + routingC.put("c_1", 1); + routingC.put("c_2", 0); + routingC.put("c_3", 0); + routingC.put("c_4", 0); + routingC.put("c_5", 1); + routingIdToShard.put("c", routingC); + + Map routingD = new HashMap<>(); + routingD.put("d_0", 2); + routingD.put("d_1", 2); + routingD.put("d_2", 3); + routingD.put("d_3", 3); + routingD.put("d_4", 3); + routingD.put("d_5", 3); + routingIdToShard.put("d", routingD); + + IndexMetaData metaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .setRoutingNumShards(8) + .numberOfShards(4) + .routingPartitionSize(3) + .numberOfReplicas(1) + .build(); + + for (Map.Entry> routingIdEntry : routingIdToShard.entrySet()) { + String routing = routingIdEntry.getKey(); + + for (Map.Entry idEntry : routingIdEntry.getValue().entrySet()) { + String id = idEntry.getKey(); + int shard = idEntry.getValue(); + + assertEquals(shard, OperationRouting.generateShardId(metaData, id, routing)); + } + } + } + + public void testPartitionedIndexBWC() { + Map> routingIdToShard = new HashMap<>(); + + Map routingA = new HashMap<>(); + routingA.put("a_0", 3); + routingA.put("a_1", 2); + routingA.put("a_2", 2); + routingA.put("a_3", 3); + routingIdToShard.put("a", routingA); + + Map routingB = new HashMap<>(); + routingB.put("b_0", 5); + routingB.put("b_1", 0); + routingB.put("b_2", 0); + routingB.put("b_3", 0); + routingIdToShard.put("b", routingB); + + Map routingC = new HashMap<>(); + routingC.put("c_0", 4); + routingC.put("c_1", 4); + routingC.put("c_2", 3); + routingC.put("c_3", 4); + routingIdToShard.put("c", routingC); + + Map routingD = new HashMap<>(); + routingD.put("d_0", 3); + routingD.put("d_1", 4); + 
routingD.put("d_2", 4); + routingD.put("d_3", 4); + routingIdToShard.put("d", routingD); + + IndexMetaData metaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(6) + .routingPartitionSize(2) + .numberOfReplicas(1) + .build(); + + for (Map.Entry> routingIdEntry : routingIdToShard.entrySet()) { + String routing = routingIdEntry.getKey(); + + for (Map.Entry idEntry : routingIdEntry.getValue().entrySet()) { + String id = idEntry.getKey(); + int shard = idEntry.getValue(); + + assertEquals(shard, OperationRouting.generateShardId(metaData, id, routing)); + } + } + } + /** * Ensures that all changes to the hash-function / shard selection are BWC */ diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java index fec0a33b917..2c1ec07c7fa 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -32,10 +32,14 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import java.util.HashMap; +import java.util.Map; + import static java.util.Collections.singletonMap; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static 
org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -803,4 +807,50 @@ public class AwarenessAllocationTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(4)); // +1 for relocating shard. assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); // Still 1 unassigned. } + + public void testMultipleAwarenessAttributes() { + AllocationService strategy = createAllocationService(Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone, rack") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a, b") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "rack.values", "c, d") + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .build()); + + logger.info("Building initial routing table for 'testUnbalancedZones'"); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build(); + + ClusterState clusterState = ClusterState.builder( + org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY) + ).metaData(metaData).routingTable(initialRoutingTable).build(); + + logger.info("--> adding two nodes in different zones and do rerouting"); + Map nodeAAttributes = new HashMap<>(); + nodeAAttributes.put("zone", "a"); + nodeAAttributes.put("rack", "c"); + Map nodeBAttributes = new HashMap<>(); + nodeBAttributes.put("zone", "b"); + nodeBAttributes.put("rack", "d"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .add(newNode("A-0", nodeAAttributes)) + 
.add(newNode("B-0", nodeBAttributes)) + ).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + logger.info("--> all replicas are allocated and started since we have one node in each zone and rack"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java index 5ec162eb719..d4ec30f6e51 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java @@ -37,6 +37,8 @@ import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDeci import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; 
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; @@ -191,4 +193,16 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { .build(); return service.reroute(clusterState, "reroute", false); } + + public void testInvalidIPFilter() { + String ipKey = randomFrom("_ip", "_host_ip", "_publish_ip"); + Setting filterSetting = randomFrom(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING, + IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { + IndexScopedSettings indexScopedSettings = new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS); + indexScopedSettings.updateDynamicSettings(Settings.builder().put(filterSetting.getKey() + ipKey, "192..168.1.1").build(), + Settings.builder().put(Settings.EMPTY), Settings.builder(), "test ip validation"); + }); + assertEquals("invalid IP address [192..168.1.1] for [" + ipKey + "]", e.getMessage()); + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index d5e9ddcde9a..d8eee1391e7 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -584,6 +584,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } ensureGreen("test"); + // make sure all nodes have the updated cluster state with the latest routing table + final long clusterStateVersionOnMaster = internalCluster().clusterService(internalCluster().getMasterName()) + .state().getVersion(); + assertBusy(() -> { + for (String node : nodes) { + assertThat(internalCluster().clusterService(node).state().getVersion(), + 
greaterThanOrEqualTo(clusterStateVersionOnMaster)); + } + }); + logger.info("validating successful docs"); for (String node : nodes) { try { diff --git a/core/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java b/core/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java index fec32cdd5cc..3c9a1c16d16 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java @@ -234,4 +234,37 @@ public class MapperServiceTests extends ESSingleNodeTestCase { MergeReason.MAPPING_UPDATE, random().nextBoolean())); assertThat(e.getMessage(), containsString("[_all] is disabled in 6.0")); } + + public void testPartitionedConstraints() { + // partitioned index must have routing + IllegalArgumentException noRoutingException = expectThrows(IllegalArgumentException.class, () -> { + client().admin().indices().prepareCreate("test-index") + .addMapping("type", "{\"type\":{}}") + .setSettings(Settings.builder() + .put("index.number_of_shards", 4) + .put("index.routing_partition_size", 2)) + .execute().actionGet(); + }); + assertTrue(noRoutingException.getMessage(), noRoutingException.getMessage().contains("must have routing")); + + // partitioned index cannot have parent/child relationships + IllegalArgumentException parentException = expectThrows(IllegalArgumentException.class, () -> { + client().admin().indices().prepareCreate("test-index") + .addMapping("parent", "{\"parent\":{\"_routing\":{\"required\":true}}}") + .addMapping("child", "{\"child\": {\"_routing\":{\"required\":true}, \"_parent\": {\"type\": \"parent\"}}}") + .setSettings(Settings.builder() + .put("index.number_of_shards", 4) + .put("index.routing_partition_size", 2)) + .execute().actionGet(); + }); + assertTrue(parentException.getMessage(), parentException.getMessage().contains("cannot have a _parent field")); + + // valid partitioned index + 
assertTrue(client().admin().indices().prepareCreate("test-index") + .addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}") + .setSettings(Settings.builder() + .put("index.number_of_shards", 4) + .put("index.routing_partition_size", 2)) + .execute().actionGet().isAcknowledged()); + } } diff --git a/core/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java b/core/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java index 57777453976..521446583c8 100644 --- a/core/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java @@ -37,7 +37,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESIntegTestCase; - import org.junit.After; import java.io.IOException; @@ -306,7 +305,6 @@ public class SimpleIndexTemplateIT extends ESIntegTestCase { "get template with " + Arrays.toString(names)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/8802") public void testBrokenMapping() throws Exception { // clean all templates setup by the framework. client().admin().indices().prepareDeleteTemplate("*").get(); @@ -764,4 +762,63 @@ public class SimpleIndexTemplateIT extends ESIntegTestCase { assertEquals("value1", searchResponse.getHits().getAt(0).field("field1").value().toString()); assertNull(searchResponse.getHits().getAt(0).field("field2")); } + + public void testPartitionedTemplate() throws Exception { + // clean all templates setup by the framework. + client().admin().indices().prepareDeleteTemplate("*").get(); + + // check get all templates on an empty index. 
+ GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates().get(); + assertThat(response.getIndexTemplates(), empty()); + + // provide more partitions than shards + IllegalArgumentException eBadSettings = expectThrows(IllegalArgumentException.class, + () -> client().admin().indices().preparePutTemplate("template_1") + .setPatterns(Collections.singletonList("te*")) + .setSettings(Settings.builder() + .put("index.number_of_shards", "5") + .put("index.routing_partition_size", "6")) + .get()); + assertThat(eBadSettings.getMessage(), containsString("partition size [6] should be a positive number " + + "less than the number of shards [5]")); + + // provide an invalid mapping for a partitioned index + IllegalArgumentException eBadMapping = expectThrows(IllegalArgumentException.class, + () -> client().admin().indices().preparePutTemplate("template_2") + .setPatterns(Collections.singletonList("te*")) + .addMapping("type", "{\"type\":{\"_routing\":{\"required\":false}}}") + .setSettings(Settings.builder() + .put("index.number_of_shards", "6") + .put("index.routing_partition_size", "3")) + .get()); + assertThat(eBadMapping.getMessage(), containsString("must have routing required for partitioned index")); + + // no templates yet + response = client().admin().indices().prepareGetTemplates().get(); + assertEquals(0, response.getIndexTemplates().size()); + + // a valid configuration that only provides the partition size + assertAcked(client().admin().indices().preparePutTemplate("just_partitions") + .setPatterns(Collections.singletonList("te*")) + .setSettings(Settings.builder() + .put("index.routing_partition_size", "6")) + .get()); + + // create an index with too few shards + IllegalArgumentException eBadIndex = expectThrows(IllegalArgumentException.class, + () -> prepareCreate("test_bad", Settings.builder() + .put("index.number_of_shards", 5)) + .get()); + + assertThat(eBadIndex.getMessage(), containsString("partition size [6] should be a positive 
number " + + "less than the number of shards [5]")); + + // finally, create a valid index + prepareCreate("test_good", Settings.builder() + .put("index.number_of_shards", 7)) + .get(); + + GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test_good").get(); + assertEquals("6", getSettingsResponse.getIndexToSettings().get("test_good").getAsMap().get("index.routing_partition_size")); + } } diff --git a/core/src/test/java/org/elasticsearch/routing/PartitionedRoutingIT.java b/core/src/test/java/org/elasticsearch/routing/PartitionedRoutingIT.java new file mode 100644 index 00000000000..14634116f04 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/routing/PartitionedRoutingIT.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.routing; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.ESIntegTestCase; +import org.mockito.internal.util.collections.Sets; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class PartitionedRoutingIT extends ESIntegTestCase { + + public void testVariousPartitionSizes() throws Exception { + for (int shards = 1; shards <= 4; shards++) { + for (int partitionSize = 1; partitionSize < shards; partitionSize++) { + String index = "index_" + shards + "_" + partitionSize; + + client().admin().indices().prepareCreate(index) + .setSettings(Settings.builder() + .put("index.number_of_shards", shards) + .put("index.routing_partition_size", partitionSize)) + .addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}") + .execute().actionGet(); + ensureGreen(); + + Map> routingToDocumentIds = generateRoutedDocumentIds(index); + + verifyGets(index, routingToDocumentIds); + verifyBroadSearches(index, routingToDocumentIds, shards); + verifyRoutedSearches(index, routingToDocumentIds, Sets.newSet(partitionSize)); + } + } + } + + public void testShrinking() throws Exception { + // creates random routing groups and repeatedly halves the index until it is down to 1 shard + // verifying that the count is correct for each shrunken index + final int partitionSize = 3; + final int originalShards = 8; + int currentShards = originalShards; + String index = "index_" + currentShards; + + client().admin().indices().prepareCreate(index) + .setSettings(Settings.builder() + .put("index.number_of_shards", currentShards) + .put("index.routing_partition_size", partitionSize)) + .addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}") + .execute().actionGet(); + ensureGreen(); + + Map> 
routingToDocumentIds = generateRoutedDocumentIds(index); + + while (true) { + int factor = originalShards / currentShards; + + verifyGets(index, routingToDocumentIds); + verifyBroadSearches(index, routingToDocumentIds, currentShards); + + // we need the floor and ceiling of the routing_partition_size / factor since the partition size of the shrunken + // index will be one of those, depending on the routing value + verifyRoutedSearches(index, routingToDocumentIds, + Math.floorDiv(partitionSize, factor) == 0 ? + Sets.newSet(1, 2) : + Sets.newSet(Math.floorDiv(partitionSize, factor), -Math.floorDiv(-partitionSize, factor))); + + client().admin().indices().prepareUpdateSettings(index) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", client().admin().cluster().prepareState().get().getState().nodes() + .getDataNodes().values().toArray(DiscoveryNode.class)[0].getName()) + .put("index.blocks.write", true)).get(); + ensureGreen(); + + currentShards = Math.floorDiv(currentShards, 2); + + if (currentShards == 0) { + break; + } + + String previousIndex = index; + index = "index_" + currentShards; + + logger.info("--> shrinking index [" + previousIndex + "] to [" + index + "]"); + client().admin().indices().prepareShrinkIndex(previousIndex, index) + .setSettings(Settings.builder() + .put("index.number_of_shards", currentShards) + .build()).get(); + ensureGreen(); + } + } + + private void verifyRoutedSearches(String index, Map> routingToDocumentIds, Set expectedShards) { + for (Map.Entry> routingEntry : routingToDocumentIds.entrySet()) { + String routing = routingEntry.getKey(); + int expectedDocuments = routingEntry.getValue().size(); + + SearchResponse response = client().prepareSearch() + .setQuery(QueryBuilders.termQuery("_routing", routing)) + .setRouting(routing) + .setIndices(index) + .setSize(100) + .execute().actionGet(); + + logger.info("--> routed search on index [" + index + "] visited [" + response.getTotalShards() + + "] shards for 
routing [" + routing + "] and got hits [" + response.getHits().totalHits() + "]"); + + assertTrue(response.getTotalShards() + " was not in " + expectedShards + " for " + index, + expectedShards.contains(response.getTotalShards())); + assertEquals(expectedDocuments, response.getHits().totalHits()); + + Set found = new HashSet<>(); + response.getHits().forEach(h -> found.add(h.getId())); + + assertEquals(routingEntry.getValue(), found); + } + } + + private void verifyBroadSearches(String index, Map> routingToDocumentIds, int expectedShards) { + for (Map.Entry> routingEntry : routingToDocumentIds.entrySet()) { + String routing = routingEntry.getKey(); + int expectedDocuments = routingEntry.getValue().size(); + + SearchResponse response = client().prepareSearch() + .setQuery(QueryBuilders.termQuery("_routing", routing)) + .setIndices(index) + .setSize(100) + .execute().actionGet(); + + assertEquals(expectedShards, response.getTotalShards()); + assertEquals(expectedDocuments, response.getHits().totalHits()); + + Set found = new HashSet<>(); + response.getHits().forEach(h -> found.add(h.getId())); + + assertEquals(routingEntry.getValue(), found); + } + } + + private void verifyGets(String index, Map> routingToDocumentIds) { + for (Map.Entry> routingEntry : routingToDocumentIds.entrySet()) { + String routing = routingEntry.getKey(); + + for (String id : routingEntry.getValue()) { + assertTrue(client().prepareGet(index, "type", id).setRouting(routing).execute().actionGet().isExists()); + } + } + } + + private Map> generateRoutedDocumentIds(String index) { + Map> routingToDocumentIds = new HashMap<>(); + int numRoutingValues = randomIntBetween(5, 15); + + for (int i = 0; i < numRoutingValues; i++) { + String routingValue = String.valueOf(i); + int numDocuments = randomIntBetween(10, 100); + routingToDocumentIds.put(String.valueOf(routingValue), new HashSet<>()); + + for (int k = 0; k < numDocuments; k++) { + String id = routingValue + "_" + String.valueOf(k); + 
routingToDocumentIds.get(routingValue).add(id); + + client().prepareIndex(index, "type", id) + .setRouting(routingValue) + .setSource("foo", "bar") + .get(); + } + } + + client().admin().indices().prepareRefresh(index).get(); + + return routingToDocumentIds; + } + + +} diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 7c0a28762a5..78a5fa69042 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -51,6 +52,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; @@ -162,6 +164,33 @@ public class MockRepository extends FsRepository { blockOnDataFiles = blocked; } + @Override + public RepositoryData getRepositoryData() { + final int numIterations = 5; + int count = 0; + NotXContentException ex = null; + RepositoryData repositoryData = null; + while (count < numIterations) { + try { + repositoryData = super.getRepositoryData(); + } catch (NotXContentException e) { + ex = e; + } + if (repositoryData != null) { + break; + } + count++; + try { + Thread.sleep(1000L); + } catch 
(InterruptedException e) { + } + } + if (ex != null) { + throw ex; + } + return repositoryData; + } + public void blockOnControlFiles(boolean blocked) { blockOnControlFiles = blocked; } diff --git a/docs/reference/getting-started.asciidoc b/docs/reference/getting-started.asciidoc index 529272a44e4..870ad84a040 100755 --- a/docs/reference/getting-started.asciidoc +++ b/docs/reference/getting-started.asciidoc @@ -216,6 +216,7 @@ And the response: epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent 1475247709 17:01:49 elasticsearch green 1 1 0 0 0 0 0 0 - 100.0% -------------------------------------------------- +// TESTRESPONSE[s/0 0/0 [01]/] // TESTRESPONSE[s/1475247709 17:01:49 elasticsearch/\\d+ \\d+:\\d+:\\d+ docs_integTest/ _cat] We can see that our cluster named "elasticsearch" is up with a green status. diff --git a/docs/reference/how-to.asciidoc b/docs/reference/how-to.asciidoc index d709e17bb4e..accdb8e8915 100644 --- a/docs/reference/how-to.asciidoc +++ b/docs/reference/how-to.asciidoc @@ -4,7 +4,7 @@ [partintro] -- Elasticsearch ships with defaults which are intended to give a good out of -the box experience. Full text search, highlighting, aggregations, indexing +the box experience. Full text search, highlighting, aggregations, and indexing should all just work without the user having to change anything. Once you better understand how you want to use Elasticsearch, however, diff --git a/docs/reference/how-to/indexing-speed.asciidoc b/docs/reference/how-to/indexing-speed.asciidoc index b0bd5fef802..6d7f66c2cd6 100644 --- a/docs/reference/how-to/indexing-speed.asciidoc +++ b/docs/reference/how-to/indexing-speed.asciidoc @@ -17,17 +17,17 @@ it is advisable to avoid going beyond a couple tens of megabytes per request even if larger requests seem to perform better. 
[float] -=== Use multiple workers/threads to send data to elasticsearch +=== Use multiple workers/threads to send data to Elasticsearch A single thread sending bulk requests is unlikely to be able to max out the -indexing capacity of an elasticsearch cluster. In order to use all resources +indexing capacity of an Elasticsearch cluster. In order to use all resources of the cluster, you should send data from multiple threads or processes. In addition to making better use of the resources of the cluster, this should help reduce the cost of each fsync. Make sure to watch for `TOO_MANY_REQUESTS (429)` response codes (`EsRejectedExecutionException` with the Java client), which is the way that -elasticsearch tells you that it cannot keep up with the current indexing rate. +Elasticsearch tells you that it cannot keep up with the current indexing rate. When it happens, you should pause indexing a bit before trying again, ideally with randomized exponential backoff. @@ -39,7 +39,7 @@ number of workers until either I/O or CPU is saturated on the cluster. === Increase the refresh interval The default <> is `1s`, which -forces elasticsearch to create a new segment every second. +forces Elasticsearch to create a new segment every second. Increasing this value (to say, `30s`) will allow larger segments to flush and decreases future merge pressure. diff --git a/docs/reference/how-to/recipes.asciidoc b/docs/reference/how-to/recipes.asciidoc index 0bb158f88e8..4d1a4b67a2b 100644 --- a/docs/reference/how-to/recipes.asciidoc +++ b/docs/reference/how-to/recipes.asciidoc @@ -169,9 +169,9 @@ the query need to be matched exactly while other parts should still take stemming into account? Fortunately, the `query_string` and `simple_query_string` queries have a feature -that allows to solve exactly this problem: `quote_field_suffix`. 
It allows to -tell Elasticsearch that words that appear in between quotes should be redirected -to a different field, see below: +that solves this exact problem: `quote_field_suffix`. This tells Elasticsearch +that the words that appear in between quotes are to be redirected to a different +field, see below: [source,js] -------------------------------------------------- @@ -218,7 +218,7 @@ GET index/_search -------------------------------------------------- // TESTRESPONSE[s/"took": 2,/"took": "$body.took",/] -In that case, since `ski` was in-between quotes, it was searched on the +In the above case, since `ski` was in-between quotes, it was searched on the `body.exact` field due to the `quote_field_suffix` parameter, so only document `1` matched. This allows users to mix exact search with stemmed search as they like. diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 28e9e6a114e..2d2b6600faf 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -79,6 +79,13 @@ Checking shards may take a lot of time on large indices. which uses https://en.wikipedia.org/wiki/DEFLATE[DEFLATE] for a higher compression ratio, at the expense of slower stored fields performance. +[[routing-partition-size]] `index.routing_partition_size`:: + + The number of shards a custom <> value can go to. + Defaults to 1 and can only be set at index creation time. This value must be less + than the `index.number_of_shards` unless the `index.number_of_shards` value is also 1. + See <> for more details about how this setting is used. 
+ [float] [[dynamic-index-settings]] === Dynamic index settings diff --git a/docs/reference/mapping/fields/routing-field.asciidoc b/docs/reference/mapping/fields/routing-field.asciidoc index bbdef3370c4..ebb659af344 100644 --- a/docs/reference/mapping/fields/routing-field.asciidoc +++ b/docs/reference/mapping/fields/routing-field.asciidoc @@ -109,3 +109,29 @@ documents with the same `_id` might end up on different shards if indexed with different `_routing` values. It is up to the user to ensure that IDs are unique across the index. + +[[routing-index-partition]] +==== Routing to an index partition + +An index can be configured such that custom routing values will go to a subset of the shards rather +than a single shard. This helps mitigate the risk of ending up with an imbalanced cluster while still +reducing the impact of searches. + +This is done by providing the index level setting <> at index creation. +As the partition size increases, the more evenly distributed the data will become at the +expense of having to search more shards per request. + +When this setting is present, the formula for calculating the shard becomes: + + shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards + +That is, the `_routing` field is used to calculate a set of shards within the index and then the +`_id` is used to pick a shard within that set. + +To enable this feature, the `index.routing_partition_size` should have a value greater than 1 and +less than `index.number_of_shards`. + +Once enabled, the partitioned index will have the following limitations: + +* Mappings with parent-child relationships cannot be created within it. +* All mappings within the index must have the `_routing` field marked as required. 
\ No newline at end of file diff --git a/docs/reference/mapping/params/index.asciidoc b/docs/reference/mapping/params/index.asciidoc index e097293d142..270c930e284 100644 --- a/docs/reference/mapping/params/index.asciidoc +++ b/docs/reference/mapping/params/index.asciidoc @@ -4,3 +4,4 @@ The `index` option controls whether field values are indexed. It accepts `true` or `false`. Fields that are not indexed are not queryable. +NOTE: For the legacy mapping type <> the `index` option only accepts legacy values `analyzed` (default, treat as full-text field), `not_analyzed` (treat as keyword field) and `no`. diff --git a/docs/reference/mapping/types/string.asciidoc b/docs/reference/mapping/types/string.asciidoc index b6a5fef86eb..c4be060cc40 100644 --- a/docs/reference/mapping/types/string.asciidoc +++ b/docs/reference/mapping/types/string.asciidoc @@ -12,7 +12,7 @@ to use `text` or `keyword`. Indexes imported from 2.x *only* support `string` and not `text` or `keyword`. To ease the migration from 2.x Elasticsearch will downgrade `text` and `keyword` -mappings applied to indexes imported to 2.x into `string`. While long lived +mappings applied to indexes imported from 2.x into `string`. While long lived indexes will eventually need to be recreated against 5.x before eventually upgrading to 6.x, this downgrading smooths the process before you find time for it. 
diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java index bd7ef2bc512..37d72ac4c0d 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java @@ -152,7 +152,7 @@ public class MultiSearchTemplateResponse extends ActionResponse implements Itera for (Item item : items) { if (item.isFailure()) { builder.startObject(); - ElasticsearchException.renderException(builder, params, item.getFailure()); + ElasticsearchException.generateFailureXContent(builder, params, item.getFailure(), true); builder.endObject(); } else { item.getResponse().toXContent(builder, params); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 7de4f19339f..6d2151a1405 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -71,6 +71,11 @@ public abstract class BulkByScrollTask extends CancellableTask { */ public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo); + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + public static class Status implements Task.Status, SuccessfullyProcessed { public static final String NAME = "bulk-by-scroll"; @@ -486,7 +491,7 @@ public abstract class BulkByScrollTask extends CancellableTask { status.toXContent(builder, params); } else { builder.startObject(); - ElasticsearchException.toXContent(builder, params, exception); + ElasticsearchException.generateThrowableXContent(builder, params, exception); 
builder.endObject(); } return builder; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java index b2dbd51f381..e14b33b5db5 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java @@ -47,9 +47,6 @@ public class ReindexParallelizationHelper { r -> task.onSliceResponse(listener, slice.source().slice().getId(), r), e -> task.onSliceFailure(listener, slice.source().slice().getId(), e)); client.execute(action, requestForSlice, sliceListener); - /* Explicitly tell the task manager that we're running child tasks on the local node so it will cancel them when the parent is - * cancelled. */ - taskManager.registerChildTask(task, localNodeId); } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index 1945c6e2f76..c48858fcf20 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -348,7 +348,7 @@ public abstract class ScrollableHitSource implements Closeable { builder.field("reason"); { builder.startObject(); - ElasticsearchException.toXContent(builder, params, reason); + ElasticsearchException.generateThrowableXContent(builder, params, reason); builder.endObject(); } builder.endObject(); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 932d4de9174..e54d7c72efe 100644 --- 
a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -604,7 +604,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { * that good stuff. */ if (delay.nanos() > 0) { - generic().execute(() -> taskManager.cancel(testTask, reason, (Set s) -> {})); + generic().execute(() -> taskManager.cancel(testTask, reason, () -> {})); } return super.schedule(delay, name, command); } @@ -637,7 +637,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { action.setScroll(scrollId()); } String reason = randomSimpleString(random()); - taskManager.cancel(testTask, reason, (Set s) -> {}); + taskManager.cancel(testTask, reason, () -> {}); testMe.accept(action); assertEquals(reason, listener.get().getReasonCancelled()); if (previousScrollSet) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 10a0a159c23..b9a81010a72 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -406,14 +406,14 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem if (SETTING_CORS_ALLOW_CREDENTIALS.get(settings)) { builder.allowCredentials(); } - String[] strMethods = Strings.splitStringByCommaToArray(SETTING_CORS_ALLOW_METHODS.get(settings)); + String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ","); HttpMethod[] methods = Arrays.asList(strMethods) .stream() .map(HttpMethod::valueOf) .toArray(size -> new HttpMethod[size]); return builder.allowedRequestMethods(methods) .maxAge(SETTING_CORS_MAX_AGE.get(settings)) - 
.allowedRequestHeaders(Strings.splitStringByCommaToArray(SETTING_CORS_ALLOW_HEADERS.get(settings))) + .allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ",")) .shortCircuit() .build(); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index e3dd6d8a78e..69dcaad9dd3 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -51,6 +51,7 @@ import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; +import static org.elasticsearch.common.Strings.collectionToDelimitedString; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS; @@ -89,11 +90,12 @@ public class Netty4HttpServerTransportTests extends ESTestCase { public void testCorsConfig() { final Set methods = new HashSet<>(Arrays.asList("get", "options", "post")); final Set headers = new HashSet<>(Arrays.asList("Content-Type", "Content-Length")); + final String suffix = randomBoolean() ? 
" " : ""; // sometimes have a leading whitespace between comma delimited elements final Settings settings = Settings.builder() .put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "*") - .put(SETTING_CORS_ALLOW_METHODS.getKey(), Strings.collectionToCommaDelimitedString(methods)) - .put(SETTING_CORS_ALLOW_HEADERS.getKey(), Strings.collectionToCommaDelimitedString(headers)) + .put(SETTING_CORS_ALLOW_METHODS.getKey(), collectionToDelimitedString(methods, ",", suffix, "")) + .put(SETTING_CORS_ALLOW_HEADERS.getKey(), collectionToDelimitedString(headers, ",", suffix, "")) .put(SETTING_CORS_ALLOW_CREDENTIALS.getKey(), true) .build(); final Netty4CorsConfig corsConfig = Netty4HttpServerTransport.buildCorsConfig(settings); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/_common.json b/rest-api-spec/src/main/resources/rest-api-spec/api/_common.json new file mode 100644 index 00000000000..fec522be4a1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/_common.json @@ -0,0 +1,29 @@ +{ + "description": "Parameters that are accepted by all API endpoints.", + "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html", + "params": { + "pretty": { + "type": "boolean", + "description": "Pretty format the returned JSON response.", + "default": false + }, + "human": { + "type": "boolean", + "description": "Return human readable values for statistics.", + "default": true + }, + "error_trace": { + "type": "boolean", + "description": "Include the stack trace of returned errors.", + "default": false + }, + "source": { + "type": "string", + "description": "The URL-encoded request definition. Useful for libraries that do not accept a request body for non-POST requests." + }, + "filter_path": { + "type": "string", + "description": "A comma-separated list of filters used to reduce the respone." 
+ } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java index 9203b632cba..69291ccaba6 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java +++ b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java @@ -138,4 +138,41 @@ public class NodeTests extends ESTestCase { } + public void testNodeAttributes() throws IOException { + String attr = randomAsciiOfLength(5); + Settings.Builder settings = baseSettings().put(Node.NODE_ATTRIBUTES.getKey() + "test_attr", attr); + try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) { + final Settings nodeSettings = randomBoolean() ? node.settings() : node.getEnvironment().settings(); + assertEquals(attr, Node.NODE_ATTRIBUTES.get(nodeSettings).getAsMap().get("test_attr")); + } + + // leading whitespace not allowed + attr = " leading"; + settings = baseSettings().put(Node.NODE_ATTRIBUTES.getKey() + "test_attr", attr); + try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) { + fail("should not allow a node attribute with leading whitespace"); + } catch (IllegalArgumentException e) { + assertEquals("node.attr.test_attr cannot have leading or trailing whitespace [ leading]", e.getMessage()); + } + + // trailing whitespace not allowed + attr = "trailing "; + settings = baseSettings().put(Node.NODE_ATTRIBUTES.getKey() + "test_attr", attr); + try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) { + fail("should not allow a node attribute with trailing whitespace"); + } catch (IllegalArgumentException e) { + assertEquals("node.attr.test_attr cannot have leading or trailing whitespace [trailing ]", e.getMessage()); + } + } + + private static Settings.Builder baseSettings() { + final Path tempDir = createTempDir(); + return Settings.builder() + 
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", randomLong())) + .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) + .put(NetworkModule.HTTP_ENABLED.getKey(), false) + .put("transport.type", "mock-socket-network") + .put(Node.NODE_DATA_SETTING.getKey(), true); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestClient.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestClient.java index 748a08384ca..c18dad907bd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestClient.java @@ -32,7 +32,6 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestPath; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec; @@ -44,7 +43,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; /** * Used by {@link ESClientYamlSuiteTestCase} to execute REST requests according to the tests written in yaml suite files. Wraps a @@ -53,11 +51,6 @@ import java.util.Set; */ public class ClientYamlTestClient { private static final Logger logger = Loggers.getLogger(ClientYamlTestClient.class); - /** - * Query params that don't need to be declared in the spec, they are supported by default. 
- */ - private static final Set ALWAYS_ACCEPTED_QUERY_STRING_PARAMS = Sets.newHashSet( - "ignore", "error_trace", "human", "filter_path", "pretty", "source"); private final ClientYamlSuiteRestSpec restSpec; private final RestClient restClient; @@ -108,7 +101,8 @@ public class ClientYamlTestClient { if (restApi.getPathParts().contains(entry.getKey())) { pathParts.put(entry.getKey(), entry.getValue()); } else { - if (restApi.getParams().contains(entry.getKey()) || ALWAYS_ACCEPTED_QUERY_STRING_PARAMS.contains(entry.getKey())) { + if (restApi.getParams().contains(entry.getKey()) || restSpec.isGlobalParameter(entry.getKey()) + || restSpec.isClientParameter(entry.getKey())) { queryStringParams.put(entry.getKey(), entry.getValue()); } else { throw new IllegalArgumentException("param [" + entry.getKey() + "] not supported in [" diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestSpec.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestSpec.java index 4efed709e22..15f2f7e3016 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestSpec.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestSpec.java @@ -30,18 +30,21 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * Holds the specification used to turn {@code do} actions in the YAML suite into REST api calls. 
*/ public class ClientYamlSuiteRestSpec { - Map restApiMap = new HashMap<>(); + private final Set globalParameters = new HashSet<>(); + private final Map restApiMap = new HashMap<>(); private ClientYamlSuiteRestSpec() { } - void addApi(ClientYamlSuiteRestApi restApi) { + private void addApi(ClientYamlSuiteRestApi restApi) { ClientYamlSuiteRestApi previous = restApiMap.putIfAbsent(restApi.getName(), restApi); if (previous != null) { throw new IllegalArgumentException("cannot register api [" + restApi.getName() + "] found in [" + restApi.getLocation() + "]. " @@ -57,6 +60,21 @@ public class ClientYamlSuiteRestSpec { return restApiMap.values(); } + /** + * Returns whether the provided parameter is one of those parameters that are supported by all Elasticsearch api + */ + public boolean isGlobalParameter(String param) { + return globalParameters.contains(param); + } + + /** + * Returns whether the provided parameter is one of those parameters that are supported by the Elasticsearch language clients, meaning + * that they influence the client behaviour and don't get sent to Elasticsearch + */ + public boolean isClientParameter(String name) { + return "ignore".equals(name); + } + /** * Parses the complete set of REST spec available under the provided directories */ @@ -66,15 +84,39 @@ public class ClientYamlSuiteRestSpec { for (String path : paths) { for (Path jsonFile : FileUtils.findJsonSpec(fileSystem, optionalPathPrefix, path)) { try (InputStream stream = Files.newInputStream(jsonFile)) { + String filename = jsonFile.getFileName().toString(); try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, stream)) { - ClientYamlSuiteRestApi restApi = restApiParser.parse(jsonFile.toString(), parser); - String filename = jsonFile.getFileName().toString(); - String expectedApiName = filename.substring(0, filename.lastIndexOf('.')); - if (restApi.getName().equals(expectedApiName) == false) { - throw new IllegalArgumentException("found api 
[" + restApi.getName() + "] in [" + jsonFile.toString() + "]. " + - "Each api is expected to have the same name as the file that defines it."); + if (filename.equals("_common.json")) { + String currentFieldName = null; + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + if (parser.currentToken() == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (parser.currentToken() == XContentParser.Token.START_OBJECT + && "params".equals(currentFieldName)) { + while (parser.nextToken() == XContentParser.Token.FIELD_NAME) { + String param = parser.currentName(); + if (restSpec.globalParameters.contains(param)) { + throw new IllegalArgumentException("Found duplicate global param [" + param + "]"); + } + restSpec.globalParameters.add(param); + parser.nextToken(); + if (parser.currentToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Expected params field in rest api definition to " + + "contain an object"); + } + parser.skipChildren(); + } + } + } + } else { + ClientYamlSuiteRestApi restApi = restApiParser.parse(jsonFile.toString(), parser); + String expectedApiName = filename.substring(0, filename.lastIndexOf('.')); + if (restApi.getName().equals(expectedApiName) == false) { + throw new IllegalArgumentException("found api [" + restApi.getName() + "] in [" + jsonFile.toString() + + "]. " + "Each api is expected to have the same name as the file that defines it."); + } + restSpec.addApi(restApi); } - restSpec.addApi(restApi); } } catch (Exception ex) { throw new IOException("Can't parse rest spec file: [" + jsonFile + "]", ex);