Merge branch 'master' into feature/rank-eval
This commit is contained in:
commit
2b24091361
|
@ -27,6 +27,10 @@ subprojects {
|
|||
group = 'org.elasticsearch'
|
||||
version = org.elasticsearch.gradle.VersionProperties.elasticsearch
|
||||
description = "Elasticsearch subproject ${project.path}"
|
||||
}
|
||||
|
||||
// setup pom license info, but only for artifacts that are part of elasticsearch
|
||||
configure(subprojects.findAll { it.path.startsWith(':x-plugins') == false }) {
|
||||
|
||||
// we only use maven publish to add tasks for pom generation
|
||||
plugins.withType(MavenPublishPlugin).whenPluginAdded {
|
||||
|
|
|
@ -286,6 +286,9 @@ class ClusterFormationTasks {
|
|||
esConfig['node.max_local_storage_nodes'] = node.config.numNodes
|
||||
esConfig['http.port'] = node.config.httpPort
|
||||
esConfig['transport.tcp.port'] = node.config.transportPort
|
||||
// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
|
||||
esConfig['cluster.routing.allocation.disk.watermark.low'] = '1b'
|
||||
esConfig['cluster.routing.allocation.disk.watermark.high'] = '1b'
|
||||
esConfig.putAll(node.config.settings)
|
||||
|
||||
Task writeConfig = project.tasks.create(name: name, type: DefaultTask, dependsOn: setup)
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -33,29 +32,25 @@ import java.io.IOException;
|
|||
public class ClusterSearchShardsGroup implements Streamable, ToXContent {
|
||||
|
||||
private ShardId shardId;
|
||||
ShardRouting[] shards;
|
||||
private ShardRouting[] shards;
|
||||
|
||||
ClusterSearchShardsGroup() {
|
||||
private ClusterSearchShardsGroup() {
|
||||
|
||||
}
|
||||
|
||||
public ClusterSearchShardsGroup(ShardId shardId, ShardRouting[] shards) {
|
||||
ClusterSearchShardsGroup(ShardId shardId, ShardRouting[] shards) {
|
||||
this.shardId = shardId;
|
||||
this.shards = shards;
|
||||
}
|
||||
|
||||
public static ClusterSearchShardsGroup readSearchShardsGroupResponse(StreamInput in) throws IOException {
|
||||
static ClusterSearchShardsGroup readSearchShardsGroupResponse(StreamInput in) throws IOException {
|
||||
ClusterSearchShardsGroup response = new ClusterSearchShardsGroup();
|
||||
response.readFrom(in);
|
||||
return response;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return shardId.getIndexName();
|
||||
}
|
||||
|
||||
public int getShardId() {
|
||||
return shardId.id();
|
||||
public ShardId getShardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
||||
public ShardRouting[] getShards() {
|
||||
|
|
|
@ -38,12 +38,11 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
|
|||
private DiscoveryNode[] nodes;
|
||||
private Map<String, AliasFilter> indicesAndFilters;
|
||||
|
||||
ClusterSearchShardsResponse() {
|
||||
public ClusterSearchShardsResponse() {
|
||||
|
||||
}
|
||||
|
||||
ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes,
|
||||
Map<String, AliasFilter> indicesAndFilters) {
|
||||
ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, Map<String, AliasFilter> indicesAndFilters) {
|
||||
this.groups = groups;
|
||||
this.nodes = nodes;
|
||||
this.indicesAndFilters = indicesAndFilters;
|
||||
|
@ -116,7 +115,8 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
|
|||
String index = entry.getKey();
|
||||
builder.startObject(index);
|
||||
AliasFilter aliasFilter = entry.getValue();
|
||||
if (aliasFilter.getQueryBuilder() != null) {
|
||||
if (aliasFilter.getAliases().length > 0) {
|
||||
builder.array("aliases", aliasFilter.getAliases());
|
||||
builder.field("filter");
|
||||
aliasFilter.getQueryBuilder().toXContent(builder, params);
|
||||
}
|
||||
|
|
|
@ -93,8 +93,6 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
|
|||
this.aliasFilter = aliasFilter;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void start() {
|
||||
if (expectedSuccessfulOps == 0) {
|
||||
//no search shards to search on, bail with empty response
|
||||
|
@ -125,7 +123,8 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
|
|||
if (node == null) {
|
||||
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
||||
} else {
|
||||
AliasFilter filter = this.aliasFilter.get(shard.index().getName());
|
||||
AliasFilter filter = this.aliasFilter.get(shard.index().getUUID());
|
||||
assert filter != null;
|
||||
ShardSearchTransportRequest transportRequest = new ShardSearchTransportRequest(request, shardIt.shardId(), shardsIts.size(),
|
||||
filter, startTime());
|
||||
sendExecuteFirstPhase(node, transportRequest , new ActionListener<FirstResult>() {
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
@ -72,14 +73,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
this.searchService = searchService;
|
||||
}
|
||||
|
||||
private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState, String...concreteIndices) {
|
||||
private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState, Index[] concreteIndices) {
|
||||
final Map<String, AliasFilter> aliasFilterMap = new HashMap<>();
|
||||
for (String index : concreteIndices) {
|
||||
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
|
||||
AliasFilter aliasFilter = searchService.buildAliasFilter(clusterState, index, request.indices());
|
||||
if (aliasFilter != null) {
|
||||
aliasFilterMap.put(index, aliasFilter);
|
||||
}
|
||||
for (Index index : concreteIndices) {
|
||||
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index.getName());
|
||||
AliasFilter aliasFilter = searchService.buildAliasFilter(clusterState, index.getName(), request.indices());
|
||||
assert aliasFilter != null;
|
||||
aliasFilterMap.put(index.getUUID(), aliasFilter);
|
||||
}
|
||||
return aliasFilterMap;
|
||||
}
|
||||
|
@ -94,11 +94,15 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
|
||||
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
|
||||
// of just for the _search api
|
||||
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, searchRequest.indicesOptions(),
|
||||
Index[] indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
|
||||
startTimeInMillis, searchRequest.indices());
|
||||
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, concreteIndices);
|
||||
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices);
|
||||
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
|
||||
searchRequest.indices());
|
||||
String[] concreteIndices = new String[indices.length];
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
concreteIndices[i] = indices[i].getName();
|
||||
}
|
||||
GroupShardsIterator shardIterators = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
|
||||
searchRequest.preference());
|
||||
failIfOverShardCountLimit(clusterService, shardIterators.size());
|
||||
|
|
|
@ -111,9 +111,10 @@ public final class AutoCreateIndex {
|
|||
try {
|
||||
String[] patterns = Strings.commaDelimitedListToStringArray(value);
|
||||
for (String pattern : patterns) {
|
||||
if (pattern == null || pattern.length() == 0) {
|
||||
if (pattern == null || pattern.trim().length() == 0) {
|
||||
throw new IllegalArgumentException("Can't parse [" + value + "] for setting [action.auto_create_index] must be either [true, false, or a comma separated list of index patterns]");
|
||||
}
|
||||
pattern = pattern.trim();
|
||||
Tuple<String, Boolean> expression;
|
||||
if (pattern.startsWith("-")) {
|
||||
if (pattern.length() == 1) {
|
||||
|
|
|
@ -391,7 +391,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
|
|||
logger.trace("connecting to listed node (light) [{}]", listedNode);
|
||||
transportService.connectToNodeLight(listedNode);
|
||||
} catch (Exception e) {
|
||||
logger.debug(
|
||||
logger.info(
|
||||
(Supplier<?>)
|
||||
() -> new ParameterizedMessage("failed to connect to node [{}], removed from nodes list", listedNode), e);
|
||||
hostFailureListener.onNodeDisconnected(listedNode, e);
|
||||
|
|
|
@ -130,9 +130,9 @@ public class IndexNameExpressionResolver extends AbstractComponent {
|
|||
* @throws IllegalArgumentException if one of the aliases resolve to multiple indices and the provided
|
||||
* indices options in the context don't allow such a case.
|
||||
*/
|
||||
public String[] concreteIndexNames(ClusterState state, IndicesOptions options, long startTime, String... indexExpressions) {
|
||||
public Index[] concreteIndices(ClusterState state, IndicesOptions options, long startTime, String... indexExpressions) {
|
||||
Context context = new Context(state, options, startTime);
|
||||
return concreteIndexNames(context, indexExpressions);
|
||||
return concreteIndices(context, indexExpressions);
|
||||
}
|
||||
|
||||
String[] concreteIndexNames(Context context, String... indexExpressions) {
|
||||
|
|
|
@ -1,153 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Instances of this class keeps explanations of decisions that have been made by allocation.
|
||||
* An {@link AllocationExplanation} consists of a set of per node explanations.
|
||||
* Since {@link NodeExplanation}s are related to shards an {@link AllocationExplanation} maps
|
||||
* a shards id to a set of {@link NodeExplanation}s.
|
||||
*/
|
||||
public class AllocationExplanation implements Streamable {
|
||||
|
||||
public static final AllocationExplanation EMPTY = new AllocationExplanation();
|
||||
|
||||
/**
|
||||
* Instances of this class keep messages and informations about nodes of an allocation
|
||||
*/
|
||||
public static class NodeExplanation {
|
||||
private final DiscoveryNode node;
|
||||
|
||||
private final String description;
|
||||
|
||||
/**
|
||||
* Creates a new {@link NodeExplanation}
|
||||
*
|
||||
* @param node node referenced by this {@link NodeExplanation}
|
||||
* @param description a message associated with the given node
|
||||
*/
|
||||
public NodeExplanation(DiscoveryNode node, String description) {
|
||||
this.node = node;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
/**
|
||||
* The node referenced by the explanation
|
||||
* @return referenced node
|
||||
*/
|
||||
public DiscoveryNode node() {
|
||||
return node;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the explanation for the node
|
||||
* @return explanation for the node
|
||||
*/
|
||||
public String description() {
|
||||
return description;
|
||||
}
|
||||
}
|
||||
|
||||
private final Map<ShardId, List<NodeExplanation>> explanations = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Create and add a node explanation to this explanation referencing a shard
|
||||
* @param shardId id the of the referenced shard
|
||||
* @param nodeExplanation Explanation itself
|
||||
* @return AllocationExplanation involving the explanation
|
||||
*/
|
||||
public AllocationExplanation add(ShardId shardId, NodeExplanation nodeExplanation) {
|
||||
List<NodeExplanation> list = explanations.get(shardId);
|
||||
if (list == null) {
|
||||
list = new ArrayList<>();
|
||||
explanations.put(shardId, list);
|
||||
}
|
||||
list.add(nodeExplanation);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* List of explanations involved by this AllocationExplanation
|
||||
* @return Map of shard ids and corresponding explanations
|
||||
*/
|
||||
public Map<ShardId, List<NodeExplanation>> explanations() {
|
||||
return this.explanations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read an {@link AllocationExplanation} from an {@link StreamInput}
|
||||
* @param in {@link StreamInput} to read from
|
||||
* @return a new {@link AllocationExplanation} read from the stream
|
||||
* @throws IOException if something bad happened while reading
|
||||
*/
|
||||
public static AllocationExplanation readAllocationExplanation(StreamInput in) throws IOException {
|
||||
AllocationExplanation e = new AllocationExplanation();
|
||||
e.readFrom(in);
|
||||
return e;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
ShardId shardId = ShardId.readShardId(in);
|
||||
int size2 = in.readVInt();
|
||||
List<NodeExplanation> ne = new ArrayList<>(size2);
|
||||
for (int j = 0; j < size2; j++) {
|
||||
DiscoveryNode node = null;
|
||||
if (in.readBoolean()) {
|
||||
node = new DiscoveryNode(in);
|
||||
}
|
||||
ne.add(new NodeExplanation(node, in.readString()));
|
||||
}
|
||||
explanations.put(shardId, ne);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(explanations.size());
|
||||
for (Map.Entry<ShardId, List<NodeExplanation>> entry : explanations.entrySet()) {
|
||||
entry.getKey().writeTo(out);
|
||||
out.writeVInt(entry.getValue().size());
|
||||
for (NodeExplanation nodeExplanation : entry.getValue()) {
|
||||
if (nodeExplanation.node() == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
nodeExplanation.node().writeTo(out);
|
||||
}
|
||||
out.writeString(nodeExplanation.description());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
@ -61,8 +60,6 @@ public class RoutingAllocation {
|
|||
|
||||
private final ImmutableOpenMap<String, ClusterState.Custom> customs;
|
||||
|
||||
private final AllocationExplanation explanation = new AllocationExplanation();
|
||||
|
||||
private final ClusterInfo clusterInfo;
|
||||
|
||||
private Map<ShardId, Set<String>> ignoredShardToNodes = null;
|
||||
|
@ -162,14 +159,6 @@ public class RoutingAllocation {
|
|||
return customs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get explanations of current routing
|
||||
* @return explanation of routing
|
||||
*/
|
||||
public AllocationExplanation explanation() {
|
||||
return explanation;
|
||||
}
|
||||
|
||||
public void ignoreDisable(boolean ignoreDisable) {
|
||||
this.ignoreDisable = ignoreDisable;
|
||||
}
|
||||
|
|
|
@ -57,6 +57,7 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
|||
private final Setting.Property scope;
|
||||
private static final Pattern KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])*[-\\w]+$");
|
||||
private static final Pattern GROUP_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+$");
|
||||
private static final Pattern AFFIX_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+(?:[*][.])+[-\\w]+$");
|
||||
|
||||
protected AbstractScopedSettings(Settings settings, Set<Setting<?>> settingsSet, Setting.Property scope) {
|
||||
super(settings);
|
||||
|
@ -86,7 +87,8 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
|||
}
|
||||
|
||||
protected void validateSettingKey(Setting setting) {
|
||||
if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) {
|
||||
if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())
|
||||
|| isValidAffixKey(setting.getKey())) == false) {
|
||||
throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]");
|
||||
}
|
||||
}
|
||||
|
@ -111,6 +113,10 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
|||
return GROUP_KEY_PATTERN.matcher(key).matches();
|
||||
}
|
||||
|
||||
private static boolean isValidAffixKey(String key) {
|
||||
return AFFIX_KEY_PATTERN.matcher(key).matches();
|
||||
}
|
||||
|
||||
public Setting.Property getScope() {
|
||||
return this.scope;
|
||||
}
|
||||
|
@ -372,14 +378,10 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
|||
public Settings diff(Settings source, Settings defaultSettings) {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
for (Setting<?> setting : keySettings.values()) {
|
||||
if (setting.exists(source) == false) {
|
||||
builder.put(setting.getKey(), setting.getRaw(defaultSettings));
|
||||
}
|
||||
setting.diff(builder, source, defaultSettings);
|
||||
}
|
||||
for (Setting<?> setting : complexMatchers.values()) {
|
||||
if (setting.exists(source) == false) {
|
||||
builder.put(setting.getKey(), setting.getRaw(defaultSettings));
|
||||
}
|
||||
setting.diff(builder, source, defaultSettings);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -311,6 +311,19 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add this setting to the builder if it doesn't exists in the source settings.
|
||||
* The value added to the builder is taken from the given default settings object.
|
||||
* @param builder the settings builder to fill the diff into
|
||||
* @param source the source settings object to diff
|
||||
* @param defaultSettings the default settings object to diff against
|
||||
*/
|
||||
public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) {
|
||||
if (exists(source) == false) {
|
||||
builder.put(getKey(), getRaw(defaultSettings));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the raw (string) settings value. If the setting is not present in the given settings object the default value is returned
|
||||
* instead. This is useful if the value can't be parsed due to an invalid value to access the actual value.
|
||||
|
@ -649,6 +662,9 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
|
||||
public static <T> Setting<List<T>> listSetting(String key, Function<Settings, List<String>> defaultStringValue,
|
||||
Function<String, T> singleValueParser, Property... properties) {
|
||||
if (defaultStringValue.apply(Settings.EMPTY) == null) {
|
||||
throw new IllegalArgumentException("default value function must not return null");
|
||||
}
|
||||
Function<String, List<T>> parser = (s) ->
|
||||
parseableStringToList(s).stream().map(singleValueParser).collect(Collectors.toList());
|
||||
|
||||
|
@ -670,6 +686,18 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
boolean exists = super.exists(settings);
|
||||
return exists || settings.get(getKey() + ".0") != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) {
|
||||
if (exists(source) == false) {
|
||||
String[] asArray = defaultSettings.getAsArray(getKey(), null);
|
||||
if (asArray == null) {
|
||||
builder.putArray(getKey(), defaultStringValue.apply(defaultSettings));
|
||||
} else {
|
||||
builder.putArray(getKey(), asArray);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -747,6 +775,17 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) {
|
||||
Map<String, String> leftGroup = get(source).getAsMap();
|
||||
Settings defaultGroup = get(defaultSettings);
|
||||
for (Map.Entry<String, String> entry : defaultGroup.getAsMap().entrySet()) {
|
||||
if (leftGroup.containsKey(entry.getKey()) == false) {
|
||||
builder.put(getKey() + entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractScopedSettings.SettingUpdater<Settings> newUpdater(Consumer<Settings> consumer, Logger logger,
|
||||
Consumer<Settings> validator) {
|
||||
|
@ -856,14 +895,14 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
* storage.${backend}.enable=[true|false] can easily be added with this setting. Yet, adfix key settings don't support updaters
|
||||
* out of the box unless {@link #getConcreteSetting(String)} is used to pull the updater.
|
||||
*/
|
||||
public static <T> Setting<T> adfixKeySetting(String prefix, String suffix, Function<Settings, String> defaultValue,
|
||||
public static <T> Setting<T> affixKeySetting(String prefix, String suffix, Function<Settings, String> defaultValue,
|
||||
Function<String, T> parser, Property... properties) {
|
||||
return affixKeySetting(AffixKey.withAdfix(prefix, suffix), defaultValue, parser, properties);
|
||||
return affixKeySetting(AffixKey.withAffix(prefix, suffix), defaultValue, parser, properties);
|
||||
}
|
||||
|
||||
public static <T> Setting<T> adfixKeySetting(String prefix, String suffix, String defaultValue, Function<String, T> parser,
|
||||
public static <T> Setting<T> affixKeySetting(String prefix, String suffix, String defaultValue, Function<String, T> parser,
|
||||
Property... properties) {
|
||||
return adfixKeySetting(prefix, suffix, (s) -> defaultValue, parser, properties);
|
||||
return affixKeySetting(prefix, suffix, (s) -> defaultValue, parser, properties);
|
||||
}
|
||||
|
||||
public static <T> Setting<T> affixKeySetting(AffixKey key, Function<Settings, String> defaultValue, Function<String, T> parser,
|
||||
|
@ -888,6 +927,15 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) {
|
||||
for (Map.Entry<String, String> entry : defaultSettings.getAsMap().entrySet()) {
|
||||
if (match(entry.getKey())) {
|
||||
getConcreteSetting(entry.getKey()).diff(builder, source, defaultSettings);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -960,7 +1008,7 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
return new AffixKey(prefix, null);
|
||||
}
|
||||
|
||||
public static AffixKey withAdfix(String prefix, String suffix) {
|
||||
public static AffixKey withAffix(String prefix, String suffix) {
|
||||
return new AffixKey(prefix, suffix);
|
||||
}
|
||||
|
||||
|
@ -970,6 +1018,9 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
public AffixKey(String prefix, String suffix) {
|
||||
assert prefix != null || suffix != null: "Either prefix or suffix must be non-null";
|
||||
this.prefix = prefix;
|
||||
if (prefix.endsWith(".") == false) {
|
||||
throw new IllegalArgumentException("prefix must end with a '.'");
|
||||
}
|
||||
this.suffix = suffix;
|
||||
}
|
||||
|
||||
|
@ -1005,9 +1056,9 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
sb.append(prefix);
|
||||
}
|
||||
if (suffix != null) {
|
||||
sb.append("*");
|
||||
sb.append('*');
|
||||
sb.append('.');
|
||||
sb.append(suffix);
|
||||
sb.append(".");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
|
@ -859,7 +859,8 @@ final class DocumentParser {
|
|||
Mapper.BuilderContext builderContext = new Mapper.BuilderContext(context.indexSettings(), context.path());
|
||||
mapper = (ObjectMapper) builder.build(builderContext);
|
||||
if (mapper.nested() != ObjectMapper.Nested.NO) {
|
||||
throw new MapperParsingException("It is forbidden to create dynamic nested objects ([" + context.path().pathAsText(paths[i]) + "]) through `copy_to`");
|
||||
throw new MapperParsingException("It is forbidden to create dynamic nested objects ([" + context.path().pathAsText(paths[i])
|
||||
+ "]) through `copy_to` or dots in field names");
|
||||
}
|
||||
context.addDynamicMapper(mapper);
|
||||
break;
|
||||
|
@ -909,6 +910,11 @@ final class DocumentParser {
|
|||
return null;
|
||||
}
|
||||
objectMapper = (ObjectMapper)mapper;
|
||||
if (objectMapper.nested().isNested()) {
|
||||
throw new MapperParsingException("Cannot add a value for field ["
|
||||
+ fieldName + "] since one of the intermediate objects is mapped as a nested object: ["
|
||||
+ mapper.name() + "]");
|
||||
}
|
||||
}
|
||||
return objectMapper.getMapper(subfields[subfields.length - 1]);
|
||||
}
|
||||
|
|
|
@ -131,7 +131,10 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
|
|||
// to be unbounded and most instances may only aggregate few
|
||||
// documents, so use hashed based
|
||||
// global ordinals to keep the bucket ords dense.
|
||||
if (Aggregator.descendsFromBucketAggregator(parent)) {
|
||||
// Additionally, if using partitioned terms the regular global
|
||||
// ordinals would be sparse so we opt for hash
|
||||
if (Aggregator.descendsFromBucketAggregator(parent) ||
|
||||
(includeExclude != null && includeExclude.isPartitionBased())) {
|
||||
execution = ExecutionMode.GLOBAL_ORDINALS_HASH;
|
||||
} else {
|
||||
if (factories == AggregatorFactories.EMPTY) {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket.terms.support;
|
||||
|
||||
import com.carrotsearch.hppc.BitMixer;
|
||||
import com.carrotsearch.hppc.LongHashSet;
|
||||
import com.carrotsearch.hppc.LongSet;
|
||||
|
||||
|
@ -35,6 +36,7 @@ import org.apache.lucene.util.automaton.CompiledAutomaton;
|
|||
import org.apache.lucene.util.automaton.Operations;
|
||||
import org.apache.lucene.util.automaton.RegExp;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -46,6 +48,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.search.DocValueFormat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -61,15 +64,34 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
private static final ParseField INCLUDE_FIELD = new ParseField("include");
|
||||
private static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
|
||||
private static final ParseField PATTERN_FIELD = new ParseField("pattern");
|
||||
private static final ParseField PARTITION_FIELD = new ParseField("partition");
|
||||
private static final ParseField NUM_PARTITIONS_FIELD = new ParseField("num_partitions");
|
||||
|
||||
// The includeValue and excludeValue ByteRefs which are the result of the parsing
|
||||
// process are converted into a LongFilter when used on numeric fields
|
||||
// in the index.
|
||||
public static class LongFilter {
|
||||
public abstract static class LongFilter {
|
||||
public abstract boolean accept(long value);
|
||||
|
||||
}
|
||||
|
||||
public class PartitionedLongFilter extends LongFilter {
|
||||
private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
|
||||
|
||||
@Override
|
||||
public boolean accept(long value) {
|
||||
// hash the value to keep even distributions
|
||||
final long hashCode = BitMixer.mix64(value);
|
||||
return Math.floorMod(hashCode, incNumPartitions) == incZeroBasedPartition;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class SetBackedLongFilter extends LongFilter {
|
||||
private LongSet valids;
|
||||
private LongSet invalids;
|
||||
|
||||
private LongFilter(int numValids, int numInvalids) {
|
||||
private SetBackedLongFilter(int numValids, int numInvalids) {
|
||||
if (numValids > 0) {
|
||||
valids = new LongHashSet(numValids);
|
||||
}
|
||||
|
@ -96,6 +118,13 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
public abstract boolean accept(BytesRef value);
|
||||
}
|
||||
|
||||
class PartitionedStringFilter extends StringFilter {
|
||||
@Override
|
||||
public boolean accept(BytesRef value) {
|
||||
return Math.floorMod(value.hashCode(), incNumPartitions) == incZeroBasedPartition;
|
||||
}
|
||||
}
|
||||
|
||||
static class AutomatonBackedStringFilter extends StringFilter {
|
||||
|
||||
private final ByteRunAutomaton runAutomaton;
|
||||
|
@ -138,6 +167,25 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
|
||||
}
|
||||
|
||||
class PartitionedOrdinalsFilter extends OrdinalsFilter {
|
||||
|
||||
@Override
|
||||
public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws IOException {
|
||||
final long numOrds = globalOrdinals.getValueCount();
|
||||
final LongBitSet acceptedGlobalOrdinals = new LongBitSet(numOrds);
|
||||
final TermsEnum termEnum = globalOrdinals.termsEnum();
|
||||
|
||||
BytesRef term = termEnum.next();
|
||||
while (term != null) {
|
||||
if (Math.floorMod(term.hashCode(), incNumPartitions) == incZeroBasedPartition) {
|
||||
acceptedGlobalOrdinals.set(termEnum.ord());
|
||||
}
|
||||
term = termEnum.next();
|
||||
}
|
||||
return acceptedGlobalOrdinals;
|
||||
}
|
||||
}
|
||||
|
||||
static class AutomatonBackedOrdinalsFilter extends OrdinalsFilter {
|
||||
|
||||
private final CompiledAutomaton compiled;
|
||||
|
@ -205,6 +253,8 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
|
||||
private final RegExp include, exclude;
|
||||
private final SortedSet<BytesRef> includeValues, excludeValues;
|
||||
private final int incZeroBasedPartition;
|
||||
private final int incNumPartitions;
|
||||
|
||||
/**
|
||||
* @param include The regular expression pattern for the terms to be included
|
||||
|
@ -218,6 +268,8 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
this.exclude = exclude;
|
||||
this.includeValues = null;
|
||||
this.excludeValues = null;
|
||||
this.incZeroBasedPartition = 0;
|
||||
this.incNumPartitions = 0;
|
||||
}
|
||||
|
||||
public IncludeExclude(String include, String exclude) {
|
||||
|
@ -234,6 +286,8 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
}
|
||||
this.include = null;
|
||||
this.exclude = null;
|
||||
this.incZeroBasedPartition = 0;
|
||||
this.incNumPartitions = 0;
|
||||
this.includeValues = includeValues;
|
||||
this.excludeValues = excludeValues;
|
||||
}
|
||||
|
@ -250,6 +304,21 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues));
|
||||
}
|
||||
|
||||
public IncludeExclude(int partition, int numPartitions) {
|
||||
if (partition < 0 || partition >= numPartitions) {
|
||||
throw new IllegalArgumentException("Partition must be >=0 and < numPartition which is "+numPartitions);
|
||||
}
|
||||
this.incZeroBasedPartition = partition;
|
||||
this.incNumPartitions = numPartitions;
|
||||
this.include = null;
|
||||
this.exclude = null;
|
||||
this.includeValues = null;
|
||||
this.excludeValues = null;
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
|
@ -257,6 +326,8 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
if (in.readBoolean()) {
|
||||
includeValues = null;
|
||||
excludeValues = null;
|
||||
incZeroBasedPartition = 0;
|
||||
incNumPartitions = 0;
|
||||
String includeString = in.readOptionalString();
|
||||
include = includeString == null ? null : new RegExp(includeString);
|
||||
String excludeString = in.readOptionalString();
|
||||
|
@ -283,6 +354,13 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
} else {
|
||||
excludeValues = null;
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
|
||||
incNumPartitions = in.readVInt();
|
||||
incZeroBasedPartition = in.readVInt();
|
||||
} else {
|
||||
incNumPartitions = 0;
|
||||
incZeroBasedPartition = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -309,6 +387,10 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
out.writeBytesRef(value);
|
||||
}
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
|
||||
out.writeVInt(incNumPartitions);
|
||||
out.writeVInt(incZeroBasedPartition);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -436,11 +518,26 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
if (token == XContentParser.Token.START_OBJECT) {
|
||||
if (parseFieldMatcher.match(currentFieldName, INCLUDE_FIELD)) {
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
|
||||
// This "include":{"pattern":"foo.*"} syntax is undocumented since 2.0
|
||||
// Regexes should be "include":"foo.*"
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if (parseFieldMatcher.match(currentFieldName, PATTERN_FIELD)) {
|
||||
otherOptions.put(INCLUDE_FIELD, parser.text());
|
||||
} else {
|
||||
throw new ElasticsearchParseException(
|
||||
"Unknown string parameter in Include/Exclude clause: " + currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if (parseFieldMatcher.match(currentFieldName, NUM_PARTITIONS_FIELD)) {
|
||||
otherOptions.put(NUM_PARTITIONS_FIELD, parser.intValue());
|
||||
} else if (parseFieldMatcher.match(currentFieldName, PARTITION_FIELD)) {
|
||||
otherOptions.put(INCLUDE_FIELD, parser.intValue());
|
||||
} else {
|
||||
throw new ElasticsearchParseException(
|
||||
"Unknown numeric parameter in Include/Exclude clause: " + currentFieldName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -480,15 +577,43 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
public IncludeExclude createIncludeExclude(Map<ParseField, Object> otherOptions) {
|
||||
Object includeObject = otherOptions.get(INCLUDE_FIELD);
|
||||
String include = null;
|
||||
int partition = -1;
|
||||
int numPartitions = -1;
|
||||
SortedSet<BytesRef> includeValues = null;
|
||||
if (includeObject != null) {
|
||||
if (includeObject instanceof String) {
|
||||
include = (String) includeObject;
|
||||
} else if (includeObject instanceof SortedSet) {
|
||||
includeValues = (SortedSet<BytesRef>) includeObject;
|
||||
} else if (includeObject instanceof Integer) {
|
||||
partition = (Integer) includeObject;
|
||||
Object numPartitionsObject = otherOptions.get(NUM_PARTITIONS_FIELD);
|
||||
if (numPartitionsObject instanceof Integer) {
|
||||
numPartitions = (Integer) numPartitionsObject;
|
||||
if (numPartitions < 2) {
|
||||
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " must be >1");
|
||||
}
|
||||
if (partition < 0 || partition >= numPartitions) {
|
||||
throw new IllegalArgumentException(
|
||||
PARTITION_FIELD.getPreferredName() + " must be >=0 and <" + numPartitions);
|
||||
}
|
||||
} else {
|
||||
if (numPartitionsObject == null) {
|
||||
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " parameter is missing");
|
||||
}
|
||||
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " value must be an integer");
|
||||
}
|
||||
}
|
||||
}
|
||||
Object excludeObject = otherOptions.get(EXCLUDE_FIELD);
|
||||
if (numPartitions >0 ){
|
||||
if(excludeObject!=null){
|
||||
throw new IllegalArgumentException("Partitioned Include cannot be used in combination with excludes");
|
||||
}
|
||||
return new IncludeExclude(partition, numPartitions);
|
||||
}
|
||||
|
||||
|
||||
String exclude = null;
|
||||
SortedSet<BytesRef> excludeValues = null;
|
||||
if (excludeObject != null) {
|
||||
|
@ -517,6 +642,10 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
return include != null || exclude != null;
|
||||
}
|
||||
|
||||
public boolean isPartitionBased() {
|
||||
return incNumPartitions > 0;
|
||||
}
|
||||
|
||||
private Automaton toAutomaton() {
|
||||
Automaton a = null;
|
||||
if (include != null) {
|
||||
|
@ -538,6 +667,9 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
if (isRegexBased()) {
|
||||
return new AutomatonBackedStringFilter(toAutomaton());
|
||||
}
|
||||
if (isPartitionBased()){
|
||||
return new PartitionedStringFilter();
|
||||
}
|
||||
return new TermListBackedStringFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
|
||||
}
|
||||
|
||||
|
@ -559,13 +691,22 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
if (isRegexBased()) {
|
||||
return new AutomatonBackedOrdinalsFilter(toAutomaton());
|
||||
}
|
||||
if (isPartitionBased()){
|
||||
return new PartitionedOrdinalsFilter();
|
||||
}
|
||||
|
||||
return new TermListBackedOrdinalsFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
|
||||
}
|
||||
|
||||
public LongFilter convertToLongFilter(DocValueFormat format) {
|
||||
|
||||
if(isPartitionBased()){
|
||||
return new PartitionedLongFilter();
|
||||
}
|
||||
|
||||
int numValids = includeValues == null ? 0 : includeValues.size();
|
||||
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
|
||||
LongFilter result = new LongFilter(numValids, numInvalids);
|
||||
SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
|
||||
if (includeValues != null) {
|
||||
for (BytesRef val : includeValues) {
|
||||
result.addAccept(format.parseLong(val.utf8ToString(), false, null));
|
||||
|
@ -580,9 +721,13 @@ public class IncludeExclude implements Writeable, ToXContent {
|
|||
}
|
||||
|
||||
public LongFilter convertToDoubleFilter() {
|
||||
if(isPartitionBased()){
|
||||
return new PartitionedLongFilter();
|
||||
}
|
||||
|
||||
int numValids = includeValues == null ? 0 : includeValues.size();
|
||||
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
|
||||
LongFilter result = new LongFilter(numValids, numInvalids);
|
||||
SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
|
||||
if (includeValues != null) {
|
||||
for (BytesRef val : includeValues) {
|
||||
double dval = Double.parseDouble(val.utf8ToString());
|
||||
|
|
|
@ -91,7 +91,6 @@ public class ClusterSearchShardsResponseTests extends ESTestCase {
|
|||
ClusterSearchShardsGroup clusterSearchShardsGroup = clusterSearchShardsResponse.getGroups()[i];
|
||||
ClusterSearchShardsGroup deserializedGroup = deserialized.getGroups()[i];
|
||||
assertEquals(clusterSearchShardsGroup.getShardId(), deserializedGroup.getShardId());
|
||||
assertEquals(clusterSearchShardsGroup.getIndex(), deserializedGroup.getIndex());
|
||||
assertArrayEquals(clusterSearchShardsGroup.getShards(), deserializedGroup.getShards());
|
||||
}
|
||||
if (version.onOrAfter(Version.V_5_1_0_UNRELEASED)) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
|
|||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.SearchPhaseResult;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
|
@ -84,8 +86,9 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
};
|
||||
Map<String, DiscoveryNode> lookup = new HashMap<>();
|
||||
lookup.put(primaryNode.getId(), primaryNode);
|
||||
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
|
||||
AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction<TestSearchPhaseResult>(logger, transportService, lookup::get,
|
||||
Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) {
|
||||
aliasFilters, null, request, responseListener, shardsIter, 0, 0, null) {
|
||||
TestSearchResponse response = new TestSearchResponse();
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,11 +25,16 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
||||
public class AutoCreateIndexTests extends ESTestCase {
|
||||
|
@ -57,6 +62,24 @@ public class AutoCreateIndexTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testHandleSpaces() { // see #21449
|
||||
Settings settings = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(),
|
||||
randomFrom(".marvel-, .security, .watches, .triggered_watches, .watcher-history-",
|
||||
".marvel-,.security,.watches,.triggered_watches,.watcher-history-")).build();
|
||||
AutoCreateIndex autoCreateIndex = newAutoCreateIndex(settings);
|
||||
List<Tuple<String, Boolean>> expressions = autoCreateIndex.getAutoCreate().getExpressions();
|
||||
Map<String, Boolean> map = new HashMap<>();
|
||||
for (Tuple<String, Boolean> t : expressions) {
|
||||
map.put(t.v1(), t.v2());
|
||||
}
|
||||
assertTrue(map.get(".marvel-"));
|
||||
assertTrue(map.get(".security"));
|
||||
assertTrue(map.get(".watches"));
|
||||
assertTrue(map.get(".triggered_watches"));
|
||||
assertTrue(map.get(".watcher-history-"));
|
||||
assertEquals(5, map.size());
|
||||
}
|
||||
|
||||
public void testAutoCreationDisabled() {
|
||||
Settings settings = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), false).build();
|
||||
AutoCreateIndex autoCreateIndex = newAutoCreateIndex(settings);
|
||||
|
|
|
@ -56,16 +56,16 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
|
|||
ensureGreen();
|
||||
ClusterSearchShardsResponse response = client().admin().cluster().prepareSearchShards("test").execute().actionGet();
|
||||
assertThat(response.getGroups().length, equalTo(1));
|
||||
assertThat(response.getGroups()[0].getIndex(), equalTo("test"));
|
||||
assertThat(response.getGroups()[0].getShardId(), equalTo(0));
|
||||
assertThat(response.getGroups()[0].getShardId().getIndexName(), equalTo("test"));
|
||||
assertThat(response.getGroups()[0].getShardId().getId(), equalTo(0));
|
||||
assertThat(response.getGroups()[0].getShards().length, equalTo(1));
|
||||
assertThat(response.getNodes().length, equalTo(1));
|
||||
assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
|
||||
|
||||
response = client().admin().cluster().prepareSearchShards("test").setRouting("A").execute().actionGet();
|
||||
assertThat(response.getGroups().length, equalTo(1));
|
||||
assertThat(response.getGroups()[0].getIndex(), equalTo("test"));
|
||||
assertThat(response.getGroups()[0].getShardId(), equalTo(0));
|
||||
assertThat(response.getGroups()[0].getShardId().getIndexName(), equalTo("test"));
|
||||
assertThat(response.getGroups()[0].getShardId().getId(), equalTo(0));
|
||||
assertThat(response.getGroups()[0].getShards().length, equalTo(1));
|
||||
assertThat(response.getNodes().length, equalTo(1));
|
||||
assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
|
||||
|
@ -79,7 +79,7 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
|
|||
|
||||
ClusterSearchShardsResponse response = client().admin().cluster().prepareSearchShards("test").execute().actionGet();
|
||||
assertThat(response.getGroups().length, equalTo(4));
|
||||
assertThat(response.getGroups()[0].getIndex(), equalTo("test"));
|
||||
assertThat(response.getGroups()[0].getShardId().getIndexName(), equalTo("test"));
|
||||
assertThat(response.getNodes().length, equalTo(1));
|
||||
assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
|
||||
|
||||
|
@ -88,7 +88,7 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
|
|||
|
||||
response = client().admin().cluster().prepareSearchShards("test").setPreference("_shards:2").execute().actionGet();
|
||||
assertThat(response.getGroups().length, equalTo(1));
|
||||
assertThat(response.getGroups()[0].getShardId(), equalTo(2));
|
||||
assertThat(response.getGroups()[0].getShardId().getId(), equalTo(2));
|
||||
}
|
||||
|
||||
public void testMultipleIndicesAllocation() throws Exception {
|
||||
|
@ -109,10 +109,10 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
|
|||
boolean seenTest1 = false;
|
||||
boolean seenTest2 = false;
|
||||
for (ClusterSearchShardsGroup group : response.getGroups()) {
|
||||
if (group.getIndex().equals("test1")) {
|
||||
if (group.getShardId().getIndexName().equals("test1")) {
|
||||
seenTest1 = true;
|
||||
assertThat(group.getShards().length, equalTo(2));
|
||||
} else if (group.getIndex().equals("test2")) {
|
||||
} else if (group.getShardId().getIndexName().equals("test2")) {
|
||||
seenTest2 = true;
|
||||
assertThat(group.getShards().length, equalTo(2));
|
||||
} else {
|
||||
|
|
|
@ -213,20 +213,44 @@ public class ScopedSettingsTests extends ESTestCase {
|
|||
public void testDiff() throws IOException {
|
||||
Setting<Integer> fooBarBaz = Setting.intSetting("foo.bar.baz", 1, Property.NodeScope);
|
||||
Setting<Integer> fooBar = Setting.intSetting("foo.bar", 1, Property.Dynamic, Property.NodeScope);
|
||||
Setting<Settings> someGroup = Setting.groupSetting("some.group.", Property.Dynamic, Property.NodeScope);
|
||||
Setting<Boolean> someAffix = Setting.affixKeySetting("some.prefix.", "somekey", "true", Boolean::parseBoolean, Property.NodeScope);
|
||||
Setting<List<String>> foorBarQuux =
|
||||
Setting.listSetting("foo.bar.quux", Arrays.asList("a", "b", "c"), Function.identity(), Property.NodeScope);
|
||||
ClusterSettings settings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(fooBar, fooBarBaz, foorBarQuux)));
|
||||
ClusterSettings settings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(fooBar, fooBarBaz, foorBarQuux,
|
||||
someGroup, someAffix)));
|
||||
Settings diff = settings.diff(Settings.builder().put("foo.bar", 5).build(), Settings.EMPTY);
|
||||
assertThat(diff.getAsMap().size(), equalTo(2));
|
||||
assertEquals(4, diff.getAsMap().size()); // 4 since foo.bar.quux has 3 values essentially
|
||||
assertThat(diff.getAsInt("foo.bar.baz", null), equalTo(1));
|
||||
assertThat(diff.get("foo.bar.quux", null), equalTo("[\"a\",\"b\",\"c\"]"));
|
||||
assertArrayEquals(diff.getAsArray("foo.bar.quux", null), new String[] {"a", "b", "c"});
|
||||
|
||||
diff = settings.diff(
|
||||
Settings.builder().put("foo.bar", 5).build(),
|
||||
Settings.builder().put("foo.bar.baz", 17).put("foo.bar.quux", "d,e,f").build());
|
||||
assertThat(diff.getAsMap().size(), equalTo(2));
|
||||
Settings.builder().put("foo.bar.baz", 17).putArray("foo.bar.quux", "d", "e", "f").build());
|
||||
assertEquals(4, diff.getAsMap().size()); // 4 since foo.bar.quux has 3 values essentially
|
||||
assertThat(diff.getAsInt("foo.bar.baz", null), equalTo(17));
|
||||
assertThat(diff.get("foo.bar.quux", null), equalTo("[\"d\",\"e\",\"f\"]"));
|
||||
assertArrayEquals(diff.getAsArray("foo.bar.quux", null), new String[] {"d", "e", "f"});
|
||||
|
||||
diff = settings.diff(
|
||||
Settings.builder().put("some.group.foo", 5).build(),
|
||||
Settings.builder().put("some.group.foobar", 17, "some.group.foo", 25).build());
|
||||
assertEquals(6, diff.getAsMap().size()); // 6 since foo.bar.quux has 3 values essentially
|
||||
assertThat(diff.getAsInt("some.group.foobar", null), equalTo(17));
|
||||
assertNull(diff.get("some.group.foo"));
|
||||
assertArrayEquals(diff.getAsArray("foo.bar.quux", null), new String[] {"a", "b", "c"});
|
||||
assertThat(diff.getAsInt("foo.bar.baz", null), equalTo(1));
|
||||
assertThat(diff.getAsInt("foo.bar", null), equalTo(1));
|
||||
|
||||
diff = settings.diff(
|
||||
Settings.builder().put("some.prefix.foo.somekey", 5).build(),
|
||||
Settings.builder().put("some.prefix.foobar.somekey", 17,
|
||||
"some.prefix.foo.somekey", 18).build());
|
||||
assertEquals(6, diff.getAsMap().size()); // 6 since foo.bar.quux has 3 values essentially
|
||||
assertThat(diff.getAsInt("some.prefix.foobar.somekey", null), equalTo(17));
|
||||
assertNull(diff.get("some.prefix.foo.somekey"));
|
||||
assertArrayEquals(diff.getAsArray("foo.bar.quux", null), new String[] {"a", "b", "c"});
|
||||
assertThat(diff.getAsInt("foo.bar.baz", null), equalTo(1));
|
||||
assertThat(diff.getAsInt("foo.bar", null), equalTo(1));
|
||||
}
|
||||
|
||||
public void testUpdateTracer() {
|
||||
|
|
|
@ -442,9 +442,9 @@ public class SettingTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testAdfixKeySetting() {
|
||||
public void testAffixKeySetting() {
|
||||
Setting<Boolean> setting =
|
||||
Setting.adfixKeySetting("foo", "enable", "false", Boolean::parseBoolean, Property.NodeScope);
|
||||
Setting.affixKeySetting("foo.", "enable", "false", Boolean::parseBoolean, Property.NodeScope);
|
||||
assertTrue(setting.hasComplexMatcher());
|
||||
assertTrue(setting.match("foo.bar.enable"));
|
||||
assertTrue(setting.match("foo.baz.enable"));
|
||||
|
@ -456,12 +456,12 @@ public class SettingTests extends ESTestCase {
|
|||
assertTrue(concreteSetting.get(Settings.builder().put("foo.bar.enable", "true").build()));
|
||||
assertFalse(concreteSetting.get(Settings.builder().put("foo.baz.enable", "true").build()));
|
||||
|
||||
try {
|
||||
setting.getConcreteSetting("foo");
|
||||
fail();
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("key [foo] must match [foo*enable.] but didn't.", ex.getMessage());
|
||||
}
|
||||
IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> setting.getConcreteSetting("foo"));
|
||||
assertEquals("key [foo] must match [foo.*.enable] but didn't.", exc.getMessage());
|
||||
|
||||
exc = expectThrows(IllegalArgumentException.class, () -> Setting.affixKeySetting("foo", "enable", "false",
|
||||
Boolean::parseBoolean, Property.NodeScope));
|
||||
assertEquals("prefix must end with a '.'", exc.getMessage());
|
||||
}
|
||||
|
||||
public void testMinMaxInt() {
|
||||
|
|
|
@ -628,7 +628,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
|
||||
String oldMasterNode = internalCluster().getMasterName();
|
||||
// a very long GC, but it's OK as we remove the disruption when it has had an effect
|
||||
SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(oldMasterNode, random(), 100, 200, 30000, 60000);
|
||||
SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), oldMasterNode, 100, 200, 30000, 60000);
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
|
|
|
@ -120,6 +120,51 @@ public class DocumentParserTests extends ESSingleNodeTestCase {
|
|||
assertEquals("789", values[2]);
|
||||
}
|
||||
|
||||
public void testDotsWithExistingNestedMapper() throws Exception {
|
||||
DocumentMapperParser mapperParser = createIndex("test").mapperService().documentMapperParser();
|
||||
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type").startObject("properties")
|
||||
.startObject("foo").field("type", "nested").startObject("properties")
|
||||
.startObject("bar").field("type", "integer")
|
||||
.endObject().endObject().endObject().endObject().endObject().endObject().string();
|
||||
DocumentMapper mapper = mapperParser.parse("type", new CompressedXContent(mapping));
|
||||
|
||||
BytesReference bytes = XContentFactory.jsonBuilder()
|
||||
.startObject()
|
||||
.field("foo.bar", 123)
|
||||
.endObject().bytes();
|
||||
MapperParsingException e = expectThrows(MapperParsingException.class,
|
||||
() -> mapper.parse("test", "type", "1", bytes));
|
||||
assertEquals(
|
||||
"Cannot add a value for field [foo.bar] since one of the intermediate objects is mapped as a nested object: [foo]",
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
public void testDotsWithDynamicNestedMapper() throws Exception {
|
||||
DocumentMapperParser mapperParser = createIndex("test").mapperService().documentMapperParser();
|
||||
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
|
||||
.startArray("dynamic_templates")
|
||||
.startObject()
|
||||
.startObject("objects_as_nested")
|
||||
.field("match_mapping_type", "object")
|
||||
.startObject("mapping")
|
||||
.field("type", "nested")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endArray().endObject().endObject().string();
|
||||
DocumentMapper mapper = mapperParser.parse("type", new CompressedXContent(mapping));
|
||||
|
||||
BytesReference bytes = XContentFactory.jsonBuilder()
|
||||
.startObject()
|
||||
.field("foo.bar",42)
|
||||
.endObject().bytes();
|
||||
MapperParsingException e = expectThrows(MapperParsingException.class,
|
||||
() -> mapper.parse("test", "type", "1", bytes));
|
||||
assertEquals(
|
||||
"It is forbidden to create dynamic nested objects ([foo]) through `copy_to` or dots in field names",
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
public void testPropagateDynamicWithExistingMapper() throws Exception {
|
||||
DocumentMapperParser mapperParser = createIndex("test").mapperService().documentMapperParser();
|
||||
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
|
|||
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.Max;
|
||||
|
@ -48,10 +49,12 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
@ -359,6 +362,43 @@ public class DoubleTermsIT extends AbstractTermsTestCase {
|
|||
assertThat(bucket.getDocCount(), equalTo(1L));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
|
||||
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
|
||||
}
|
||||
|
||||
public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
|
||||
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
|
||||
}
|
||||
|
||||
private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
|
||||
// Find total number of unique terms
|
||||
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
|
||||
.addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
|
||||
.execute().actionGet();
|
||||
assertSearchResponse(allResponse);
|
||||
Terms terms = allResponse.getAggregations().get("terms");
|
||||
assertThat(terms, notNullValue());
|
||||
assertThat(terms.getName(), equalTo("terms"));
|
||||
int expectedCardinality = terms.getBuckets().size();
|
||||
|
||||
// Gather terms using partitioned aggregations
|
||||
final int numPartitions = randomIntBetween(2, 4);
|
||||
Set<Number> foundTerms = new HashSet<>();
|
||||
for (int partition = 0; partition < numPartitions; partition++) {
|
||||
SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
|
||||
.includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
|
||||
.execute().actionGet();
|
||||
assertSearchResponse(response);
|
||||
terms = response.getAggregations().get("terms");
|
||||
assertThat(terms, notNullValue());
|
||||
assertThat(terms.getName(), equalTo("terms"));
|
||||
for (Bucket bucket : terms.getBuckets()) {
|
||||
assertTrue(foundTerms.add(bucket.getKeyAsNumber()));
|
||||
}
|
||||
}
|
||||
assertEquals(expectedCardinality, foundTerms.size());
|
||||
}
|
||||
|
||||
public void testSingleValueFieldOrderedByTermAsc() throws Exception {
|
||||
SearchResponse response = client().prepareSearch("idx").setTypes("type")
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
|
|||
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.Max;
|
||||
|
@ -47,10 +48,12 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
@ -326,6 +329,48 @@ public class LongTermsIT extends AbstractTermsTestCase {
|
|||
assertThat(bucket.getDocCount(), equalTo(1L));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
|
||||
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
|
||||
}
|
||||
|
||||
public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
|
||||
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
|
||||
}
|
||||
|
||||
private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
|
||||
// Find total number of unique terms
|
||||
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
|
||||
.addAggregation(terms("terms").field(field).collectMode(randomFrom(SubAggCollectionMode.values()))).execute().actionGet();
|
||||
assertSearchResponse(allResponse);
|
||||
Terms terms = allResponse.getAggregations().get("terms");
|
||||
assertThat(terms, notNullValue());
|
||||
assertThat(terms.getName(), equalTo("terms"));
|
||||
int expectedCardinality = terms.getBuckets().size();
|
||||
|
||||
// Gather terms using partitioned aggregations
|
||||
final int numPartitions = randomIntBetween(2, 4);
|
||||
Set<Number> foundTerms = new HashSet<>();
|
||||
for (int partition = 0; partition < numPartitions; partition++) {
|
||||
SearchResponse response = client().prepareSearch("idx").setTypes("type")
|
||||
.addAggregation(
|
||||
terms("terms").field(field).includeExclude(new IncludeExclude(partition, numPartitions))
|
||||
.collectMode(randomFrom(SubAggCollectionMode.values())))
|
||||
.execute().actionGet();
|
||||
assertSearchResponse(response);
|
||||
terms = response.getAggregations().get("terms");
|
||||
assertThat(terms, notNullValue());
|
||||
assertThat(terms.getName(), equalTo("terms"));
|
||||
for (Bucket bucket : terms.getBuckets()) {
|
||||
assertFalse(foundTerms.contains(bucket.getKeyAsNumber()));
|
||||
foundTerms.add(bucket.getKeyAsNumber());
|
||||
}
|
||||
}
|
||||
assertEquals(expectedCardinality, foundTerms.size());
|
||||
}
|
||||
|
||||
|
||||
public void testSingleValueFieldWithMaxSize() throws Exception {
|
||||
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
import org.apache.lucene.util.automaton.RegExp;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
|
@ -37,6 +39,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
|
|||
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
|
||||
|
@ -54,10 +57,12 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
@ -455,6 +460,44 @@ public class StringTermsIT extends AbstractTermsTestCase {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
|
||||
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
|
||||
}
|
||||
|
||||
public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
|
||||
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
|
||||
}
|
||||
|
||||
private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
|
||||
// Find total number of unique terms
|
||||
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
|
||||
.addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
|
||||
.execute().actionGet();
|
||||
assertSearchResponse(allResponse);
|
||||
Terms terms = allResponse.getAggregations().get("terms");
|
||||
assertThat(terms, notNullValue());
|
||||
assertThat(terms.getName(), equalTo("terms"));
|
||||
int expectedCardinality = terms.getBuckets().size();
|
||||
|
||||
// Gather terms using partitioned aggregations
|
||||
final int numPartitions = randomIntBetween(2, 4);
|
||||
Set<String> foundTerms = new HashSet<>();
|
||||
for (int partition = 0; partition < numPartitions; partition++) {
|
||||
SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
|
||||
.includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
|
||||
.execute().actionGet();
|
||||
assertSearchResponse(response);
|
||||
terms = response.getAggregations().get("terms");
|
||||
assertThat(terms, notNullValue());
|
||||
assertThat(terms.getName(), equalTo("terms"));
|
||||
for (Bucket bucket : terms.getBuckets()) {
|
||||
assertTrue(foundTerms.add(bucket.getKeyAsString()));
|
||||
}
|
||||
}
|
||||
assertEquals(expectedCardinality, foundTerms.size());
|
||||
}
|
||||
|
||||
|
||||
public void testSingleValueFieldWithMaxSize() throws Exception {
|
||||
SearchResponse response = client()
|
||||
|
|
|
@ -241,6 +241,9 @@ The following project appears to be abandoned:
|
|||
[[smalltalk]]
|
||||
== Smalltalk
|
||||
|
||||
* https://github.com/newapplesho/elasticsearch-smalltalk[elasticsearch-smalltalk] -
|
||||
Pharo Smalltalk client for Elasticsearch
|
||||
|
||||
* http://ss3.gemstone.com/ss/Elasticsearch.html[Elasticsearch] -
|
||||
Smalltalk client for Elasticsearch
|
||||
|
||||
|
|
|
@ -126,6 +126,24 @@ return the following exit codes:
|
|||
`74`:: IO error
|
||||
`70`:: any other error
|
||||
|
||||
[float]
|
||||
=== Batch mode
|
||||
|
||||
Certain plugins require more privileges than those provided by default in core
|
||||
Elasticsearch. These plugins will list the required privileges and ask the
|
||||
user for confirmation before continuing with installation.
|
||||
|
||||
When running the plugin install script from another program (e.g. install
|
||||
automation scripts), the plugin script should detect that it is not being
|
||||
called from the console and skip the confirmation response, automatically
|
||||
granting all requested permissions. If console detection fails, then batch
|
||||
mode can be forced by specifying `-b` or `--batch` as follows:
|
||||
|
||||
[source,shell]
|
||||
-----------------------------------
|
||||
sudo bin/elasticsearch-plugin install --batch [pluginname]
|
||||
-----------------------------------
|
||||
|
||||
[float]
|
||||
=== Custom config directory
|
||||
|
||||
|
|
|
@ -514,7 +514,10 @@ TIP: for indexed scripts replace the `file` parameter with an `id` parameter.
|
|||
==== Filtering Values
|
||||
|
||||
It is possible to filter the values for which buckets will be created. This can be done using the `include` and
|
||||
`exclude` parameters which are based on regular expression strings or arrays of exact values.
|
||||
`exclude` parameters which are based on regular expression strings or arrays of exact values. Additionally,
|
||||
`include` clauses can filter using `partition` expressions.
|
||||
|
||||
===== Filtering Values with regular expressions
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
|
@ -538,6 +541,8 @@ both are defined, the `exclude` has precedence, meaning, the `include` is evalua
|
|||
|
||||
The syntax is the same as <<regexp-syntax,regexp queries>>.
|
||||
|
||||
===== Filtering Values with exact values
|
||||
|
||||
For matching based on exact values the `include` and `exclude` parameters can simply take an array of
|
||||
strings that represent the terms as they are found in the index:
|
||||
|
||||
|
@ -561,6 +566,67 @@ strings that represent the terms as they are found in the index:
|
|||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Filtering Values with partitions
|
||||
|
||||
Sometimes there are too many unique terms to process in a single request/response pair so
|
||||
it can be useful to break the analysis up into multiple requests.
|
||||
This can be achieved by grouping the field's values into a number of partitions at query-time and processing
|
||||
only one partition in each request.
|
||||
Consider this request which is looking for accounts that have not logged any access recently:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"size": 0,
|
||||
"aggs": {
|
||||
"expired_sessions": {
|
||||
"terms": {
|
||||
"field": "account_id",
|
||||
"include": {
|
||||
"partition": 0,
|
||||
"num_partitions": 20
|
||||
},
|
||||
"size": 10000,
|
||||
"order": {
|
||||
"last_access": "asc"
|
||||
}
|
||||
},
|
||||
"aggs": {
|
||||
"last_access": {
|
||||
"max": {
|
||||
"field": "access_date"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
This request is finding the last logged access date for a subset of customer accounts because we
|
||||
might want to expire some customer accounts who haven't been seen for a long while.
|
||||
The `num_partitions` setting has requested that the unique account_ids are organized evenly into twenty
|
||||
partitions (0 to 19). and the `partition` setting in this request filters to only consider account_ids falling
|
||||
into partition 0. Subsequent requests should ask for partitions 1 then 2 etc to complete the expired-account analysis.
|
||||
|
||||
Note that the `size` setting for the number of results returned needs to be tuned with the `num_partitions`.
|
||||
For this particular account-expiration example the process for balancing values for `size` and `num_partitions` would be as follows:
|
||||
|
||||
1. Use the `cardinality` aggregation to estimate the total number of unique account_id values
|
||||
2. Pick a value for `num_partitions` to break the number from 1) up into more manageable chunks
|
||||
3. Pick a `size` value for the number of responses we want from each partition
|
||||
4. Run a test request
|
||||
|
||||
If we have a circuit-breaker error we are trying to do too much in one request and must increase `num_partitions`.
|
||||
If the request was successful but the last account ID in the date-sorted test response was still an account we might want to
|
||||
expire then we may be missing accounts of interest and have set our numbers too low. We must either
|
||||
|
||||
* increase the `size` parameter to return more results per partition (could be heavy on memory) or
|
||||
* increase the `num_partitions` to consider less accounts per request (could increase overall processing time as we need to make more requests)
|
||||
|
||||
Ultimately this is a balancing act between managing the elasticsearch resources required to process a single request and the volume
|
||||
of requests that the client application must issue to complete a task.
|
||||
|
||||
==== Multi-field terms aggregation
|
||||
|
||||
The `terms` aggregation does not support collecting terms from multiple fields
|
||||
|
|
|
@ -3,6 +3,10 @@
|
|||
|
||||
A `single-value` metrics aggregation that keeps track and returns the maximum value among the numeric values extracted from the aggregated documents. These values can be extracted either from specific numeric fields in the documents, or be generated by a provided script.
|
||||
|
||||
NOTE: The `min` and `max` aggregation operate on the `double` representation of
|
||||
the data. As a consequence, the result may be approximate when running on longs
|
||||
whose absolute value is greater than +2^53+.
|
||||
|
||||
Computing the max price value across all documents
|
||||
|
||||
[source,js]
|
||||
|
|
|
@ -3,6 +3,10 @@
|
|||
|
||||
A `single-value` metrics aggregation that keeps track and returns the minimum value among numeric values extracted from the aggregated documents. These values can be extracted either from specific numeric fields in the documents, or be generated by a provided script.
|
||||
|
||||
NOTE: The `min` and `max` aggregation operate on the `double` representation of
|
||||
the data. As a consequence, the result may be approximate when running on longs
|
||||
whose absolute value is greater than +2^53+.
|
||||
|
||||
Computing the min price value across all documents:
|
||||
|
||||
[source,js]
|
||||
|
|
|
@ -51,7 +51,7 @@ only the title field is being included in the source.
|
|||
}
|
||||
],
|
||||
"_source": {
|
||||
"include": [
|
||||
"includes": [
|
||||
"title"
|
||||
]
|
||||
},
|
||||
|
|
|
@ -15,8 +15,8 @@ combined to define new <<analysis-custom-analyzer,`custom`>> analyzers.
|
|||
|
||||
A _character filter_ receives the original text as a stream of characters and
|
||||
can transform the stream by adding, removing, or changing characters. For
|
||||
instance, a character filter could be used to convert Arabic numerals
|
||||
(٠١٢٣٤٥٦٧٨٩) into their Latin equivalents (0123456789), or to strip HTML
|
||||
instance, a character filter could be used to convert Hindu-Arabic numerals
|
||||
(٠١٢٣٤٥٦٧٨٩) into their Arabic-Latin equivalents (0123456789), or to strip HTML
|
||||
elements like `<b>` from the stream.
|
||||
|
||||
An analyzer may have *zero or more* <<analysis-charfilters,character filters>>,
|
||||
|
|
|
@ -53,9 +53,9 @@ version number, documents with version equal to zero cannot be updated using
|
|||
All update and query failures cause the `_update_by_query` to abort and are
|
||||
returned in the `failures` of the response. The updates that have been
|
||||
performed still stick. In other words, the process is not rolled back, only
|
||||
aborted. While the first failure causes the abort all failures that are
|
||||
returned by the failing bulk request are returned in the `failures` element so
|
||||
it's possible for there to be quite a few.
|
||||
aborted. While the first failure causes the abort, all failures that are
|
||||
returned by the failing bulk request are returned in the `failures` element; therefore
|
||||
it's possible for there to be quite a few failed entities.
|
||||
|
||||
If you want to simply count version conflicts not cause the `_update_by_query`
|
||||
to abort you can set `conflicts=proceed` on the url or `"conflicts": "proceed"`
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[[indices-put-mapping]]
|
||||
== Put Mapping
|
||||
|
||||
The PUT mapping API allows you to provide type mappings while creating a new index, add a new type to an existing index, or add new
|
||||
The PUT mapping API allows you to add a new type to an existing index, or add new
|
||||
fields to an existing type:
|
||||
|
||||
[source,js]
|
||||
|
@ -62,6 +62,11 @@ PUT /{index}/_mapping/{type}
|
|||
* `{body}` contains the mapping changes that should be applied.
|
||||
|
||||
|
||||
NOTE: When updating the `_default_` mapping with the
|
||||
<<indices-put-mapping,PUT mapping>> API, the new mapping is not merged with
|
||||
the existing mapping. Instead, the new `_default_` mapping replaces the
|
||||
existing one.
|
||||
|
||||
[[updating-field-mappings]]
|
||||
[float]
|
||||
=== Updating field mappings
|
||||
|
|
|
@ -32,6 +32,11 @@ PUT my_index
|
|||
<2> The `user` type inherits the settings from `_default_`.
|
||||
<3> The `blogpost` type overrides the defaults and enables the <<mapping-all-field,`_all`>> field.
|
||||
|
||||
NOTE: When updating the `_default_` mapping with the
|
||||
<<indices-put-mapping,PUT mapping>> API, the new mapping is not merged with
|
||||
the existing mapping. Instead, the new `_default_` mapping replaces the
|
||||
existing one.
|
||||
|
||||
While the `_default_` mapping can be updated after an index has been created,
|
||||
the new defaults will only affect mapping types that are created afterwards.
|
||||
|
||||
|
|
|
@ -104,7 +104,7 @@ cluster.routing.allocation.awareness.attributes: zone
|
|||
|
||||
Now, if we start 2 nodes with `node.attr.zone` set to `zone1` and create an index
|
||||
with 5 shards and 1 replica. The index will be created, but only the 5 primary
|
||||
shards will be allocated (with no replicas). Only when we start more shards
|
||||
shards will be allocated (with no replicas). Only when we start more nodes
|
||||
with `node.attr.zone` set to `zone2` will the replicas be allocated.
|
||||
|
||||
The `cluster.routing.allocation.awareness.*` settings can all be updated
|
||||
|
|
|
@ -341,6 +341,8 @@ This will yield the following response.
|
|||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"took": 7,/"took": "$body.took",/]
|
||||
|
||||
<1> The terms from each query have been highlighted in the document.
|
||||
|
||||
Instead of the query in the search request highlighting the percolator hits, the percolator queries are highlighting
|
||||
the document defined in the `percolate` query.
|
||||
|
||||
|
|
|
@ -213,7 +213,7 @@ then Elasticsearch will handle it as if there was a mapping of type
|
|||
[[geo-sorting]]
|
||||
==== Geo Distance Sorting
|
||||
|
||||
Allow to sort by `_geo_distance`. Here is an example:
|
||||
Allow to sort by `_geo_distance`. Here is an example, assuming `pin.location` is a field of type `geo_point`:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
|
@ -243,7 +243,7 @@ GET /_search
|
|||
|
||||
How to compute the distance. Can either be `sloppy_arc` (default), `arc` (slightly more precise but significantly slower) or `plane` (faster, but inaccurate on long distances and close to the poles).
|
||||
|
||||
`sort_mode`::
|
||||
`mode`::
|
||||
|
||||
What to do in case a field has several geo points. By default, the shortest
|
||||
distance is taken into account when sorting in ascending order and the
|
||||
|
|
|
@ -40,7 +40,7 @@ GET /_search
|
|||
|
||||
If the requested fields are not stored (`store` mapping set to `false`), they will be ignored.
|
||||
|
||||
Field values fetched from the document it self are always returned as an array. Metadata fields like `_routing` and
|
||||
Stored field values fetched from the document itself are always returned as an array. On the contrary, metadata fields like `_routing` and
|
||||
`_parent` fields are never returned as an array.
|
||||
|
||||
Also only leaf fields can be returned via the `field` option. So object fields can't be returned and such requests
|
||||
|
|
|
@ -129,6 +129,7 @@ public class EqualsTests extends ScriptTestCase {
|
|||
assertEquals(0, exec("def a = 1; Object b = new HashMap(); if (a === (Object)b) return 1; else return 0;"));
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/21801")
|
||||
public void testBranchEqualsDefAndPrimitive() {
|
||||
assertEquals(true, exec("def x = 1000; int y = 1000; return x == y;"));
|
||||
assertEquals(false, exec("def x = 1000; int y = 1000; return x === y;"));
|
||||
|
@ -146,6 +147,7 @@ public class EqualsTests extends ScriptTestCase {
|
|||
assertEquals(1, exec("def a = 1; Object b = new HashMap(); if (a !== (Object)b) return 1; else return 0;"));
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/21801")
|
||||
public void testBranchNotEqualsDefAndPrimitive() {
|
||||
assertEquals(false, exec("def x = 1000; int y = 1000; return x != y;"));
|
||||
assertEquals(true, exec("def x = 1000; int y = 1000; return x !== y;"));
|
||||
|
|
|
@ -32,12 +32,11 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Netty4Utils {
|
||||
|
||||
|
@ -122,7 +121,13 @@ public class Netty4Utils {
|
|||
}
|
||||
}
|
||||
|
||||
public static void maybeDie(final Throwable cause) throws IOException {
|
||||
/**
|
||||
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
|
||||
* caught and bubbles up to the uncaught exception handler.
|
||||
*
|
||||
* @param cause the throwable to test
|
||||
*/
|
||||
public static void maybeDie(final Throwable cause) {
|
||||
if (cause instanceof Error) {
|
||||
/*
|
||||
* Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, Netty wraps too many
|
||||
|
@ -131,20 +136,17 @@ public class Netty4Utils {
|
|||
* the exception so as to not lose the original cause during exit, so we give the thread a name based on the previous stack
|
||||
* frame so that at least we know where it came from (in case logging the current stack trace fails).
|
||||
*/
|
||||
try (
|
||||
final StringWriter sw = new StringWriter();
|
||||
final PrintWriter pw = new PrintWriter(sw)) {
|
||||
try {
|
||||
// try to log the current stack trace
|
||||
Arrays.stream(Thread.currentThread().getStackTrace()).skip(1).map(e -> "\tat " + e).forEach(pw::println);
|
||||
ESLoggerFactory.getLogger(Netty4Utils.class).error("fatal error on the network layer\n{}", sw.toString());
|
||||
final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
|
||||
final String formatted = Arrays.stream(stackTrace).skip(1).map(e -> "\tat " + e).collect(Collectors.joining("\n"));
|
||||
ESLoggerFactory.getLogger(Netty4Utils.class).error("fatal error on the network layer\n{}", formatted);
|
||||
} finally {
|
||||
final StackTraceElement previous = Thread.currentThread().getStackTrace()[2];
|
||||
new Thread(
|
||||
() -> {
|
||||
throw (Error) cause;
|
||||
},
|
||||
previous.getClassName() + "#" + previous.getMethodName())
|
||||
.start();
|
||||
() -> {
|
||||
throw (Error) cause;
|
||||
})
|
||||
.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,12 +34,7 @@ import java.util.Map;
|
|||
import java.util.function.Function;
|
||||
|
||||
public final class AzureStorageSettings {
|
||||
private static final String TIMEOUT_SUFFIX = "timeout";
|
||||
private static final String ACCOUNT_SUFFIX = "account";
|
||||
private static final String KEY_SUFFIX = "key";
|
||||
private static final String DEFAULT_SUFFIX = "default";
|
||||
|
||||
private static final Setting.AffixKey TIMEOUT_KEY = Setting.AffixKey.withAdfix(Storage.PREFIX, TIMEOUT_SUFFIX);
|
||||
private static final Setting.AffixKey TIMEOUT_KEY = Setting.AffixKey.withAffix(Storage.PREFIX, "timeout");
|
||||
|
||||
private static final Setting<TimeValue> TIMEOUT_SETTING = Setting.affixKeySetting(
|
||||
TIMEOUT_KEY,
|
||||
|
@ -47,11 +42,11 @@ public final class AzureStorageSettings {
|
|||
(s) -> Setting.parseTimeValue(s, TimeValue.timeValueSeconds(-1), TIMEOUT_KEY.toString()),
|
||||
Setting.Property.NodeScope);
|
||||
private static final Setting<String> ACCOUNT_SETTING =
|
||||
Setting.adfixKeySetting(Storage.PREFIX, ACCOUNT_SUFFIX, "", Function.identity(), Setting.Property.NodeScope);
|
||||
Setting.affixKeySetting(Storage.PREFIX, "account", "", Function.identity(), Setting.Property.NodeScope);
|
||||
private static final Setting<String> KEY_SETTING =
|
||||
Setting.adfixKeySetting(Storage.PREFIX, KEY_SUFFIX, "", Function.identity(), Setting.Property.NodeScope);
|
||||
Setting.affixKeySetting(Storage.PREFIX, "key", "", Function.identity(), Setting.Property.NodeScope);
|
||||
private static final Setting<Boolean> DEFAULT_SETTING =
|
||||
Setting.adfixKeySetting(Storage.PREFIX, DEFAULT_SUFFIX, "false", Boolean::valueOf, Setting.Property.NodeScope);
|
||||
Setting.affixKeySetting(Storage.PREFIX, "default", "false", Boolean::valueOf, Setting.Property.NodeScope);
|
||||
|
||||
|
||||
private final String name;
|
||||
|
|
|
@ -61,3 +61,16 @@
|
|||
|
||||
- match: {persistent: {}}
|
||||
|
||||
---
|
||||
"Test get a default settings":
|
||||
|
||||
- skip:
|
||||
version: " - 5.99.99" # this can't be bumped to 5.0.2 until snapshots are published
|
||||
reason: Fetching default group setting was buggy until 5.0.3
|
||||
|
||||
- do:
|
||||
cluster.get_settings:
|
||||
include_defaults: true
|
||||
|
||||
- match: {defaults.node.attr.testattr: "test"}
|
||||
|
||||
|
|
|
@ -1,24 +1,3 @@
|
|||
---
|
||||
setup:
|
||||
# Disable the disk threshold decider so the test doesn't fail if the disk is
|
||||
# > 85% full
|
||||
- do:
|
||||
cluster.put_settings:
|
||||
body:
|
||||
persistent:
|
||||
cluster.routing.allocation.disk.threshold_enabled: false
|
||||
flat_settings: true
|
||||
|
||||
---
|
||||
teardown:
|
||||
# Reset the disk threshold decider so we don't leave anything behind.
|
||||
- do:
|
||||
cluster.put_settings:
|
||||
body:
|
||||
persistent:
|
||||
cluster.routing.allocation.disk.threshold_enabled: null
|
||||
flat_settings: true
|
||||
|
||||
---
|
||||
"Shrink index via API":
|
||||
# creates an index with one document solely allocated on the master node
|
||||
|
|
|
@ -31,26 +31,38 @@
|
|||
field:
|
||||
type: text
|
||||
aliases:
|
||||
test_alias_1: {}
|
||||
test_alias_2:
|
||||
test_alias_no_filter: {}
|
||||
test_alias_filter_1:
|
||||
filter:
|
||||
term:
|
||||
field : value
|
||||
field : value1
|
||||
test_alias_filter_2:
|
||||
filter:
|
||||
term:
|
||||
field : value2
|
||||
|
||||
- do:
|
||||
search_shards:
|
||||
index: test_alias_1
|
||||
index: test_alias_no_filter
|
||||
|
||||
- length: { shards: 1 }
|
||||
- match: { shards.0.0.index: test_index }
|
||||
- is_true: indices.test_index
|
||||
- is_false: indices.test_index.filter
|
||||
- is_false: indices.test_index.aliases
|
||||
|
||||
- do:
|
||||
search_shards:
|
||||
index: test_alias_2
|
||||
index: test_alias_filter_1
|
||||
|
||||
- length: { shards: 1 }
|
||||
- match: { shards.0.0.index: test_index }
|
||||
- match: { indices.test_index: {filter: { term : { field: { value: value, boost: 1.0}}}}}
|
||||
- match: { indices.test_index: {aliases: [test_alias_filter_1], filter: { term : { field: { value: value1, boost: 1.0}}}}}
|
||||
|
||||
- do:
|
||||
search_shards:
|
||||
index: ["test_alias_filter_1","test_alias_filter_2"]
|
||||
|
||||
- length: { shards: 1 }
|
||||
- match: { shards.0.0.index: test_index }
|
||||
- match: { indices.test_index: {aliases: [test_alias_filter_1, test_alias_filter_2], filter: { bool: { should : [{ term : { field: { value: value1, boost: 1.0}}}, { term : { field: { value: value2, boost: 1.0}}}], adjust_pure_negative: true, boost: 1.0, disable_coord: false }}}}
|
||||
|
|
|
@ -74,9 +74,14 @@ public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
|
|||
int sliceLength = Math.max(0, length - sliceOffset - 1);
|
||||
BytesReference slice = pbr.slice(sliceOffset, sliceLength);
|
||||
assertEquals(sliceLength, slice.length());
|
||||
for (int i = 0; i < sliceLength; i++) {
|
||||
assertEquals(pbr.get(i+sliceOffset), slice.get(i));
|
||||
}
|
||||
BytesRef singlePageOrNull = getSinglePageOrNull(slice);
|
||||
if (singlePageOrNull != null) {
|
||||
assertEquals(sliceOffset, singlePageOrNull.offset);
|
||||
// we can't assert the offset since if the length is smaller than the refercence
|
||||
// the offset can be anywhere
|
||||
assertEquals(sliceLength, singlePageOrNull.length);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,19 +39,6 @@ public class IntermittentLongGCDisruption extends LongGCDisruption {
|
|||
final long delayDurationMax;
|
||||
|
||||
|
||||
public IntermittentLongGCDisruption(Random random) {
|
||||
this(null, random);
|
||||
}
|
||||
|
||||
public IntermittentLongGCDisruption(String disruptedNode, Random random) {
|
||||
this(disruptedNode, random, 100, 200, 300, 20000);
|
||||
}
|
||||
|
||||
public IntermittentLongGCDisruption(String disruptedNode, Random random, long intervalBetweenDelaysMin,
|
||||
long intervalBetweenDelaysMax, long delayDurationMin, long delayDurationMax) {
|
||||
this(random, disruptedNode, intervalBetweenDelaysMin, intervalBetweenDelaysMax, delayDurationMin, delayDurationMax);
|
||||
}
|
||||
|
||||
public IntermittentLongGCDisruption(Random random, String disruptedNode, long intervalBetweenDelaysMin, long intervalBetweenDelaysMax,
|
||||
long delayDurationMin, long delayDurationMax) {
|
||||
super(random, disruptedNode);
|
||||
|
@ -88,19 +75,15 @@ public class IntermittentLongGCDisruption extends LongGCDisruption {
|
|||
}
|
||||
|
||||
private void simulateLongGC(final TimeValue duration) throws InterruptedException {
|
||||
final String disruptionNodeCopy = disruptedNode;
|
||||
if (disruptionNodeCopy == null) {
|
||||
return;
|
||||
}
|
||||
logger.info("node [{}] goes into GC for for [{}]", disruptionNodeCopy, duration);
|
||||
logger.info("node [{}] goes into GC for for [{}]", disruptedNode, duration);
|
||||
final Set<Thread> nodeThreads = new HashSet<>();
|
||||
try {
|
||||
while (stopNodeThreads(disruptionNodeCopy, nodeThreads)) ;
|
||||
while (stopNodeThreads(nodeThreads)) ;
|
||||
if (!nodeThreads.isEmpty()) {
|
||||
Thread.sleep(duration.millis());
|
||||
}
|
||||
} finally {
|
||||
logger.info("node [{}] resumes from GC", disruptionNodeCopy);
|
||||
logger.info("node [{}] resumes from GC", disruptedNode);
|
||||
resumeThreads(nodeThreads);
|
||||
}
|
||||
}
|
||||
|
@ -109,13 +92,13 @@ public class IntermittentLongGCDisruption extends LongGCDisruption {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
while (disrupting && disruptedNode != null) {
|
||||
while (disrupting) {
|
||||
try {
|
||||
TimeValue duration = new TimeValue(delayDurationMin + random.nextInt((int) (delayDurationMax - delayDurationMin)));
|
||||
simulateLongGC(duration);
|
||||
|
||||
duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin)));
|
||||
if (disrupting && disruptedNode != null) {
|
||||
if (disrupting) {
|
||||
Thread.sleep(duration.millis());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -19,11 +19,15 @@
|
|||
|
||||
package org.elasticsearch.test.disruption;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
@ -41,11 +45,16 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
// logging has shared JVM locks - we may suspend a thread and block other nodes from doing their thing
|
||||
Pattern.compile("logging\\.log4j"),
|
||||
// security manager is shared across all nodes AND it uses synced hashmaps interanlly
|
||||
Pattern.compile("java\\.lang\\.SecurityManager")
|
||||
Pattern.compile("java\\.lang\\.SecurityManager"),
|
||||
// SecureRandom instance from SecureRandomHolder class is shared by all nodes
|
||||
Pattern.compile("java\\.security\\.SecureRandom")
|
||||
};
|
||||
|
||||
private static final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
|
||||
|
||||
protected final String disruptedNode;
|
||||
private Set<Thread> suspendedThreads;
|
||||
private Thread blockDetectionThread;
|
||||
|
||||
public LongGCDisruption(Random random, String disruptedNode) {
|
||||
super(random);
|
||||
|
@ -60,7 +69,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
suspendedThreads = ConcurrentHashMap.newKeySet();
|
||||
|
||||
final String currentThreadName = Thread.currentThread().getName();
|
||||
assert currentThreadName.contains("[" + disruptedNode + "]") == false :
|
||||
assert isDisruptedNodeThread(currentThreadName) == false :
|
||||
"current thread match pattern. thread name: " + currentThreadName + ", node: " + disruptedNode;
|
||||
// we spawn a background thread to protect against deadlock which can happen
|
||||
// if there are shared resources between caller thread and and suspended threads
|
||||
|
@ -75,7 +84,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
// keep trying to stop threads, until no new threads are discovered.
|
||||
while (stopNodeThreads(disruptedNode, suspendedThreads)) {
|
||||
while (stopNodeThreads(suspendedThreads)) {
|
||||
if (Thread.interrupted()) {
|
||||
return;
|
||||
}
|
||||
|
@ -95,13 +104,52 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
}
|
||||
if (stoppingThread.isAlive()) {
|
||||
logger.warn("failed to stop node [{}]'s threads within [{}] millis. Stopping thread stack trace:\n {}"
|
||||
, disruptedNode, getStoppingTimeoutInMillis(), stackTrace(stoppingThread));
|
||||
, disruptedNode, getStoppingTimeoutInMillis(), stackTrace(stoppingThread.getStackTrace()));
|
||||
stoppingThread.interrupt(); // best effort;
|
||||
throw new RuntimeException("stopping node threads took too long");
|
||||
}
|
||||
// block detection checks if other threads are blocked waiting on an object that is held by one
|
||||
// of the threads that was suspended
|
||||
if (isBlockDetectionSupported()) {
|
||||
blockDetectionThread = new Thread(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (e instanceof InterruptedException == false) {
|
||||
throw new AssertionError("unexpected exception in blockDetectionThread", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
while (Thread.currentThread().isInterrupted() == false) {
|
||||
ThreadInfo[] threadInfos = threadBean.dumpAllThreads(true, true);
|
||||
for (ThreadInfo threadInfo : threadInfos) {
|
||||
if (isDisruptedNodeThread(threadInfo.getThreadName()) == false &&
|
||||
threadInfo.getLockOwnerName() != null &&
|
||||
isDisruptedNodeThread(threadInfo.getLockOwnerName())) {
|
||||
|
||||
// find ThreadInfo object of the blocking thread (if available)
|
||||
ThreadInfo blockingThreadInfo = null;
|
||||
for (ThreadInfo otherThreadInfo : threadInfos) {
|
||||
if (otherThreadInfo.getThreadId() == threadInfo.getLockOwnerId()) {
|
||||
blockingThreadInfo = otherThreadInfo;
|
||||
break;
|
||||
}
|
||||
}
|
||||
onBlockDetected(threadInfo, blockingThreadInfo);
|
||||
}
|
||||
}
|
||||
Thread.sleep(getBlockDetectionIntervalInMillis());
|
||||
}
|
||||
}
|
||||
});
|
||||
blockDetectionThread.setName(currentThreadName + "[LongGCDisruption][blockDetection]");
|
||||
blockDetectionThread.start();
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
stopBlockDetection();
|
||||
// resume threads if failed
|
||||
resumeThreads(suspendedThreads);
|
||||
suspendedThreads = null;
|
||||
|
@ -112,18 +160,35 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
}
|
||||
}
|
||||
|
||||
private String stackTrace(Thread thread) {
|
||||
return Arrays.stream(thread.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n"));
|
||||
public boolean isDisruptedNodeThread(String threadName) {
|
||||
return threadName.contains("[" + disruptedNode + "]");
|
||||
}
|
||||
|
||||
private String stackTrace(StackTraceElement[] stackTraceElements) {
|
||||
return Arrays.stream(stackTraceElements).map(Object::toString).collect(Collectors.joining("\n"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stopDisrupting() {
|
||||
stopBlockDetection();
|
||||
if (suspendedThreads != null) {
|
||||
resumeThreads(suspendedThreads);
|
||||
suspendedThreads = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void stopBlockDetection() {
|
||||
if (blockDetectionThread != null) {
|
||||
try {
|
||||
blockDetectionThread.interrupt(); // best effort
|
||||
blockDetectionThread.join(getStoppingTimeoutInMillis());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
blockDetectionThread = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
|
||||
removeFromCluster(cluster);
|
||||
|
@ -144,7 +209,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
*/
|
||||
@SuppressWarnings("deprecation") // stops/resumes threads intentionally
|
||||
@SuppressForbidden(reason = "stops/resumes threads intentionally")
|
||||
protected boolean stopNodeThreads(String node, Set<Thread> nodeThreads) {
|
||||
protected boolean stopNodeThreads(Set<Thread> nodeThreads) {
|
||||
Thread[] allThreads = null;
|
||||
while (allThreads == null) {
|
||||
allThreads = new Thread[Thread.activeCount()];
|
||||
|
@ -154,16 +219,15 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
}
|
||||
}
|
||||
boolean liveThreadsFound = false;
|
||||
final String nodeThreadNamePart = "[" + node + "]";
|
||||
for (Thread thread : allThreads) {
|
||||
if (thread == null) {
|
||||
continue;
|
||||
}
|
||||
String name = thread.getName();
|
||||
if (name.contains(nodeThreadNamePart)) {
|
||||
String threadName = thread.getName();
|
||||
if (isDisruptedNodeThread(threadName)) {
|
||||
if (thread.isAlive() && nodeThreads.add(thread)) {
|
||||
liveThreadsFound = true;
|
||||
logger.trace("stopping thread [{}]", name);
|
||||
logger.trace("stopping thread [{}]", threadName);
|
||||
thread.suspend();
|
||||
// double check the thread is not in a shared resource like logging. If so, let it go and come back..
|
||||
boolean safe = true;
|
||||
|
@ -178,7 +242,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
}
|
||||
}
|
||||
if (!safe) {
|
||||
logger.trace("resuming thread [{}] as it is in a critical section", name);
|
||||
logger.trace("resuming thread [{}] as it is in a critical section", threadName);
|
||||
thread.resume();
|
||||
nodeThreads.remove(thread);
|
||||
}
|
||||
|
@ -198,6 +262,28 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
return TimeValue.timeValueSeconds(30).getMillis();
|
||||
}
|
||||
|
||||
public boolean isBlockDetectionSupported() {
|
||||
return threadBean.isObjectMonitorUsageSupported() && threadBean.isSynchronizerUsageSupported();
|
||||
}
|
||||
|
||||
// for testing
|
||||
protected long getBlockDetectionIntervalInMillis() {
|
||||
return 3000L;
|
||||
}
|
||||
|
||||
// for testing
|
||||
protected void onBlockDetected(ThreadInfo blockedThread, @Nullable ThreadInfo blockingThread) {
|
||||
String blockedThreadStackTrace = stackTrace(blockedThread.getStackTrace());
|
||||
String blockingThreadStackTrace = blockingThread != null ?
|
||||
stackTrace(blockingThread.getStackTrace()) : "not available";
|
||||
throw new AssertionError("Thread [" + blockedThread.getThreadName() + "] is blocked waiting on the resource [" +
|
||||
blockedThread.getLockInfo() + "] held by the suspended thread [" + blockedThread.getLockOwnerName() +
|
||||
"] of the disrupted node [" + disruptedNode + "].\n" +
|
||||
"Please add this occurrence to the unsafeClasses list in [" + LongGCDisruption.class.getName() + "].\n" +
|
||||
"Stack trace of blocked thread: " + blockedThreadStackTrace + "\n" +
|
||||
"Stack trace of blocking thread: " + blockingThreadStackTrace);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // stops/resumes threads intentionally
|
||||
@SuppressForbidden(reason = "stops/resumes threads intentionally")
|
||||
protected void resumeThreads(Set<Thread> threads) {
|
||||
|
|
|
@ -18,11 +18,15 @@
|
|||
*/
|
||||
package org.elasticsearch.test.disruption;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -148,4 +152,94 @@ public class LongGCDisruptionTest extends ESTestCase {
|
|||
stop.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
public void testBlockDetection() throws Exception {
|
||||
final String disruptedNodeName = "disrupted_node";
|
||||
final String blockedNodeName = "blocked_node";
|
||||
CountDownLatch waitForBlockDetectionResult = new CountDownLatch(1);
|
||||
AtomicReference<ThreadInfo> blockDetectionResult = new AtomicReference<>();
|
||||
LongGCDisruption disruption = new LongGCDisruption(random(), disruptedNodeName) {
|
||||
@Override
|
||||
protected Pattern[] getUnsafeClasses() {
|
||||
return new Pattern[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onBlockDetected(ThreadInfo blockedThread, @Nullable ThreadInfo blockingThread) {
|
||||
blockDetectionResult.set(blockedThread);
|
||||
waitForBlockDetectionResult.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long getBlockDetectionIntervalInMillis() {
|
||||
return 10L;
|
||||
}
|
||||
};
|
||||
if (disruption.isBlockDetectionSupported() == false) {
|
||||
return;
|
||||
}
|
||||
final AtomicBoolean stop = new AtomicBoolean();
|
||||
final CountDownLatch underLock = new CountDownLatch(1);
|
||||
final CountDownLatch pauseUnderLock = new CountDownLatch(1);
|
||||
final LockedExecutor lockedExecutor = new LockedExecutor();
|
||||
final AtomicLong ops = new AtomicLong();
|
||||
try {
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// at least one locked and one none lock thread
|
||||
final boolean lockedExec = (i < 4 && randomBoolean()) || i == 0;
|
||||
Thread thread = new Thread(() -> {
|
||||
while (stop.get() == false) {
|
||||
if (lockedExec) {
|
||||
lockedExecutor.executeLocked(() -> {
|
||||
try {
|
||||
underLock.countDown();
|
||||
ops.incrementAndGet();
|
||||
pauseUnderLock.await();
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
}
|
||||
});
|
||||
} else {
|
||||
ops.incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
thread.setName("[" + disruptedNodeName + "][" + i + "]");
|
||||
thread.start();
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// at least one locked and one none lock thread
|
||||
final boolean lockedExec = (i < 4 && randomBoolean()) || i == 0;
|
||||
Thread thread = new Thread(() -> {
|
||||
while (stop.get() == false) {
|
||||
if (lockedExec) {
|
||||
lockedExecutor.executeLocked(() -> {
|
||||
ops.incrementAndGet();
|
||||
});
|
||||
} else {
|
||||
ops.incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
thread.setName("[" + blockedNodeName + "][" + i + "]");
|
||||
thread.start();
|
||||
}
|
||||
// make sure some threads of test_node are under lock
|
||||
underLock.await();
|
||||
disruption.startDisrupting();
|
||||
waitForBlockDetectionResult.await(30, TimeUnit.SECONDS);
|
||||
disruption.stopDisrupting();
|
||||
|
||||
ThreadInfo threadInfo = blockDetectionResult.get();
|
||||
assertNotNull(threadInfo);
|
||||
assertThat(threadInfo.getThreadName(), containsString("[" + blockedNodeName + "]"));
|
||||
assertThat(threadInfo.getLockOwnerName(), containsString("[" + disruptedNodeName + "]"));
|
||||
assertThat(threadInfo.getLockInfo().getClassName(), containsString(ReentrantLock.class.getName()));
|
||||
} finally {
|
||||
stop.set(true);
|
||||
pauseUnderLock.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue