diff --git a/core/cli/src/main/java/org/elasticsearch/cli/Command.java b/core/cli/src/main/java/org/elasticsearch/cli/Command.java index 78a9f31283d..34ede7ccf94 100644 --- a/core/cli/src/main/java/org/elasticsearch/cli/Command.java +++ b/core/cli/src/main/java/org/elasticsearch/cli/Command.java @@ -38,6 +38,8 @@ public abstract class Command implements Closeable { /** A description of the command, used in the help output. */ protected final String description; + private final Runnable beforeMain; + /** The option parser for this command. */ protected final OptionParser parser = new OptionParser(); @@ -46,8 +48,15 @@ public abstract class Command implements Closeable { private final OptionSpec verboseOption = parser.acceptsAll(Arrays.asList("v", "verbose"), "show verbose output").availableUnless(silentOption); - public Command(String description) { + /** + * Construct the command with the specified command description and runnable to execute before main is invoked. + * + * @param description the command description + * @param beforeMain the before-main runnable + */ + public Command(final String description, final Runnable beforeMain) { this.description = description; + this.beforeMain = beforeMain; } private Thread shutdownHookThread; @@ -75,7 +84,7 @@ public abstract class Command implements Closeable { Runtime.getRuntime().addShutdownHook(shutdownHookThread); } - beforeExecute(); + beforeMain.run(); try { mainWithoutErrorHandling(args, terminal); @@ -93,12 +102,6 @@ public abstract class Command implements Closeable { return ExitCodes.OK; } - /** - * Setup method to be executed before parsing or execution of the command being run. Any exceptions thrown by the - * method will not be cleanly caught by the parser. - */ - protected void beforeExecute() {} - /** * Executes the command, but all errors are thrown. */ diff --git a/core/cli/src/main/java/org/elasticsearch/cli/MultiCommand.java b/core/cli/src/main/java/org/elasticsearch/cli/MultiCommand.java index 16754cd7bf1..ba6b447792a 100644 --- a/core/cli/src/main/java/org/elasticsearch/cli/MultiCommand.java +++ b/core/cli/src/main/java/org/elasticsearch/cli/MultiCommand.java @@ -35,8 +35,14 @@ public class MultiCommand extends Command { private final NonOptionArgumentSpec arguments = parser.nonOptions("command"); - public MultiCommand(String description) { - super(description); + /** + * Construct the multi-command with the specified command description and runnable to execute before main is invoked. + * + * @param description the multi-command description + * @param beforeMain the before-main runnable + */ + public MultiCommand(final String description, final Runnable beforeMain) { + super(description, beforeMain); parser.posixlyCorrect(true); } diff --git a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index c30dfd360a0..5b20b848f0b 100644 --- a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -33,8 +33,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.internal.AliasFilter; @@ -86,6 +88,19 @@ public class TransportExplainAction extends TransportSingleShardAction listener) throws IOException { + IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + indexShard.awaitShardSearchActive(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } + @Override protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException { ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId, diff --git a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 884af4a3af9..d14db67744d 100644 --- a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -19,13 +19,13 @@ package org.elasticsearch.action.get; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -38,6 +38,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Performs the get operation. */ @@ -76,6 +78,23 @@ public class TransportGetAction extends TransportSingleShardAction listener) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + if (request.realtime()) { // we are not tied to a refresh cycle here anyway + listener.onResponse(shardOperation(request, shardId)); + } else { + indexShard.awaitShardSearchActive(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } + } + @Override protected GetResponse shardOperation(GetRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 811dcbed3dc..f2b2090dc28 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -47,6 +48,8 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.Executor; import java.util.function.Supplier; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; @@ -78,7 +81,7 @@ public abstract class TransportSingleShardAction listener) throws IOException { + threadPool.executor(this.executor).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(shardOperation(request, shardId)); + } + }); + } protected abstract Response newResponse(); protected abstract boolean resolveIndex(Request request); @@ -291,11 +307,27 @@ public abstract class TransportSingleShardAction() { + @Override + public void onResponse(Response response) { + try { + channel.sendResponse(response); + } catch (IOException e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + }); } } - /** * Internal request class that gets built on each node. Holds the original request plus additional info. */ diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index 5ff55a6fa55..289f40f1a34 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.termvectors; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; @@ -37,6 +38,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Performs the get operation. */ @@ -82,6 +85,23 @@ public class TransportTermVectorsAction extends TransportSingleShardAction listener) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles + listener.onResponse(shardOperation(request, shardId)); + } else { + indexShard.awaitShardSearchActive(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } + } + @Override protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); diff --git a/core/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java b/core/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java index cfe73459a05..1538f0cdf00 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java @@ -51,7 +51,7 @@ class Elasticsearch extends EnvironmentAwareCommand { // visible for testing Elasticsearch() { - super("starts elasticsearch"); + super("starts elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging versionOption = parser.acceptsAll(Arrays.asList("V", "version"), "Prints elasticsearch version information and exits"); daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"), @@ -92,15 +92,6 @@ class Elasticsearch extends EnvironmentAwareCommand { return elasticsearch.main(args, terminal); } - @Override - protected boolean shouldConfigureLoggingWithoutConfig() { - /* - * If we allow logging to be configured without a config before we are ready to read the log4j2.properties file, then we will fail - * to detect uses of logging before it is properly configured. - */ - return false; - } - @Override protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException { if (options.nonOptionArguments().isEmpty() == false) { diff --git a/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java b/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java index c6692cec08b..6869a6abb71 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java @@ -65,12 +65,10 @@ class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionH } } - // visible for testing static boolean isFatalUncaught(Throwable e) { return e instanceof Error; } - // visible for testing void onFatalUncaught(final String threadName, final Throwable t) { final Logger logger = Loggers.getLogger(ElasticsearchUncaughtExceptionHandler.class, loggingPrefixSupplier.get()); logger.error( @@ -78,24 +76,32 @@ class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionH () -> new ParameterizedMessage("fatal error in thread [{}], exiting", threadName), t); } - // visible for testing void onNonFatalUncaught(final String threadName, final Throwable t) { final Logger logger = Loggers.getLogger(ElasticsearchUncaughtExceptionHandler.class, loggingPrefixSupplier.get()); logger.warn((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("uncaught exception in thread [{}]", threadName), t); } - // visible for testing void halt(int status) { - AccessController.doPrivileged(new PrivilegedAction() { - @SuppressForbidden(reason = "halt") - @Override - public Void run() { - // we halt to prevent shutdown hooks from running - Runtime.getRuntime().halt(status); - return null; - } - }); + AccessController.doPrivileged(new PrivilegedHaltAction(status)); + } + + static class PrivilegedHaltAction implements PrivilegedAction { + + private final int status; + + private PrivilegedHaltAction(final int status) { + this.status = status; + } + + @SuppressForbidden(reason = "halt") + @Override + public Void run() { + // we halt to prevent shutdown hooks from running + Runtime.getRuntime().halt(status); + return null; + } + } } diff --git a/core/src/main/java/org/elasticsearch/bootstrap/Security.java b/core/src/main/java/org/elasticsearch/bootstrap/Security.java index f591780b5ad..3693f5cba58 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/Security.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/Security.java @@ -119,7 +119,11 @@ final class Security { Policy.setPolicy(new ESPolicy(createPermissions(environment), getPluginPermissions(environment), filterBadDefaults)); // enable security manager - final String[] classesThatCanExit = new String[]{ElasticsearchUncaughtExceptionHandler.class.getName(), Command.class.getName()}; + final String[] classesThatCanExit = + new String[]{ + // SecureSM matches class names as regular expressions so we escape the $ that arises from the nested class name + ElasticsearchUncaughtExceptionHandler.PrivilegedHaltAction.class.getName().replace("$", "\\$"), + Command.class.getName()}; System.setSecurityManager(new SecureSM(classesThatCanExit)); // do some basic tests diff --git a/core/src/main/java/org/elasticsearch/cli/CommandLoggingConfigurator.java b/core/src/main/java/org/elasticsearch/cli/CommandLoggingConfigurator.java new file mode 100644 index 00000000000..406c362dd72 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cli/CommandLoggingConfigurator.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cli; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.common.settings.Settings; + +/** + * Holder class for method to configure logging without Elasticsearch configuration files for use in CLI tools that will not read such + * files. + */ +final class CommandLoggingConfigurator { + + /** + * Configures logging without Elasticsearch configuration files based on the system property "es.logger.level" only. As such, any + * logging will be written to the console. + */ + static void configureLoggingWithoutConfig() { + // initialize default for es.logger.level because we will not read the log4j2.properties + final String loggerLevel = System.getProperty("es.logger.level", Level.INFO.name()); + final Settings settings = Settings.builder().put("logger.level", loggerLevel).build(); + LogConfigurator.configureWithoutConfig(settings); + } + +} diff --git a/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java b/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java index b2bd887e0f6..7d963655957 100644 --- a/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java +++ b/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java @@ -22,9 +22,7 @@ package org.elasticsearch.cli; import joptsimple.OptionSet; import joptsimple.OptionSpec; import joptsimple.util.KeyValuePair; -import org.apache.logging.log4j.Level; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.logging.LogConfigurator; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.node.InternalSettingsPreparer; @@ -40,8 +38,25 @@ public abstract class EnvironmentAwareCommand extends Command { private final OptionSpec settingOption; - public EnvironmentAwareCommand(String description) { - super(description); + /** + * Construct the command with the specified command description. This command will have logging configured without reading Elasticsearch + * configuration files. + * + * @param description the command description + */ + public EnvironmentAwareCommand(final String description) { + this(description, CommandLoggingConfigurator::configureLoggingWithoutConfig); + } + + /** + * Construct the command with the specified command description and runnable to execute before main is invoked. Commands constructed + * with this constructor must take ownership of configuring logging. + * + * @param description the command description + * @param beforeMain the before-main runnable + */ + public EnvironmentAwareCommand(final String description, final Runnable beforeMain) { + super(description, beforeMain); this.settingOption = parser.accepts("E", "Configure a setting").withRequiredArg().ofType(KeyValuePair.class); } @@ -104,26 +119,6 @@ public abstract class EnvironmentAwareCommand extends Command { } } - @Override - protected final void beforeExecute() { - if (shouldConfigureLoggingWithoutConfig()) { - // initialize default for es.logger.level because we will not read the log4j2.properties - final String loggerLevel = System.getProperty("es.logger.level", Level.INFO.name()); - final Settings settings = Settings.builder().put("logger.level", loggerLevel).build(); - LogConfigurator.configureWithoutConfig(settings); - } - } - - /** - * Indicate whether or not logging should be configured without reading a log4j2.properties. Most commands should do this because we do - * not configure logging for CLI tools. Only commands that configure logging on their own should not do this. - * - * @return true if logging should be configured without reading a log4j2.properties file - */ - protected boolean shouldConfigureLoggingWithoutConfig() { - return true; - } - /** Execute the command with the initialized {@link Environment}. */ protected abstract void execute(Terminal terminal, OptionSet options, Environment env) throws Exception; diff --git a/core/src/main/java/org/elasticsearch/cli/LoggingAwareCommand.java b/core/src/main/java/org/elasticsearch/cli/LoggingAwareCommand.java new file mode 100644 index 00000000000..94da7f510b1 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cli/LoggingAwareCommand.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cli; + +/** + * A command that is aware of logging. This class should be preferred over the base {@link Command} class for any CLI tools that depend on + * core Elasticsearch as they could directly or indirectly touch classes that touch logging and as such logging needs to be configured. + */ +public abstract class LoggingAwareCommand extends Command { + + /** + * Construct the command with the specified command description. This command will have logging configured without reading Elasticsearch + * configuration files. + * + * @param description the command description + */ + public LoggingAwareCommand(final String description) { + super(description, CommandLoggingConfigurator::configureLoggingWithoutConfig); + } + +} diff --git a/core/src/main/java/org/elasticsearch/cli/LoggingAwareMultiCommand.java b/core/src/main/java/org/elasticsearch/cli/LoggingAwareMultiCommand.java new file mode 100644 index 00000000000..e22a4f22e83 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cli/LoggingAwareMultiCommand.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cli; + +/** + * A multi-command that is aware of logging. This class should be preferred over the base {@link MultiCommand} class for any CLI tools that + * depend on core Elasticsearch as they could directly or indirectly touch classes that touch logging and as such logging needs to be + * configured. + */ +public abstract class LoggingAwareMultiCommand extends MultiCommand { + + /** + * Construct the command with the specified command description. This command will have logging configured without reading Elasticsearch + * configuration files. + * + * @param description the command description + */ + public LoggingAwareMultiCommand(final String description) { + super(description, CommandLoggingConfigurator::configureLoggingWithoutConfig); + } + +} diff --git a/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 597fa970a57..d53d1fd0642 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -30,16 +30,19 @@ import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.document.LatLonDocValuesField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SegmentReader; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; @@ -650,6 +653,21 @@ public class Lucene { return LenientParser.parse(toParse, defaultValue); } + /** + * Tries to extract a segment reader from the given index reader. + * If no SegmentReader can be extracted an {@link IllegalStateException} is thrown. + */ + public static SegmentReader segmentReader(LeafReader reader) { + if (reader instanceof SegmentReader) { + return (SegmentReader) reader; + } else if (reader instanceof FilterLeafReader) { + final FilterLeafReader fReader = (FilterLeafReader) reader; + return segmentReader(FilterLeafReader.unwrap(fReader)); + } + // hard fail - we can't get a SegmentReader + throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]"); + } + @SuppressForbidden(reason = "Version#parseLeniently() used in a central place") private static final class LenientParser { public static Version parse(String toParse, Version defaultValue) { diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index b3b0fdb8cf8..63c5e4d5ab5 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -36,7 +36,6 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.Store; @@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, + IndexSettings.INDEX_SEARCH_IDLE_AFTER, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/common/settings/KeyStoreCli.java b/core/src/main/java/org/elasticsearch/common/settings/KeyStoreCli.java index 16818341cbd..b3d448dae50 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/KeyStoreCli.java +++ b/core/src/main/java/org/elasticsearch/common/settings/KeyStoreCli.java @@ -19,13 +19,14 @@ package org.elasticsearch.common.settings; +import org.elasticsearch.cli.LoggingAwareMultiCommand; import org.elasticsearch.cli.MultiCommand; import org.elasticsearch.cli.Terminal; /** * A cli tool for managing secrets in the elasticsearch keystore. */ -public class KeyStoreCli extends MultiCommand { +public class KeyStoreCli extends LoggingAwareMultiCommand { private KeyStoreCli() { super("A tool for managing settings stored in the elasticsearch keystore"); @@ -39,4 +40,5 @@ public class KeyStoreCli extends MultiCommand { public static void main(String[] args) throws Exception { exit(new KeyStoreCli().main(args, Terminal.DEFAULT)); } + } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 22aaaec1e72..7fc8dc6d6bd 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.NodeEnvironment; @@ -624,6 +625,27 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust } } if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { + // once we change the refresh interval we schedule yet another refresh + // to ensure we are in a clean and predictable state. + // it doesn't matter if we move from or to -1 in both cases we want + // docs to become visible immediately. This also flushes all pending indexing / search reqeusts + // that are waiting for a refresh. + threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("forced refresh failed after interval change", e); + } + + @Override + protected void doRun() throws Exception { + maybeRefreshEngine(true); + } + + @Override + public boolean isForceExecution() { + return true; + } + }); rescheduleRefreshTasks(); } final Translog.Durability durability = indexSettings.getTranslogDurability(); @@ -686,17 +708,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust } } - private void maybeRefreshEngine() { - if (indexSettings.getRefreshInterval().millis() > 0) { + private void maybeRefreshEngine(boolean force) { + if (indexSettings.getRefreshInterval().millis() > 0 || force) { for (IndexShard shard : this.shards.values()) { - if (shard.isReadAllowed()) { - try { - if (shard.isRefreshNeeded()) { - shard.refresh("schedule"); - } - } catch (IndexShardClosedException | AlreadyClosedException ex) { - // fine - continue; - } + try { + shard.scheduledRefresh(); + } catch (IndexShardClosedException | AlreadyClosedException ex) { + // fine - continue; } } } @@ -896,7 +914,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust @Override protected void runInternal() { - indexService.maybeRefreshEngine(); + indexService.maybeRefreshEngine(false); } @Override diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 9e390fb5b22..bf498d3d07d 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -62,6 +62,9 @@ public final class IndexSettings { public static final Setting INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), Property.IndexScope); + public static final Setting INDEX_SEARCH_IDLE_AFTER = + Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30), + TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic); public static final Setting INDEX_TRANSLOG_DURABILITY_SETTING = new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope); @@ -262,6 +265,8 @@ public final class IndexSettings { private volatile int maxNgramDiff; private volatile int maxShingleDiff; private volatile boolean TTLPurgeDisabled; + private volatile TimeValue searchIdleAfter; + /** * The maximum number of refresh listeners allows on this shard. */ @@ -371,6 +376,7 @@ public final class IndexSettings { maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL); this.mergePolicyConfig = new MergePolicyConfig(logger, this); this.indexSortConfig = new IndexSortConfig(this); + searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) { throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: [" @@ -411,8 +417,11 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); } + private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } + private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } @@ -752,4 +761,16 @@ public final class IndexSettings { } public IndexScopedSettings getScopedSettings() { return scopedSettings;} + + /** + * Returns true iff the refresh setting exists or in other words is explicitly set. + */ + public boolean isExplicitRefresh() { + return INDEX_REFRESH_INTERVAL_SETTING.exists(settings); + } + + /** + * Returns the time that an index shard becomes search idle unless it's accessed in between + */ + public TimeValue getSearchIdleAfter() { return searchIdleAfter; } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index b07b68d82b8..add4a443903 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -71,18 +71,12 @@ class CombinedDeletionPolicy extends IndexDeletionPolicy { } private void setLastCommittedTranslogGeneration(List commits) throws IOException { - // We need to keep translog since the smallest translog generation of un-deleted commits. - // However, there are commits that are not deleted just because they are being snapshotted (rather than being kept by the policy). - // TODO: We need to distinguish those commits and skip them in calculating the minimum required translog generation. - long minRequiredGen = Long.MAX_VALUE; - for (IndexCommit indexCommit : commits) { - if (indexCommit.isDeleted() == false) { - long translogGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - minRequiredGen = Math.min(translogGen, minRequiredGen); - } - } - assert minRequiredGen != Long.MAX_VALUE : "All commits are deleted"; - translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); + // when opening an existing lucene index, we currently always open the last commit. + // we therefore use the translog gen as the one that will be required for recovery + final IndexCommit indexCommit = commits.get(commits.size() - 1); + assert indexCommit.isDeleted() == false : "last commit is deleted"; + long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen); } public SnapshotDeletionPolicy getIndexDeletionPolicy() { diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 99410d9f624..8959bf6f554 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexReader; @@ -143,27 +142,12 @@ public abstract class Engine implements Closeable { return a.ramBytesUsed(); } - /** - * Tries to extract a segment reader from the given index reader. - * If no SegmentReader can be extracted an {@link IllegalStateException} is thrown. - */ - protected static SegmentReader segmentReader(LeafReader reader) { - if (reader instanceof SegmentReader) { - return (SegmentReader) reader; - } else if (reader instanceof FilterLeafReader) { - final FilterLeafReader fReader = (FilterLeafReader) reader; - return segmentReader(FilterLeafReader.unwrap(fReader)); - } - // hard fail - we can't get a SegmentReader - throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]"); - } - /** * Returns whether a leaf reader comes from a merge (versus flush or addIndexes). */ protected static boolean isMergedSegment(LeafReader reader) { // We expect leaves to be segment readers - final Map diagnostics = segmentReader(reader).getSegmentInfo().info.getDiagnostics(); + final Map diagnostics = Lucene.segmentReader(reader).getSegmentInfo().info.getDiagnostics(); final String source = diagnostics.get(IndexWriter.SOURCE); assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH, IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source; @@ -611,7 +595,7 @@ public abstract class Engine implements Closeable { try (Searcher searcher = acquireSearcher("segments_stats")) { SegmentsStats stats = new SegmentsStats(); for (LeafReaderContext reader : searcher.reader().leaves()) { - final SegmentReader segmentReader = segmentReader(reader.reader()); + final SegmentReader segmentReader = Lucene.segmentReader(reader.reader()); stats.add(1, segmentReader.ramBytesUsed()); stats.addTermsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPostingsReader())); stats.addStoredFieldsMemoryInBytes(guardedRamBytesUsed(segmentReader.getFieldsReader())); @@ -718,7 +702,7 @@ public abstract class Engine implements Closeable { // first, go over and compute the search ones... try (Searcher searcher = acquireSearcher("segments")){ for (LeafReaderContext reader : searcher.reader().leaves()) { - final SegmentReader segmentReader = segmentReader(reader.reader()); + final SegmentReader segmentReader = Lucene.segmentReader(reader.reader()); SegmentCommitInfo info = segmentReader.getSegmentInfo(); assert !segments.containsKey(info.info.name); Segment segment = new Segment(info.info.name); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0bc250275e9..0f66a5fca9c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -21,10 +21,14 @@ package org.elasticsearch.index.shard; import com.carrotsearch.hppc.ObjectLongMap; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.ReferenceManager; @@ -58,7 +62,6 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -233,6 +236,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ private final RefreshListeners refreshListeners; + private final AtomicLong lastSearcherAccess = new AtomicLong(); + private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); + public IndexShard( ShardRouting shardRouting, IndexSettings indexSettings, @@ -297,6 +303,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl searcherWrapper = indexSearcherWrapper; primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); refreshListeners = buildRefreshListeners(); + lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); } @@ -855,15 +862,30 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } public DocsStats docStats() { + // we calculate the doc stats based on the internal reader that is more up-to-date and not subject + // to external refreshes. For instance we don't refresh an external reader if we flush and indices with + // index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics + // when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are + // safe here. long numDocs = 0; long numDeletedDocs = 0; long sizeInBytes = 0; - List segments = segments(false); - for (Segment segment : segments) { - if (segment.search) { - numDocs += segment.getNumDocs(); - numDeletedDocs += segment.getDeletedDocs(); - sizeInBytes += segment.getSizeInBytes(); + try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) { + // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause + // the next scheduled refresh to go through and refresh the stats as well + markSearcherAccessed(); + for (LeafReaderContext reader : searcher.reader().leaves()) { + // we go on the segment level here to get accurate numbers + final SegmentReader segmentReader = Lucene.segmentReader(reader.reader()); + SegmentCommitInfo info = segmentReader.getSegmentInfo(); + numDocs += reader.reader().numDocs(); + numDeletedDocs += reader.reader().numDeletedDocs(); + try { + sizeInBytes += info.sizeInBytes(); + } catch (IOException e) { + logger.trace((org.apache.logging.log4j.util.Supplier) + () -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e); + } } } return new DocsStats(numDocs, numDeletedDocs, sizeInBytes); @@ -948,6 +970,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public CompletionStats completionStats(String... fields) { CompletionStats completionStats = new CompletionStats(); try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats")) { + // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause + // the next scheduled refresh to go through and refresh the stats as well + markSearcherAccessed(); completionStats.add(CompletionFieldStats.completionStats(currentSearcher.reader(), fields)); } return completionStats; @@ -1117,6 +1142,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); } + private void markSearcherAccessed() { + lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); + } + private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { readAllowed(); final Engine engine = getEngine(); @@ -2418,14 +2447,74 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Returns true iff one or more changes to the engine are not visible to via the current searcher *or* there are pending - * refresh listeners. - * Otherwise false. + * Executes a scheduled refresh if necessary. * - * @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed + * @return true iff the engine got refreshed otherwise false */ - public boolean isRefreshNeeded() { - return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded()); + public boolean scheduledRefresh() { + boolean listenerNeedsRefresh = refreshListeners.refreshNeeded(); + if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) { + if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it + && isSearchIdle() && indexSettings.isExplicitRefresh() == false) { + // lets skip this refresh since we are search idle and + // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will + // cause the next schedule to refresh. + setRefreshPending(); + return false; + } else { + refresh("schedule"); + return true; + } + } + return false; + } + + /** + * Returns true if this shards is search idle + */ + final boolean isSearchIdle() { + return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis(); + } + + /** + * Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds. + */ + final long getLastSearcherAccess() { + return lastSearcherAccess.get(); + } + + private void setRefreshPending() { + Engine engine = getEngine(); + Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation(); + Translog.Location location; + do { + location = this.pendingRefreshLocation.get(); + if (location != null && lastWriteLocation.compareTo(location) <= 0) { + break; + } + } while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false); + } + + /** + * Registers the given listener and invokes it once the shard is active again and all + * pending refresh translog location has been refreshed. If there is no pending refresh location registered the listener will be + * invoked immediately. + * @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with + * true if the listener was registered to wait for a refresh. + */ + public final void awaitShardSearchActive(Consumer listener) { + if (isSearchIdle()) { + markSearcherAccessed(); // move the shard into non-search idle + } + final Translog.Location location = pendingRefreshLocation.get(); + if (location != null) { + addRefreshListener(location, (b) -> { + pendingRefreshLocation.compareAndSet(location, null); + listener.accept(true); + }); + } else { + listener.accept(false); + } } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 668633e07ef..cc9dbdeb63f 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -19,6 +19,12 @@ package org.elasticsearch.index.translog; +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongObjectHashMap; +import com.carrotsearch.hppc.LongSet; +import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.index.seqno.SequenceNumbers; + import java.io.Closeable; import java.io.IOException; import java.util.Arrays; @@ -30,32 +36,44 @@ final class MultiSnapshot implements Translog.Snapshot { private final TranslogSnapshot[] translogs; private final int totalOperations; + private int overriddenOperations; private final Closeable onClose; private int index; + private final SeqNoSet seenSeqNo; /** * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. */ MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) { this.translogs = translogs; - totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); + this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); + this.overriddenOperations = 0; this.onClose = onClose; - index = 0; + this.seenSeqNo = new SeqNoSet(); + this.index = translogs.length - 1; } - @Override public int totalOperations() { return totalOperations; } + @Override + public int overriddenOperations() { + return overriddenOperations; + } + @Override public Translog.Operation next() throws IOException { - for (; index < translogs.length; index++) { + for (; index >= 0; index--) { final TranslogSnapshot current = translogs[index]; - Translog.Operation op = current.next(); - if (op != null) { // if we are null we move to the next snapshot - return op; + Translog.Operation op; + while ((op = current.next()) != null) { + if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { + return op; + } else { + overriddenOperations++; + } } } return null; @@ -65,4 +83,76 @@ final class MultiSnapshot implements Translog.Snapshot { public void close() throws IOException { onClose.close(); } + + /** + * A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1). + */ + private static final class CountedBitSet { + private short onBits; + private final FixedBitSet bitset; + + CountedBitSet(short numBits) { + assert numBits > 0; + this.onBits = 0; + this.bitset = new FixedBitSet(numBits); + } + + boolean getAndSet(int index) { + assert index >= 0; + boolean wasOn = bitset.getAndSet(index); + if (wasOn == false) { + onBits++; + } + return wasOn; + } + + boolean hasAllBitsOn() { + return onBits == bitset.length(); + } + } + + /** + * Sequence numbers from translog are likely to form contiguous ranges, + * thus collapsing a completed bitset into a single entry will reduce memory usage. + */ + static final class SeqNoSet { + static final short BIT_SET_SIZE = 1024; + private final LongSet completedSets = new LongHashSet(); + private final LongObjectHashMap ongoingSets = new LongObjectHashMap<>(); + + /** + * Marks this sequence number and returns true if it is seen before. + */ + boolean getAndSet(long value) { + assert value >= 0; + final long key = value / BIT_SET_SIZE; + + if (completedSets.contains(key)) { + return true; + } + + CountedBitSet bitset = ongoingSets.get(key); + if (bitset == null) { + bitset = new CountedBitSet(BIT_SET_SIZE); + ongoingSets.put(key, bitset); + } + + final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE)); + if (bitset.hasAllBitsOn()) { + ongoingSets.remove(key); + completedSets.add(key); + } + return wasOn; + } + + // For testing + long completeSetsSize() { + return completedSets.size(); + } + + // For testing + long ongoingSetsSize() { + return ongoingSets.size(); + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index b361d107b22..b51aca2ae25 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -845,10 +845,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public interface Snapshot extends Closeable { /** - * The total number of operations in the translog. + * The total estimated number of operations in the snapshot. */ int totalOperations(); + /** + * The number of operations have been overridden (eg. superseded) in the snapshot so far. + * If two operations have the same sequence number, the operation with a lower term will be overridden by the operation + * with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called. + */ + default int overriddenOperations() { + return 0; + } + /** * Returns the next operation in the snapshot or null if we reached the end. */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java index 944296d6813..b9cbf032951 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java @@ -19,13 +19,14 @@ package org.elasticsearch.index.translog; +import org.elasticsearch.cli.LoggingAwareMultiCommand; import org.elasticsearch.cli.MultiCommand; import org.elasticsearch.cli.Terminal; /** * Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool */ -public class TranslogToolCli extends MultiCommand { +public class TranslogToolCli extends LoggingAwareMultiCommand { private TranslogToolCli() { super("A CLI tool for various Elasticsearch translog actions"); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 2fb62256942..5d1c244ba44 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -66,6 +66,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -567,8 +568,9 @@ public class RecoverySourceHandler { cancellableThreads.executeIO(sendBatch); } - assert expectedTotalOps == skippedOps + totalSentOps - : "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]"; + assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps + : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", + expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps); logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 49ab6652957..117a979639b 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -582,6 +582,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv throws IOException { return createSearchContext(request, timeout, true); } + private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, boolean assertAsyncActions) throws IOException { @@ -979,22 +980,31 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv * The action listener is guaranteed to be executed on the search thread-pool */ private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + ActionListener actionListener = ActionListener.wrap(r -> + threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(request); + } + }), listener::onFailure); + IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId()); + if (shardOrNull != null) { + // now we need to check if there is a pending refresh and register + ActionListener finalListener = actionListener; + actionListener = ActionListener.wrap(r -> + shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure); + } // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead - Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), - ActionListener.wrap(r -> - threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } + Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); + - @Override - protected void doRun() throws Exception { - listener.onResponse(request); - } - }), listener::onFailure)); } /** @@ -1003,4 +1013,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) { return indicesService.getRewriteContext(nowInMillis); } + + public IndicesService getIndicesService() { + return indicesService; + } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java index da593d57b77..403bf833878 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java @@ -20,21 +20,16 @@ package org.elasticsearch.search.fetch.subphase; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Map; -import static org.elasticsearch.common.xcontent.XContentFactory.contentBuilder; - public final class FetchSourceSubPhase implements FetchSubPhase { @Override @@ -65,7 +60,17 @@ public final class FetchSourceSubPhase implements FetchSubPhase { final int initialCapacity = nestedHit ? 1024 : Math.min(1024, source.internalSourceRef().length()); BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity); XContentBuilder builder = new XContentBuilder(source.sourceContentType().xContent(), streamOutput); - builder.value(value); + if (value != null) { + builder.value(value); + } else { + // This happens if the source filtering could not find the specified in the _source. + // Just doing `builder.value(null)` is valid, but the xcontent validation can't detect what format + // it is. In certain cases, for example response serialization we fail if no xcontent type can't be + // detected. So instead we just return an empty top level object. Also this is in inline with what was + // being return in this situation in 5.x and earlier. + builder.startObject(); + builder.endObject(); + } hitContext.hit().sourceRef(builder.bytes()); } catch (IOException e) { throw new ElasticsearchException("Error filtering source", e); diff --git a/core/src/test/java/org/elasticsearch/cli/CommandTests.java b/core/src/test/java/org/elasticsearch/cli/CommandTests.java index e3c5c254d32..2b2437eea65 100644 --- a/core/src/test/java/org/elasticsearch/cli/CommandTests.java +++ b/core/src/test/java/org/elasticsearch/cli/CommandTests.java @@ -28,7 +28,7 @@ public class CommandTests extends ESTestCase { static class UserErrorCommand extends Command { UserErrorCommand() { - super("Throws a user error"); + super("Throws a user error", () -> {}); } @Override @@ -46,7 +46,7 @@ public class CommandTests extends ESTestCase { static class UsageErrorCommand extends Command { UsageErrorCommand() { - super("Throws a usage error"); + super("Throws a usage error", () -> {}); } @Override @@ -66,7 +66,7 @@ public class CommandTests extends ESTestCase { boolean executed = false; NoopCommand() { - super("Does nothing"); + super("Does nothing", () -> {}); } @Override diff --git a/core/src/test/java/org/elasticsearch/cli/MultiCommandTests.java b/core/src/test/java/org/elasticsearch/cli/MultiCommandTests.java index f4680492028..f4448bbedfe 100644 --- a/core/src/test/java/org/elasticsearch/cli/MultiCommandTests.java +++ b/core/src/test/java/org/elasticsearch/cli/MultiCommandTests.java @@ -26,13 +26,13 @@ public class MultiCommandTests extends CommandTestCase { static class DummyMultiCommand extends MultiCommand { DummyMultiCommand() { - super("A dummy multi command"); + super("A dummy multi command", () -> {}); } } static class DummySubCommand extends Command { DummySubCommand() { - super("A dummy subcommand"); + super("A dummy subcommand", () -> {}); } @Override protected void execute(Terminal terminal, OptionSet options) throws Exception { diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 689ecea9f4a..5d4385cbd38 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -60,23 +60,20 @@ public class CombinedDeletionPolicyTests extends ESTestCase { EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); List commitList = new ArrayList<>(); long count = randomIntBetween(10, 20); - long minGen = Long.MAX_VALUE; + long lastGen = 0; for (int i = 0; i < count; i++) { - long lastGen = randomIntBetween(10, 20000); - minGen = Math.min(minGen, lastGen); + lastGen += randomIntBetween(10, 20000); commitList.add(mockIndexCommitWithTranslogGen(lastGen)); } combinedDeletionPolicy.onInit(commitList); - verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(minGen); + verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); commitList.clear(); - minGen = Long.MAX_VALUE; for (int i = 0; i < count; i++) { - long lastGen = randomIntBetween(10, 20000); - minGen = Math.min(minGen, lastGen); + lastGen += randomIntBetween(10, 20000); commitList.add(mockIndexCommitWithTranslogGen(lastGen)); } combinedDeletionPolicy.onCommit(commitList); - verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(minGen); + verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); } IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException { diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index cf4dab733f2..8c15a2a84dd 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -46,16 +46,23 @@ import org.elasticsearch.indices.recovery.RecoveryTarget; import org.hamcrest.Matcher; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase { @@ -299,6 +306,68 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase } } + public void testSeqNoCollision() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + int initDocs = shards.indexDocs(randomInt(10)); + List replicas = shards.getReplicas(); + IndexShard replica1 = replicas.get(0); + IndexShard replica2 = replicas.get(1); + shards.syncGlobalCheckpoint(); + + logger.info("--> Isolate replica1"); + IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON); + BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary()); + indexOnReplica(replicationRequest, replica2); + + final Translog.Operation op1; + final List initOperations = new ArrayList<>(initDocs); + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + for (int i = 0; i < initDocs; i++) { + Translog.Operation op = snapshot.next(); + assertThat(op, is(notNullValue())); + initOperations.add(op); + } + op1 = snapshot.next(); + assertThat(op1, notNullValue()); + assertThat(snapshot.next(), nullValue()); + assertThat(snapshot.overriddenOperations(), equalTo(0)); + } + // Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1). + logger.info("--> Promote replica1 as the primary"); + shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. + shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); + final Translog.Operation op2; + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 2)); + op2 = snapshot.next(); + assertThat(op2.seqNo(), equalTo(op1.seqNo())); + assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat(snapshot.overriddenOperations(), equalTo(1)); + } + + // Make sure that peer-recovery transfers all but non-overridden operations. + IndexShard replica3 = shards.addReplica(); + logger.info("--> Promote replica2 as the primary"); + shards.promoteReplicaToPrimary(replica2); + logger.info("--> Recover replica3 from replica2"); + recoverReplica(replica3, replica2); + try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + assertThat(snapshot.next(), equalTo(op2)); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0)); + } + // TODO: We should assert the content of shards in the ReplicationGroup. + // Without rollback replicas(current implementation), we don't have the same content across shards: + // - replica1 has {doc1} + // - replica2 has {doc1, doc2} + // - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery + } + } + /** Throws documentFailure on every indexing operation */ static class ThrowingDocumentFailureEngineFactory implements EngineFactory { final String documentFailureMessage; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index b29ba2d9efc..7c38b7c211f 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -18,14 +18,14 @@ */ package org.elasticsearch.index.shard; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; @@ -41,11 +41,11 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -56,11 +56,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.flush.FlushStats; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; @@ -82,8 +77,10 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -97,6 +94,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; import static org.hamcrest.Matchers.equalTo; public class IndexShardIT extends ESSingleNodeTestCase { @@ -106,21 +104,6 @@ public class IndexShardIT extends ESSingleNodeTestCase { return pluginList(InternalSettingsPlugin.class); } - private ParsedDocument testParsedDocument(String id, String type, String routing, long seqNo, - ParseContext.Document document, BytesReference source, XContentType xContentType, - Mapping mappingUpdate) { - Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); - Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - document.add(uidField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - return new ParsedDocument(versionField, seqID, id, type, routing, - Collections.singletonList(document), source, xContentType, mappingUpdate); - } - public void testLockTryingToDelete() throws Exception { createIndex("test"); ensureGreen(); @@ -550,4 +533,96 @@ public class IndexShardIT extends ESSingleNodeTestCase { RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE); return shardRouting; } + + public void testAutomaticRefresh() throws InterruptedException { + TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000))); + Settings.Builder builder = Settings.builder(); + if (randomTimeValue != null) { + builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), randomTimeValue); + } + IndexService indexService = createIndex("test", builder.build()); + assertFalse(indexService.getIndexSettings().isExplicitRefresh()); + ensureGreen(); + AtomicInteger totalNumDocs = new AtomicInteger(Integer.MAX_VALUE); + CountDownLatch started = new CountDownLatch(1); + Thread t = new Thread(() -> { + SearchResponse searchResponse; + started.countDown(); + do { + searchResponse = client().prepareSearch().get(); + } while (searchResponse.getHits().totalHits != totalNumDocs.get()); + }); + t.start(); + started.await(); + assertNoSearchHits(client().prepareSearch().get()); + int numDocs = scaledRandomIntBetween(25, 100); + totalNumDocs.set(numDocs); + CountDownLatch indexingDone = new CountDownLatch(numDocs); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + indexingDone.countDown(); // one doc is indexed above blocking + IndexShard shard = indexService.getShard(0); + boolean hasRefreshed = shard.scheduledRefresh(); + if (randomTimeValue == TimeValue.ZERO) { + // with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background + assertFalse(hasRefreshed); + assertTrue(shard.isSearchIdle()); + } else if (randomTimeValue == null){ + // with null we are guaranteed to see the doc since do execute the refresh. + // we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently + assertFalse(shard.isSearchIdle()); + } + assertHitCount(client().prepareSearch().get(), 1); + for (int i = 1; i < numDocs; i++) { + client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON) + .execute(new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + indexingDone.countDown(); + } + + @Override + public void onFailure(Exception e) { + indexingDone.countDown(); + throw new AssertionError(e); + } + }); + } + indexingDone.await(); + t.join(); + } + + public void testPendingRefreshWithIntervalChange() throws InterruptedException { + Settings.Builder builder = Settings.builder(); + builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO); + IndexService indexService = createIndex("test", builder.build()); + assertFalse(indexService.getIndexSettings().isExplicitRefresh()); + ensureGreen(); + assertNoSearchHits(client().prepareSearch().get()); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + IndexShard shard = indexService.getShard(0); + assertFalse(shard.scheduledRefresh()); + assertTrue(shard.isSearchIdle()); + CountDownLatch refreshLatch = new CountDownLatch(1); + client().admin().indices().prepareRefresh() + .execute(ActionListener.wrap(refreshLatch::countDown));// async on purpose to make sure it happens concurrently + assertHitCount(client().prepareSearch().get(), 1); + client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + assertFalse(shard.scheduledRefresh()); + + // now disable background refresh and make sure the refresh happens + CountDownLatch updateSettingsLatch = new CountDownLatch(1); + client().admin().indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build()) + .execute(ActionListener.wrap(updateSettingsLatch::countDown)); + assertHitCount(client().prepareSearch().get(), 2); + // wait for both to ensure we don't have in-flight operations + updateSettingsLatch.await(); + refreshLatch.await(); + + client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + assertTrue(shard.scheduledRefresh()); + assertTrue(shard.isSearchIdle()); + assertHitCount(client().prepareSearch().get(), 3); + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5caa749edb7..c0c97901d3e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -62,7 +62,9 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -70,6 +72,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -2271,11 +2274,17 @@ public class IndexShardTests extends IndexShardTestCase { final String id = Integer.toString(i); indexDoc(indexShard, "test", id); } - - indexShard.refresh("test"); + if (randomBoolean()) { + indexShard.refresh("test"); + } else { + indexShard.flush(new FlushRequest()); + } { final DocsStats docsStats = indexShard.docStats(); assertThat(docsStats.getCount(), equalTo(numDocs)); + try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) { + assertTrue(searcher.reader().numDocs() <= docsStats.getCount()); + } assertThat(docsStats.getDeleted(), equalTo(0L)); assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L)); } @@ -2295,9 +2304,14 @@ public class IndexShardTests extends IndexShardTestCase { flushRequest.waitIfOngoing(false); indexShard.flush(flushRequest); - indexShard.refresh("test"); + if (randomBoolean()) { + indexShard.refresh("test"); + } { final DocsStats docStats = indexShard.docStats(); + try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) { + assertTrue(searcher.reader().numDocs() <= docStats.getCount()); + } assertThat(docStats.getCount(), equalTo(numDocs)); // Lucene will delete a segment if all docs are deleted from it; this means that we lose the deletes when deleting all docs assertThat(docStats.getDeleted(), equalTo(numDocsToDelete == numDocs ? 0 : numDocsToDelete)); @@ -2309,7 +2323,11 @@ public class IndexShardTests extends IndexShardTestCase { forceMergeRequest.maxNumSegments(1); indexShard.forceMerge(forceMergeRequest); - indexShard.refresh("test"); + if (randomBoolean()) { + indexShard.refresh("test"); + } else { + indexShard.flush(new FlushRequest()); + } { final DocsStats docStats = indexShard.docStats(); assertThat(docStats.getCount(), equalTo(numDocs)); @@ -2340,8 +2358,11 @@ public class IndexShardTests extends IndexShardTestCase { assertThat("Without flushing, segment sizes should be zero", indexShard.docStats().getTotalSizeInBytes(), equalTo(0L)); - indexShard.flush(new FlushRequest()); - indexShard.refresh("test"); + if (randomBoolean()) { + indexShard.flush(new FlushRequest()); + } else { + indexShard.refresh("test"); + } { final DocsStats docsStats = indexShard.docStats(); final StoreStats storeStats = indexShard.storeStats(); @@ -2361,9 +2382,11 @@ public class IndexShardTests extends IndexShardTestCase { indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}"); } } - - indexShard.flush(new FlushRequest()); - indexShard.refresh("test"); + if (randomBoolean()) { + indexShard.flush(new FlushRequest()); + } else { + indexShard.refresh("test"); + } { final DocsStats docsStats = indexShard.docStats(); final StoreStats storeStats = indexShard.storeStats(); @@ -2567,4 +2590,137 @@ public class IndexShardTests extends IndexShardTestCase { public void verify(String verificationToken, DiscoveryNode localNode) { } } + + public void testIsSearchIdle() throws Exception { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + assertFalse(primary.isSearchIdle()); + + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + assertTrue(primary.isSearchIdle()); + + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMinutes(1)) + .build(); + scopedSettings.applySettings(settings); + assertFalse(primary.isSearchIdle()); + + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(10)) + .build(); + scopedSettings.applySettings(settings); + assertBusy(() -> assertFalse(primary.isSearchIdle())); + do { + // now loop until we are fast enough... shouldn't take long + primary.acquireSearcher("test").close(); + } while (primary.isSearchIdle()); + closeShards(primary); + } + + public void testScheduledRefresh() throws IOException, InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + + assertFalse(primary.getEngine().refreshNeeded()); + indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + long lastSearchAccess = primary.getLastSearcherAccess(); + assertFalse(primary.scheduledRefresh()); + assertEquals(lastSearchAccess, primary.getLastSearcherAccess()); + // wait until the thread-pool has moved the timestamp otherwise we can't assert on this below + awaitBusy(() -> primary.getThreadPool().relativeTimeInMillis() > lastSearchAccess); + CountDownLatch latch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + primary.awaitShardSearchActive(refreshed -> { + assertTrue(refreshed); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(2, searcher.reader().numDocs()); + } finally { + latch.countDown(); + } + }); + } + assertNotEquals("awaitShardSearchActive must access a searcher to remove search idle state", lastSearchAccess, + primary.getLastSearcherAccess()); + assertTrue(lastSearchAccess < primary.getLastSearcherAccess()); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(1, searcher.reader().numDocs()); + } + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch.await(); + CountDownLatch latch1 = new CountDownLatch(1); + primary.awaitShardSearchActive(refreshed -> { + assertFalse(refreshed); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(2, searcher.reader().numDocs()); + } finally { + latch1.countDown(); + } + + }); + latch1.await(); + closeShards(primary); + } + + public void testRefreshIsNeededWithRefreshListeners() throws IOException, InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + Engine.IndexResult doc = indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); + CountDownLatch latch = new CountDownLatch(1); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown()); + assertEquals(1, latch.getCount()); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch.await(); + + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + + doc = indexDoc(primary, "test", "2", "{\"foo\" : \"bar\"}"); + CountDownLatch latch1 = new CountDownLatch(1); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown()); + assertEquals(1, latch1.getCount()); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch1.await(); + closeShards(primary); + } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java new file mode 100644 index 00000000000..7ee2a6c3366 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongSet; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class MultiSnapshotTests extends ESTestCase { + + public void testTrackSeqNoSimpleRange() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final List values = LongStream.range(0, 1024).boxed().collect(Collectors.toList()); + Randomness.shuffle(values); + for (int i = 0; i < 1023; i++) { + assertThat(bitSet.getAndSet(values.get(i)), equalTo(false)); + assertThat(bitSet.ongoingSetsSize(), equalTo(1L)); + assertThat(bitSet.completeSetsSize(), equalTo(0L)); + } + + assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false)); + assertThat(bitSet.ongoingSetsSize(), equalTo(0L)); + assertThat(bitSet.completeSetsSize(), equalTo(1L)); + + assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true)); + assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false)); + } + + public void testTrackSeqNoDenseRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final LongSet normalSet = new LongHashSet(); + IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { + long seq = between(0, 5000); + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed)); + assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L)); + }); + } + + public void testTrackSeqNoSparseRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final LongSet normalSet = new LongHashSet(); + IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { + long seq = between(i * 10_000, i * 30_000); + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + }); + } + + public void testTrackSeqNoMimicTranslogRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final LongSet normalSet = new LongHashSet(); + long currentSeq = between(10_000_000, 1_000_000_000); + final int iterations = scaledRandomIntBetween(100, 2000); + assertThat(bitSet.completeSetsSize(), equalTo(0L)); + assertThat(bitSet.ongoingSetsSize(), equalTo(0L)); + long totalDocs = 0; + for (long i = 0; i < iterations; i++) { + int batchSize = between(1, 1500); + totalDocs += batchSize; + currentSeq -= batchSize; + List batch = LongStream.range(currentSeq, currentSeq + batchSize) + .boxed() + .collect(Collectors.toList()); + Randomness.shuffle(batch); + batch.forEach(seq -> { + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L)); + }); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L)); + } + assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024)); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L)); + } +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java index c45da660b00..4ca6057bd6b 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java +++ b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java @@ -26,6 +26,9 @@ import org.hamcrest.TypeSafeMatcher; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; public final class SnapshotMatchers { @@ -50,10 +53,14 @@ public final class SnapshotMatchers { /** * Consumes a snapshot and make sure it's content is as expected */ - public static Matcher equalsTo(ArrayList ops) { + public static Matcher equalsTo(List ops) { return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()])); } + public static Matcher containsOperationsInAnyOrder(Collection expectedOperations) { + return new ContainingInAnyOrderMatcher(expectedOperations); + } + public static class SizeMatcher extends TypeSafeMatcher { private final int size; @@ -127,5 +134,60 @@ public final class SnapshotMatchers { } } + public static class ContainingInAnyOrderMatcher extends TypeSafeMatcher { + private final Collection expectedOps; + private List notFoundOps; + private List notExpectedOps; + static List drainAll(Translog.Snapshot snapshot) throws IOException { + final List actualOps = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + actualOps.add(op); + } + return actualOps; + } + + public ContainingInAnyOrderMatcher(Collection expectedOps) { + this.expectedOps = expectedOps; + } + + @Override + protected boolean matchesSafely(Translog.Snapshot snapshot) { + try { + List actualOps = drainAll(snapshot); + notFoundOps = expectedOps.stream() + .filter(o -> actualOps.contains(o) == false) + .collect(Collectors.toList()); + notExpectedOps = actualOps.stream() + .filter(o -> expectedOps.contains(o) == false) + .collect(Collectors.toList()); + return notFoundOps.isEmpty() && notExpectedOps.isEmpty(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to read snapshot content", ex); + } + } + + @Override + protected void describeMismatchSafely(Translog.Snapshot snapshot, Description mismatchDescription) { + if (notFoundOps.isEmpty() == false) { + mismatchDescription + .appendText("not found ").appendValueList("[", ", ", "]", notFoundOps); + } + if (notExpectedOps.isEmpty() == false) { + if (notFoundOps.isEmpty() == false) { + mismatchDescription.appendText("; "); + } + mismatchDescription + .appendText("not expected ").appendValueList("[", ", ", "]", notExpectedOps); + } + } + + @Override + public void describeTo(Description description) { + description.appendText("snapshot contains ") + .appendValueList("[", ", ", "]", expectedOps) + .appendText(" in any order."); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index c16488e149d..3b7b9913435 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -75,7 +75,6 @@ import org.junit.Before; import java.io.Closeable; import java.io.EOFException; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; @@ -84,11 +83,13 @@ import java.nio.file.Files; import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -107,10 +108,10 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; -import java.util.stream.Stream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -221,7 +222,7 @@ public class TranslogTests extends ESTestCase { return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } - private void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { + private void addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { list.add(op); translog.add(op); } @@ -524,7 +525,7 @@ public class TranslogTests extends ESTestCase { Translog.Snapshot snapshot2 = translog.newSnapshot(); toClose.add(snapshot2); markCurrentGenAsCommitted(translog); - assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot2, containsOperationsInAnyOrder(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } finally { IOUtils.closeWhileHandlingException(toClose); @@ -1032,7 +1033,7 @@ public class TranslogTests extends ESTestCase { } assertEquals(max.generation, translog.currentFileGeneration()); - try (Translog.Snapshot snap = translog.newSnapshot()) { + try (Translog.Snapshot snap = new SortedSnapshot(translog.newSnapshot())) { Translog.Operation next; Translog.Operation maxOp = null; while ((next = snap.next()) != null) { @@ -1256,7 +1257,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1270,7 +1271,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1314,7 +1315,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1329,7 +1330,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1378,7 +1379,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -2065,7 +2066,7 @@ public class TranslogTests extends ESTestCase { } public void testRecoverWithUnbackedNextGen() throws IOException { - translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); translog.close(); TranslogConfig config = translog.getConfig(); @@ -2076,21 +2077,25 @@ public class TranslogTests extends ESTestCase { try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - for (int i = 0; i < 1; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); - } - tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + + Translog.Operation op = snapshot.next(); + assertNotNull("operation 1 must be non-null", op); + assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.getSource().source.utf8ToString())); + + tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(2).getBytes(Charset.forName("UTF-8")))); } + try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - for (int i = 0; i < 2; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); - } + + Translog.Operation secondOp = snapshot.next(); + assertNotNull("operation 2 must be non-null", secondOp); + assertEquals("payload mismatch for operation 2", Integer.parseInt(secondOp.getSource().source.utf8ToString()), 2); + + Translog.Operation firstOp = snapshot.next(); + assertNotNull("operation 1 must be non-null", firstOp); + assertEquals("payload mismatch for operation 1", Integer.parseInt(firstOp.getSource().source.utf8ToString()), 1); } } @@ -2493,6 +2498,7 @@ public class TranslogTests extends ESTestCase { assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos)); readFromSnapshot++; } + readFromSnapshot += snapshot.overriddenOperations(); } assertThat(readFromSnapshot, equalTo(expectedSnapshotOps)); final long seqNoLowerBound = seqNo; @@ -2570,4 +2576,84 @@ public class TranslogTests extends ESTestCase { } } } + + public void testSnapshotReadOperationInReverse() throws Exception { + final Deque> views = new ArrayDeque<>(); + views.push(new ArrayList<>()); + final AtomicLong seqNo = new AtomicLong(); + + final int generations = randomIntBetween(2, 20); + for (int gen = 0; gen < generations; gen++) { + final int operations = randomIntBetween(1, 100); + for (int i = 0; i < operations; i++) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), new byte[]{1}); + translog.add(op); + views.peek().add(op); + } + if (frequently()) { + translog.rollGeneration(); + views.push(new ArrayList<>()); + } + } + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + final List expectedSeqNo = new ArrayList<>(); + while (views.isEmpty() == false) { + expectedSeqNo.addAll(views.pop()); + } + assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo)); + } + } + + public void testSnapshotDedupOperations() throws Exception { + final Map latestOperations = new HashMap<>(); + final int generations = between(2, 20); + for (int gen = 0; gen < generations; gen++) { + List batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList()); + Randomness.shuffle(batch); + for (Long seqNo : batch) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, new byte[]{1}); + translog.add(op); + latestOperations.put(op.seqNo(), op); + } + translog.rollGeneration(); + } + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, containsOperationsInAnyOrder(latestOperations.values())); + } + } + + static class SortedSnapshot implements Translog.Snapshot { + private final Translog.Snapshot snapshot; + private List operations = null; + + SortedSnapshot(Translog.Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public int totalOperations() { + return snapshot.totalOperations(); + } + + @Override + public Translog.Operation next() throws IOException { + if (operations == null) { + operations = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operations.add(op); + } + operations.sort(Comparator.comparing(Translog.Operation::seqNo)); + } + if (operations.isEmpty()) { + return null; + } + return operations.remove(0); + } + + @Override + public void close() throws IOException { + snapshot.close(); + } + } } diff --git a/core/src/test/java/org/elasticsearch/search/fetch/subphase/InnerHitsIT.java b/core/src/test/java/org/elasticsearch/search/fetch/subphase/InnerHitsIT.java index b41ba7a85f7..f0cad042158 100644 --- a/core/src/test/java/org/elasticsearch/search/fetch/subphase/InnerHitsIT.java +++ b/core/src/test/java/org/elasticsearch/search/fetch/subphase/InnerHitsIT.java @@ -56,7 +56,6 @@ import java.util.function.Function; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; @@ -635,6 +634,18 @@ public class InnerHitsIT extends ESIntegTestCase { assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(0).getSourceAsMap().size(), equalTo(2)); assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(1).getSourceAsMap().get("message"), equalTo("fox ate rabbit x y z")); + + // Source filter on a field that does not exist inside the nested document and just check that we do not fail and + // return an empty _source: + response = client().prepareSearch() + .setQuery(nestedQuery("comments", matchQuery("comments.message", "away"), ScoreMode.None) + .innerHit(new InnerHitBuilder().setFetchSourceContext(new FetchSourceContext(true, + new String[]{"comments.missing_field"}, null)))) + .get(); + assertNoFailures(response); + assertHitCount(response, 1); + assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getTotalHits(), equalTo(1L)); + assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(0).getSourceAsMap().size(), equalTo(0)); } public void testInnerHitsWithIgnoreUnmapped() throws Exception { diff --git a/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/PluginCli.java b/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/PluginCli.java index ccc96c94eb7..aac22302d3b 100644 --- a/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/PluginCli.java +++ b/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/PluginCli.java @@ -21,6 +21,7 @@ package org.elasticsearch.plugins; import org.apache.lucene.util.IOUtils; import org.elasticsearch.cli.Command; +import org.elasticsearch.cli.LoggingAwareMultiCommand; import org.elasticsearch.cli.MultiCommand; import org.elasticsearch.cli.Terminal; @@ -31,7 +32,7 @@ import java.util.Collections; /** * A cli tool for adding, removing and listing plugins for elasticsearch. */ -public class PluginCli extends MultiCommand { +public class PluginCli extends LoggingAwareMultiCommand { private final Collection commands; diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index bf93f62847f..6dd0fad4365 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -107,11 +107,22 @@ specific index module: Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all` for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled). +`index.search.idle.after`:: + How long a shard can not receive a search or get request until it's considered + search idle. (default is `30s`) + `index.refresh_interval`:: How often to perform a refresh operation, which makes recent changes to the - index visible to search. Defaults to `1s`. Can be set to `-1` to disable - refresh. + index visible to search. Defaults to `1s`. Can be set to `-1` to disable + refresh. If this setting is not explicitly set, shards that haven't seen + search traffic for at least `index.search.idle.after` seconds will not receive + background refreshes until they receive a search request. Searches that hit an + idle shard where a refresh is pending will wait for the next background + refresh (within `1s`). This behavior aims to automatically optimize bulk + indexing in the default case when no searches are performed. In order to opt + out of this behavior an explicit value of `1s` should set as the refresh + interval. `index.max_result_window`:: diff --git a/docs/reference/migration/migrate_7_0.asciidoc b/docs/reference/migration/migrate_7_0.asciidoc index 75a5131f8fb..7d84077ab86 100644 --- a/docs/reference/migration/migrate_7_0.asciidoc +++ b/docs/reference/migration/migrate_7_0.asciidoc @@ -35,10 +35,10 @@ way to reindex old indices is to use the `reindex` API. include::migrate_7_0/aggregations.asciidoc[] +include::migrate_7_0/analysis.asciidoc[] include::migrate_7_0/cluster.asciidoc[] include::migrate_7_0/indices.asciidoc[] include::migrate_7_0/mappings.asciidoc[] include::migrate_7_0/search.asciidoc[] include::migrate_7_0/plugins.asciidoc[] include::migrate_7_0/api.asciidoc[] - diff --git a/docs/reference/migration/migrate_7_0/indices.asciidoc b/docs/reference/migration/migrate_7_0/indices.asciidoc index 92f56a2ddbb..16e437b4156 100644 --- a/docs/reference/migration/migrate_7_0/indices.asciidoc +++ b/docs/reference/migration/migrate_7_0/indices.asciidoc @@ -44,4 +44,14 @@ Indices created with version `7.0.0` onwards will have an automatic `index.numbe value set. This might change how documents are distributed across shards depending on how many shards the index has. In order to maintain the exact same distribution as a pre `7.0.0` index, the `index.number_of_routing_shards` must be set to the `index.number_of_shards` at index creation time. -Note: if the number of routing shards equals the number of shards `_split` operations are not supported. \ No newline at end of file +Note: if the number of routing shards equals the number of shards `_split` operations are not supported. + +==== Skipped background refresh on search idle shards. + +Shards belonging to an index that does not have an explicit +`index.refresh_interval` configured will no longer refresh in the background +once the shard becomes "search idle", ie the shard hasn't seen any search +traffic for `index.search.idle.after` seconds (defaults to `30s`). Searches +that access a search idle shard will be "parked" until the next refresh +happens. Indexing requests with `wait_for_refresh` will also trigger +a background refresh. diff --git a/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java b/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java index 35bce1f4f4f..1df6935c61a 100644 --- a/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java +++ b/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java @@ -244,20 +244,9 @@ public class PercolatorFieldMapper extends FieldMapper { Query percolateQuery(String name, PercolateQuery.QueryStore queryStore, List documents, IndexSearcher searcher, Version indexVersion) throws IOException { IndexReader indexReader = searcher.getIndexReader(); - Tuple, Boolean> t = createCandidateQueryClauses(indexReader); - BooleanQuery.Builder candidateQuery = new BooleanQuery.Builder(); - if (t.v2() && indexVersion.onOrAfter(Version.V_6_1_0)) { - LongValuesSource valuesSource = LongValuesSource.fromIntField(minimumShouldMatchField.name()); - candidateQuery.add(new CoveringQuery(t.v1(), valuesSource), BooleanClause.Occur.SHOULD); - } else { - for (Query query : t.v1()) { - candidateQuery.add(query, BooleanClause.Occur.SHOULD); - } - } - // include extractionResultField:failed, because docs with this term have no extractedTermsField - // and otherwise we would fail to return these docs. Docs that failed query term extraction - // always need to be verified by MemoryIndex: - candidateQuery.add(new TermQuery(new Term(extractionResultField.name(), EXTRACTION_FAILED)), BooleanClause.Occur.SHOULD); + Tuple t = createCandidateQuery(indexReader, indexVersion); + Query candidateQuery = t.v1(); + boolean canUseMinimumShouldMatchField = t.v2(); Query verifiedMatchesQuery; // We can only skip the MemoryIndex verification when percolating a single non nested document. We cannot @@ -265,15 +254,55 @@ public class PercolatorFieldMapper extends FieldMapper { // ranges are extracted from IndexReader backed by a RamDirectory holding multiple documents we do // not know to which document the terms belong too and for certain queries we incorrectly emit candidate // matches as actual match. - if (t.v2() && indexReader.maxDoc() == 1) { + if (canUseMinimumShouldMatchField && indexReader.maxDoc() == 1) { verifiedMatchesQuery = new TermQuery(new Term(extractionResultField.name(), EXTRACTION_COMPLETE)); } else { verifiedMatchesQuery = new MatchNoDocsQuery("multiple or nested docs or CoveringQuery could not be used"); } - return new PercolateQuery(name, queryStore, documents, candidateQuery.build(), searcher, verifiedMatchesQuery); + return new PercolateQuery(name, queryStore, documents, candidateQuery, searcher, verifiedMatchesQuery); } - Tuple, Boolean> createCandidateQueryClauses(IndexReader indexReader) throws IOException { + Tuple createCandidateQuery(IndexReader indexReader, Version indexVersion) throws IOException { + Tuple, Map>> t = extractTermsAndRanges(indexReader); + List extractedTerms = t.v1(); + Map> encodedPointValuesByField = t.v2(); + // `1 + ` is needed to take into account the EXTRACTION_FAILED should clause + boolean canUseMinimumShouldMatchField = 1 + extractedTerms.size() + encodedPointValuesByField.size() <= + BooleanQuery.getMaxClauseCount(); + + List subQueries = new ArrayList<>(); + for (Map.Entry> entry : encodedPointValuesByField.entrySet()) { + String rangeFieldName = entry.getKey(); + List encodedPointValues = entry.getValue(); + byte[] min = encodedPointValues.get(0); + byte[] max = encodedPointValues.get(1); + Query query = BinaryRange.newIntersectsQuery(rangeField.name(), encodeRange(rangeFieldName, min, max)); + subQueries.add(query); + } + + BooleanQuery.Builder candidateQuery = new BooleanQuery.Builder(); + if (canUseMinimumShouldMatchField && indexVersion.onOrAfter(Version.V_6_1_0)) { + LongValuesSource valuesSource = LongValuesSource.fromIntField(minimumShouldMatchField.name()); + for (BytesRef extractedTerm : extractedTerms) { + subQueries.add(new TermQuery(new Term(queryTermsField.name(), extractedTerm))); + } + candidateQuery.add(new CoveringQuery(subQueries, valuesSource), BooleanClause.Occur.SHOULD); + } else { + candidateQuery.add(new TermInSetQuery(queryTermsField.name(), extractedTerms), BooleanClause.Occur.SHOULD); + for (Query subQuery : subQueries) { + candidateQuery.add(subQuery, BooleanClause.Occur.SHOULD); + } + } + // include extractionResultField:failed, because docs with this term have no extractedTermsField + // and otherwise we would fail to return these docs. Docs that failed query term extraction + // always need to be verified by MemoryIndex: + candidateQuery.add(new TermQuery(new Term(extractionResultField.name(), EXTRACTION_FAILED)), BooleanClause.Occur.SHOULD); + return new Tuple<>(candidateQuery.build(), canUseMinimumShouldMatchField); + } + + // This was extracted the method above, because otherwise it is difficult to test what terms are included in + // the query in case a CoveringQuery is used (it does not have a getter to retrieve the clauses) + Tuple, Map>> extractTermsAndRanges(IndexReader indexReader) throws IOException { List extractedTerms = new ArrayList<>(); Map> encodedPointValuesByField = new HashMap<>(); @@ -299,28 +328,7 @@ public class PercolatorFieldMapper extends FieldMapper { encodedPointValuesByField.put(info.name, encodedPointValues); } } - - final boolean canUseMinimumShouldMatchField; - final List queries = new ArrayList<>(); - if (extractedTerms.size() + encodedPointValuesByField.size() <= BooleanQuery.getMaxClauseCount()) { - canUseMinimumShouldMatchField = true; - for (BytesRef extractedTerm : extractedTerms) { - queries.add(new TermQuery(new Term(queryTermsField.name(), extractedTerm))); - } - } else { - canUseMinimumShouldMatchField = false; - queries.add(new TermInSetQuery(queryTermsField.name(), extractedTerms)); - } - - for (Map.Entry> entry : encodedPointValuesByField.entrySet()) { - String rangeFieldName = entry.getKey(); - List encodedPointValues = entry.getValue(); - byte[] min = encodedPointValues.get(0); - byte[] max = encodedPointValues.get(1); - Query query = BinaryRange.newIntersectsQuery(rangeField.name(), encodeRange(rangeFieldName, min, max)); - queries.add(query); - } - return new Tuple<>(queries, canUseMinimumShouldMatchField); + return new Tuple<>(extractedTerms, encodedPointValuesByField); } } diff --git a/modules/percolator/src/test/java/org/elasticsearch/percolator/CandidateQueryTests.java b/modules/percolator/src/test/java/org/elasticsearch/percolator/CandidateQueryTests.java index 971be4931e6..965182e5334 100644 --- a/modules/percolator/src/test/java/org/elasticsearch/percolator/CandidateQueryTests.java +++ b/modules/percolator/src/test/java/org/elasticsearch/percolator/CandidateQueryTests.java @@ -45,6 +45,7 @@ import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.CoveringQuery; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FilterScorer; @@ -505,16 +506,17 @@ public class CandidateQueryTests extends ESSingleNodeTestCase { } try (IndexReader ir = DirectoryReader.open(directory)){ IndexSearcher percolateSearcher = new IndexSearcher(ir); - Query query = + PercolateQuery query = (PercolateQuery) fieldType.percolateQuery("_name", queryStore, Collections.singletonList(new BytesArray("{}")), percolateSearcher, v); + BooleanQuery candidateQuery = (BooleanQuery) query.getCandidateMatchesQuery(); + assertThat(candidateQuery.clauses().get(0).getQuery(), instanceOf(CoveringQuery.class)); TopDocs topDocs = shardSearcher.search(query, 10); assertEquals(2L, topDocs.totalHits); assertEquals(2, topDocs.scoreDocs.length); assertEquals(0, topDocs.scoreDocs[0].doc); assertEquals(2, topDocs.scoreDocs[1].doc); - query = new ConstantScoreQuery(query); - topDocs = shardSearcher.search(query, 10); + topDocs = shardSearcher.search(new ConstantScoreQuery(query), 10); assertEquals(2L, topDocs.totalHits); assertEquals(2, topDocs.scoreDocs.length); assertEquals(0, topDocs.scoreDocs[0].doc); @@ -526,7 +528,7 @@ public class CandidateQueryTests extends ESSingleNodeTestCase { try (RAMDirectory directory = new RAMDirectory()) { try (IndexWriter iw = new IndexWriter(directory, newIndexWriterConfig())) { Document document = new Document(); - for (int i = 0; i < 1025; i++) { + for (int i = 0; i < 1024; i++) { int fieldNumber = 2 + i; document.add(new StringField("field", "value" + fieldNumber, Field.Store.NO)); } diff --git a/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolateQueryBuilderTests.java b/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolateQueryBuilderTests.java index 655c0d508ec..193d0e8fe06 100644 --- a/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolateQueryBuilderTests.java +++ b/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolateQueryBuilderTests.java @@ -53,6 +53,7 @@ import org.elasticsearch.test.AbstractQueryTestCase; import org.hamcrest.Matchers; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -340,7 +341,7 @@ public class PercolateQueryBuilderTests extends AbstractQueryTestCase, Boolean> t = fieldType.createCandidateQueryClauses(indexReader); - assertTrue(t.v2()); - List clauses = t.v1(); - clauses.sort(Comparator.comparing(Query::toString)); - assertEquals(15, clauses.size()); - assertEquals(fieldType.queryTermsField.name() + ":_field3\u0000me", clauses.get(0).toString()); - assertEquals(fieldType.queryTermsField.name() + ":_field3\u0000unhide", clauses.get(1).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000brown", clauses.get(2).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000dog", clauses.get(3).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000fox", clauses.get(4).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000jumps", clauses.get(5).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000lazy", clauses.get(6).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000over", clauses.get(7).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000quick", clauses.get(8).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field1\u0000the", clauses.get(9).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field2\u0000more", clauses.get(10).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field2\u0000some", clauses.get(11).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field2\u0000text", clauses.get(12).toString()); - assertEquals(fieldType.queryTermsField.name() + ":field4\u0000123", clauses.get(13).toString()); - assertThat(clauses.get(14).toString(), containsString(fieldName + ".range_field:, Map>> t = fieldType.extractTermsAndRanges(indexReader); + assertEquals(1, t.v2().size()); + Map> rangesMap = t.v2(); + assertEquals(1, rangesMap.size()); + + List range = rangesMap.get("number_field2"); + assertNotNull(range); + assertEquals(10, LongPoint.decodeDimension(range.get(0), 0)); + assertEquals(10, LongPoint.decodeDimension(range.get(1), 0)); + + List terms = t.v1(); + terms.sort(BytesRef::compareTo); + assertEquals(14, terms.size()); + assertEquals("_field3\u0000me", terms.get(0).utf8ToString()); + assertEquals("_field3\u0000unhide", terms.get(1).utf8ToString()); + assertEquals("field1\u0000brown", terms.get(2).utf8ToString()); + assertEquals("field1\u0000dog", terms.get(3).utf8ToString()); + assertEquals("field1\u0000fox", terms.get(4).utf8ToString()); + assertEquals("field1\u0000jumps", terms.get(5).utf8ToString()); + assertEquals("field1\u0000lazy", terms.get(6).utf8ToString()); + assertEquals("field1\u0000over", terms.get(7).utf8ToString()); + assertEquals("field1\u0000quick", terms.get(8).utf8ToString()); + assertEquals("field1\u0000the", terms.get(9).utf8ToString()); + assertEquals("field2\u0000more", terms.get(10).utf8ToString()); + assertEquals("field2\u0000some", terms.get(11).utf8ToString()); + assertEquals("field2\u0000text", terms.get(12).utf8ToString()); + assertEquals("field4\u0000123", terms.get(13).utf8ToString()); } - public void testCreateCandidateQuery_largeDocument() throws Exception { + public void testCreateCandidateQuery() throws Exception { addQueryFieldMappings(); MemoryIndex memoryIndex = new MemoryIndex(false); StringBuilder text = new StringBuilder(); - for (int i = 0; i < 1023; i++) { + for (int i = 0; i < 1022; i++) { text.append(i).append(' '); } memoryIndex.addField("field1", text.toString(), new WhitespaceAnalyzer()); memoryIndex.addField(new LongPoint("field2", 10L), new WhitespaceAnalyzer()); IndexReader indexReader = memoryIndex.createSearcher().getIndexReader(); - Tuple, Boolean> t = fieldType.createCandidateQueryClauses(indexReader); + Tuple t = fieldType.createCandidateQuery(indexReader, Version.CURRENT); assertTrue(t.v2()); - List clauses = t.v1(); - assertEquals(1024, clauses.size()); - assertThat(clauses.get(1023).toString(), containsString(fieldName + ".range_field: t = fieldType.createCandidateQuery(indexReader, Version.CURRENT); + assertTrue(t.v2()); + assertEquals(2, t.v1().clauses().size()); + assertThat(t.v1().clauses().get(0).getQuery(), instanceOf(CoveringQuery.class)); + assertThat(t.v1().clauses().get(1).getQuery(), instanceOf(TermQuery.class)); + + t = fieldType.createCandidateQuery(indexReader, Version.V_6_0_0); + assertTrue(t.v2()); + assertEquals(2, t.v1().clauses().size()); + assertThat(t.v1().clauses().get(0).getQuery(), instanceOf(TermInSetQuery.class)); + assertThat(t.v1().clauses().get(1).getQuery(), instanceOf(TermQuery.class)); + } + + public void testExtractTermsAndRanges_numberFields() throws Exception { addQueryFieldMappings(); MemoryIndex memoryIndex = new MemoryIndex(false); @@ -385,17 +413,45 @@ public class PercolatorFieldMapperTests extends ESSingleNodeTestCase { IndexReader indexReader = memoryIndex.createSearcher().getIndexReader(); - Tuple, Boolean> t = fieldType.createCandidateQueryClauses(indexReader); - assertThat(t.v2(), is(true)); - List clauses = t.v1(); - assertEquals(7, clauses.size()); - assertThat(clauses.get(0).toString(), containsString(fieldName + ".range_field:, Map>> t = fieldType.extractTermsAndRanges(indexReader); + assertEquals(0, t.v1().size()); + Map> rangesMap = t.v2(); + assertEquals(7, rangesMap.size()); + + List range = rangesMap.get("number_field1"); + assertNotNull(range); + assertEquals(10, IntPoint.decodeDimension(range.get(0), 0)); + assertEquals(10, IntPoint.decodeDimension(range.get(1), 0)); + + range = rangesMap.get("number_field2"); + assertNotNull(range); + assertEquals(20L, LongPoint.decodeDimension(range.get(0), 0)); + assertEquals(20L, LongPoint.decodeDimension(range.get(1), 0)); + + range = rangesMap.get("number_field3"); + assertNotNull(range); + assertEquals(30L, LongPoint.decodeDimension(range.get(0), 0)); + assertEquals(30L, LongPoint.decodeDimension(range.get(1), 0)); + + range = rangesMap.get("number_field4"); + assertNotNull(range); + assertEquals(30F, HalfFloatPoint.decodeDimension(range.get(0), 0), 0F); + assertEquals(30F, HalfFloatPoint.decodeDimension(range.get(1), 0), 0F); + + range = rangesMap.get("number_field5"); + assertNotNull(range); + assertEquals(40F, FloatPoint.decodeDimension(range.get(0), 0), 0F); + assertEquals(40F, FloatPoint.decodeDimension(range.get(1), 0), 0F); + + range = rangesMap.get("number_field6"); + assertNotNull(range); + assertEquals(50D, DoublePoint.decodeDimension(range.get(0), 0), 0D); + assertEquals(50D, DoublePoint.decodeDimension(range.get(1), 0), 0D); + + range = rangesMap.get("number_field7"); + assertNotNull(range); + assertEquals(InetAddresses.forString("192.168.1.12"), InetAddressPoint.decode(range.get(0))); + assertEquals(InetAddresses.forString("192.168.1.24"), InetAddressPoint.decode(range.get(1))); } public void testPercolatorFieldMapper() throws Exception { diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/cli/EvilCommandTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/cli/EvilCommandTests.java index 7c51f8afe69..2990101134f 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/cli/EvilCommandTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/cli/EvilCommandTests.java @@ -33,7 +33,7 @@ public class EvilCommandTests extends ESTestCase { public void testCommandShutdownHook() throws Exception { final AtomicBoolean closed = new AtomicBoolean(); final boolean shouldThrow = randomBoolean(); - final Command command = new Command("test-command-shutdown-hook") { + final Command command = new Command("test-command-shutdown-hook", () -> {}) { @Override protected void execute(Terminal terminal, OptionSet options) throws Exception { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml index be534364d4c..f5a9469f357 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml @@ -3,12 +3,9 @@ setup: - do: indices.create: index: test - body: - settings: - index: - # initializing replicas maintain the translog causing the test to fail. - # remove once https://github.com/elastic/elasticsearch/issues/25623 is fixed. - number_of_replicas: 0 + - do: + cluster.health: + wait_for_no_initializing_shards: true --- "Translog retention": diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index bf2ffc5236e..708a95e4a49 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -347,15 +347,15 @@ public class ElasticsearchAssertions { public static void assertAllSuccessful(BroadcastResponse response) { assertNoFailures(response); - assertThat("Expected all shards successful but got successful [" + response.getSuccessfulShards() + "] total [" + response.getTotalShards() + "]", - response.getTotalShards(), equalTo(response.getSuccessfulShards())); + assertThat("Expected all shards successful", + response.getSuccessfulShards(), equalTo(response.getTotalShards())); assertVersionSerializable(response); } public static void assertAllSuccessful(SearchResponse response) { assertNoFailures(response); - assertThat("Expected all shards successful but got successful [" + response.getSuccessfulShards() + "] total [" + response.getTotalShards() + "]", - response.getTotalShards(), equalTo(response.getSuccessfulShards())); + assertThat("Expected all shards successful", + response.getSuccessfulShards(), equalTo(response.getTotalShards())); assertVersionSerializable(response); }