Merge branch 'master' into ccr

* master:
  Skip shard refreshes if shard is `search idle` (#27500)
  Remove workaround in translog rest test (#27530)
  inner_hits: Return an empty _source for nested inner hit when filtering on a field that doesn't exist.
  percolator: Avoid TooManyClauses exception if number of terms / ranges is exactly equal to 1024
  Dedup translog operations by reading in reverse (#27268)
  Ensure logging is configured for CLI commands
  Ensure `doc_stats` are changing even if refresh is disabled (#27505)
  Fix classes that can exit
  Revert "Adjust CombinedDeletionPolicy for multiple commits (#27456)"
  Transpose expected and actual, and remove duplicate info from message. (#27515)
  [DOCS] Fixed broken link in breaking changes
This commit is contained in:
Jason Tedor 2017-11-27 12:48:39 -05:00
commit 11d46ad2aa
48 changed files with 1426 additions and 324 deletions

View File

@ -38,6 +38,8 @@ public abstract class Command implements Closeable {
/** A description of the command, used in the help output. */ /** A description of the command, used in the help output. */
protected final String description; protected final String description;
private final Runnable beforeMain;
/** The option parser for this command. */ /** The option parser for this command. */
protected final OptionParser parser = new OptionParser(); protected final OptionParser parser = new OptionParser();
@ -46,8 +48,15 @@ public abstract class Command implements Closeable {
private final OptionSpec<Void> verboseOption = private final OptionSpec<Void> verboseOption =
parser.acceptsAll(Arrays.asList("v", "verbose"), "show verbose output").availableUnless(silentOption); parser.acceptsAll(Arrays.asList("v", "verbose"), "show verbose output").availableUnless(silentOption);
public Command(String description) { /**
* Construct the command with the specified command description and runnable to execute before main is invoked.
*
* @param description the command description
* @param beforeMain the before-main runnable
*/
public Command(final String description, final Runnable beforeMain) {
this.description = description; this.description = description;
this.beforeMain = beforeMain;
} }
private Thread shutdownHookThread; private Thread shutdownHookThread;
@ -75,7 +84,7 @@ public abstract class Command implements Closeable {
Runtime.getRuntime().addShutdownHook(shutdownHookThread); Runtime.getRuntime().addShutdownHook(shutdownHookThread);
} }
beforeExecute(); beforeMain.run();
try { try {
mainWithoutErrorHandling(args, terminal); mainWithoutErrorHandling(args, terminal);
@ -93,12 +102,6 @@ public abstract class Command implements Closeable {
return ExitCodes.OK; return ExitCodes.OK;
} }
/**
* Setup method to be executed before parsing or execution of the command being run. Any exceptions thrown by the
* method will not be cleanly caught by the parser.
*/
protected void beforeExecute() {}
/** /**
* Executes the command, but all errors are thrown. * Executes the command, but all errors are thrown.
*/ */

View File

@ -35,8 +35,14 @@ public class MultiCommand extends Command {
private final NonOptionArgumentSpec<String> arguments = parser.nonOptions("command"); private final NonOptionArgumentSpec<String> arguments = parser.nonOptions("command");
public MultiCommand(String description) { /**
super(description); * Construct the multi-command with the specified command description and runnable to execute before main is invoked.
*
* @param description the multi-command description
* @param beforeMain the before-main runnable
*/
public MultiCommand(final String description, final Runnable beforeMain) {
super(description, beforeMain);
parser.posixlyCorrect(true); parser.posixlyCorrect(true);
} }

View File

@ -33,8 +33,10 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.AliasFilter;
@ -86,6 +88,19 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
} }
} }
@Override
protected void asyncShardOperation(ExplainRequest request, ShardId shardId, ActionListener<ExplainResponse> listener) throws IOException {
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
@Override @Override
protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException { protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException {
ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId, ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId,

View File

@ -19,13 +19,13 @@
package org.elasticsearch.action.get; package org.elasticsearch.action.get;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -38,6 +38,8 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/** /**
* Performs the get operation. * Performs the get operation.
*/ */
@ -76,6 +78,23 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
} }
} }
@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
listener.onResponse(shardOperation(request, shardId));
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}
@Override @Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) { protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());

View File

@ -38,6 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
@ -47,6 +48,8 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Executor;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@ -78,7 +81,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
if (!isSubAction()) { if (!isSubAction()) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler()); transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
} }
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler()); transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
} }
/** /**
@ -97,6 +100,19 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(this.executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, shardId));
}
});
}
protected abstract Response newResponse(); protected abstract Response newResponse();
protected abstract boolean resolveIndex(Request request); protected abstract boolean resolveIndex(Request request);
@ -291,11 +307,27 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId); logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
} }
Response response = shardOperation(request, request.internalShardId); asyncShardOperation(request, request.internalShardId, new ActionListener<Response>() {
channel.sendResponse(response); @Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (IOException e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
} }
} }
/** /**
* Internal request class that gets built on each node. Holds the original request plus additional info. * Internal request class that gets built on each node. Holds the original request plus additional info.
*/ */

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.termvectors; package org.elasticsearch.action.termvectors;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@ -37,6 +38,8 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/** /**
* Performs the get operation. * Performs the get operation.
*/ */
@ -82,6 +85,23 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
} }
} }
@Override
protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId, ActionListener<TermVectorsResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
listener.onResponse(shardOperation(request, shardId));
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}
@Override @Override
protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) { protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());

View File

@ -51,7 +51,7 @@ class Elasticsearch extends EnvironmentAwareCommand {
// visible for testing // visible for testing
Elasticsearch() { Elasticsearch() {
super("starts elasticsearch"); super("starts elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging
versionOption = parser.acceptsAll(Arrays.asList("V", "version"), versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
"Prints elasticsearch version information and exits"); "Prints elasticsearch version information and exits");
daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"), daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
@ -92,15 +92,6 @@ class Elasticsearch extends EnvironmentAwareCommand {
return elasticsearch.main(args, terminal); return elasticsearch.main(args, terminal);
} }
@Override
protected boolean shouldConfigureLoggingWithoutConfig() {
/*
* If we allow logging to be configured without a config before we are ready to read the log4j2.properties file, then we will fail
* to detect uses of logging before it is properly configured.
*/
return false;
}
@Override @Override
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException { protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
if (options.nonOptionArguments().isEmpty() == false) { if (options.nonOptionArguments().isEmpty() == false) {

View File

@ -65,12 +65,10 @@ class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionH
} }
} }
// visible for testing
static boolean isFatalUncaught(Throwable e) { static boolean isFatalUncaught(Throwable e) {
return e instanceof Error; return e instanceof Error;
} }
// visible for testing
void onFatalUncaught(final String threadName, final Throwable t) { void onFatalUncaught(final String threadName, final Throwable t) {
final Logger logger = Loggers.getLogger(ElasticsearchUncaughtExceptionHandler.class, loggingPrefixSupplier.get()); final Logger logger = Loggers.getLogger(ElasticsearchUncaughtExceptionHandler.class, loggingPrefixSupplier.get());
logger.error( logger.error(
@ -78,24 +76,32 @@ class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionH
() -> new ParameterizedMessage("fatal error in thread [{}], exiting", threadName), t); () -> new ParameterizedMessage("fatal error in thread [{}], exiting", threadName), t);
} }
// visible for testing
void onNonFatalUncaught(final String threadName, final Throwable t) { void onNonFatalUncaught(final String threadName, final Throwable t) {
final Logger logger = Loggers.getLogger(ElasticsearchUncaughtExceptionHandler.class, loggingPrefixSupplier.get()); final Logger logger = Loggers.getLogger(ElasticsearchUncaughtExceptionHandler.class, loggingPrefixSupplier.get());
logger.warn((org.apache.logging.log4j.util.Supplier<?>) logger.warn((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("uncaught exception in thread [{}]", threadName), t); () -> new ParameterizedMessage("uncaught exception in thread [{}]", threadName), t);
} }
// visible for testing
void halt(int status) { void halt(int status) {
AccessController.doPrivileged(new PrivilegedAction<Void>() { AccessController.doPrivileged(new PrivilegedHaltAction(status));
@SuppressForbidden(reason = "halt") }
@Override
public Void run() { static class PrivilegedHaltAction implements PrivilegedAction<Void> {
// we halt to prevent shutdown hooks from running
Runtime.getRuntime().halt(status); private final int status;
return null;
} private PrivilegedHaltAction(final int status) {
}); this.status = status;
}
@SuppressForbidden(reason = "halt")
@Override
public Void run() {
// we halt to prevent shutdown hooks from running
Runtime.getRuntime().halt(status);
return null;
}
} }
} }

View File

@ -119,7 +119,11 @@ final class Security {
Policy.setPolicy(new ESPolicy(createPermissions(environment), getPluginPermissions(environment), filterBadDefaults)); Policy.setPolicy(new ESPolicy(createPermissions(environment), getPluginPermissions(environment), filterBadDefaults));
// enable security manager // enable security manager
final String[] classesThatCanExit = new String[]{ElasticsearchUncaughtExceptionHandler.class.getName(), Command.class.getName()}; final String[] classesThatCanExit =
new String[]{
// SecureSM matches class names as regular expressions so we escape the $ that arises from the nested class name
ElasticsearchUncaughtExceptionHandler.PrivilegedHaltAction.class.getName().replace("$", "\\$"),
Command.class.getName()};
System.setSecurityManager(new SecureSM(classesThatCanExit)); System.setSecurityManager(new SecureSM(classesThatCanExit));
// do some basic tests // do some basic tests

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cli;
import org.apache.logging.log4j.Level;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
/**
* Holder class for method to configure logging without Elasticsearch configuration files for use in CLI tools that will not read such
* files.
*/
final class CommandLoggingConfigurator {
/**
* Configures logging without Elasticsearch configuration files based on the system property "es.logger.level" only. As such, any
* logging will be written to the console.
*/
static void configureLoggingWithoutConfig() {
// initialize default for es.logger.level because we will not read the log4j2.properties
final String loggerLevel = System.getProperty("es.logger.level", Level.INFO.name());
final Settings settings = Settings.builder().put("logger.level", loggerLevel).build();
LogConfigurator.configureWithoutConfig(settings);
}
}

View File

@ -22,9 +22,7 @@ package org.elasticsearch.cli;
import joptsimple.OptionSet; import joptsimple.OptionSet;
import joptsimple.OptionSpec; import joptsimple.OptionSpec;
import joptsimple.util.KeyValuePair; import joptsimple.util.KeyValuePair;
import org.apache.logging.log4j.Level;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.node.InternalSettingsPreparer; import org.elasticsearch.node.InternalSettingsPreparer;
@ -40,8 +38,25 @@ public abstract class EnvironmentAwareCommand extends Command {
private final OptionSpec<KeyValuePair> settingOption; private final OptionSpec<KeyValuePair> settingOption;
public EnvironmentAwareCommand(String description) { /**
super(description); * Construct the command with the specified command description. This command will have logging configured without reading Elasticsearch
* configuration files.
*
* @param description the command description
*/
public EnvironmentAwareCommand(final String description) {
this(description, CommandLoggingConfigurator::configureLoggingWithoutConfig);
}
/**
* Construct the command with the specified command description and runnable to execute before main is invoked. Commands constructed
* with this constructor must take ownership of configuring logging.
*
* @param description the command description
* @param beforeMain the before-main runnable
*/
public EnvironmentAwareCommand(final String description, final Runnable beforeMain) {
super(description, beforeMain);
this.settingOption = parser.accepts("E", "Configure a setting").withRequiredArg().ofType(KeyValuePair.class); this.settingOption = parser.accepts("E", "Configure a setting").withRequiredArg().ofType(KeyValuePair.class);
} }
@ -104,26 +119,6 @@ public abstract class EnvironmentAwareCommand extends Command {
} }
} }
@Override
protected final void beforeExecute() {
if (shouldConfigureLoggingWithoutConfig()) {
// initialize default for es.logger.level because we will not read the log4j2.properties
final String loggerLevel = System.getProperty("es.logger.level", Level.INFO.name());
final Settings settings = Settings.builder().put("logger.level", loggerLevel).build();
LogConfigurator.configureWithoutConfig(settings);
}
}
/**
* Indicate whether or not logging should be configured without reading a log4j2.properties. Most commands should do this because we do
* not configure logging for CLI tools. Only commands that configure logging on their own should not do this.
*
* @return true if logging should be configured without reading a log4j2.properties file
*/
protected boolean shouldConfigureLoggingWithoutConfig() {
return true;
}
/** Execute the command with the initialized {@link Environment}. */ /** Execute the command with the initialized {@link Environment}. */
protected abstract void execute(Terminal terminal, OptionSet options, Environment env) throws Exception; protected abstract void execute(Terminal terminal, OptionSet options, Environment env) throws Exception;

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cli;
/**
* A command that is aware of logging. This class should be preferred over the base {@link Command} class for any CLI tools that depend on
* core Elasticsearch as they could directly or indirectly touch classes that touch logging and as such logging needs to be configured.
*/
public abstract class LoggingAwareCommand extends Command {
/**
* Construct the command with the specified command description. This command will have logging configured without reading Elasticsearch
* configuration files.
*
* @param description the command description
*/
public LoggingAwareCommand(final String description) {
super(description, CommandLoggingConfigurator::configureLoggingWithoutConfig);
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cli;
/**
* A multi-command that is aware of logging. This class should be preferred over the base {@link MultiCommand} class for any CLI tools that
* depend on core Elasticsearch as they could directly or indirectly touch classes that touch logging and as such logging needs to be
* configured.
*/
public abstract class LoggingAwareMultiCommand extends MultiCommand {
/**
* Construct the command with the specified command description. This command will have logging configured without reading Elasticsearch
* configuration files.
*
* @param description the command description
*/
public LoggingAwareMultiCommand(final String description) {
super(description, CommandLoggingConfigurator::configureLoggingWithoutConfig);
}
}

View File

@ -30,16 +30,19 @@ import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField; import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation; import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.FieldDoc;
@ -650,6 +653,21 @@ public class Lucene {
return LenientParser.parse(toParse, defaultValue); return LenientParser.parse(toParse, defaultValue);
} }
/**
* Tries to extract a segment reader from the given index reader.
* If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
*/
public static SegmentReader segmentReader(LeafReader reader) {
if (reader instanceof SegmentReader) {
return (SegmentReader) reader;
} else if (reader instanceof FilterLeafReader) {
final FilterLeafReader fReader = (FilterLeafReader) reader;
return segmentReader(FilterLeafReader.unwrap(fReader));
}
// hard fail - we can't get a SegmentReader
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
}
@SuppressForbidden(reason = "Version#parseLeniently() used in a central place") @SuppressForbidden(reason = "Version#parseLeniently() used in a central place")
private static final class LenientParser { private static final class LenientParser {
public static Version parse(String toParse, Version defaultValue) { public static Version parse(String toParse, Version defaultValue) {

View File

@ -36,7 +36,6 @@ import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING, FieldMapper.COERCE_SETTING,

View File

@ -19,13 +19,14 @@
package org.elasticsearch.common.settings; package org.elasticsearch.common.settings;
import org.elasticsearch.cli.LoggingAwareMultiCommand;
import org.elasticsearch.cli.MultiCommand; import org.elasticsearch.cli.MultiCommand;
import org.elasticsearch.cli.Terminal; import org.elasticsearch.cli.Terminal;
/** /**
* A cli tool for managing secrets in the elasticsearch keystore. * A cli tool for managing secrets in the elasticsearch keystore.
*/ */
public class KeyStoreCli extends MultiCommand { public class KeyStoreCli extends LoggingAwareMultiCommand {
private KeyStoreCli() { private KeyStoreCli() {
super("A tool for managing settings stored in the elasticsearch keystore"); super("A tool for managing settings stored in the elasticsearch keystore");
@ -39,4 +40,5 @@ public class KeyStoreCli extends MultiCommand {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
exit(new KeyStoreCli().main(args, Terminal.DEFAULT)); exit(new KeyStoreCli().main(args, Terminal.DEFAULT));
} }
} }

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -624,6 +625,27 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
} }
} }
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
// docs to become visible immediately. This also flushes all pending indexing / search reqeusts
// that are waiting for a refresh.
threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("forced refresh failed after interval change", e);
}
@Override
protected void doRun() throws Exception {
maybeRefreshEngine(true);
}
@Override
public boolean isForceExecution() {
return true;
}
});
rescheduleRefreshTasks(); rescheduleRefreshTasks();
} }
final Translog.Durability durability = indexSettings.getTranslogDurability(); final Translog.Durability durability = indexSettings.getTranslogDurability();
@ -686,17 +708,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
} }
} }
private void maybeRefreshEngine() { private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0) { if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) { for (IndexShard shard : this.shards.values()) {
if (shard.isReadAllowed()) { try {
try { shard.scheduledRefresh();
if (shard.isRefreshNeeded()) { } catch (IndexShardClosedException | AlreadyClosedException ex) {
shard.refresh("schedule"); // fine - continue;
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
} }
} }
} }
@ -896,7 +914,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
@Override @Override
protected void runInternal() { protected void runInternal() {
indexService.maybeRefreshEngine(); indexService.maybeRefreshEngine(false);
} }
@Override @Override

View File

@ -62,6 +62,9 @@ public final class IndexSettings {
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING =
Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100),
Property.IndexScope); Property.IndexScope);
public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER =
Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30),
TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic);
public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING = public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING =
new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(),
(value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope); (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope);
@ -262,6 +265,8 @@ public final class IndexSettings {
private volatile int maxNgramDiff; private volatile int maxNgramDiff;
private volatile int maxShingleDiff; private volatile int maxShingleDiff;
private volatile boolean TTLPurgeDisabled; private volatile boolean TTLPurgeDisabled;
private volatile TimeValue searchIdleAfter;
/** /**
* The maximum number of refresh listeners allows on this shard. * The maximum number of refresh listeners allows on this shard.
*/ */
@ -371,6 +376,7 @@ public final class IndexSettings {
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL); maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
this.mergePolicyConfig = new MergePolicyConfig(logger, this); this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this); this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered
if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) { if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) {
throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: [" throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: ["
@ -411,8 +417,11 @@ public final class IndexSettings {
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
} }
private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue; this.flushThresholdSize = byteSizeValue;
} }
@ -752,4 +761,16 @@ public final class IndexSettings {
} }
public IndexScopedSettings getScopedSettings() { return scopedSettings;} public IndexScopedSettings getScopedSettings() { return scopedSettings;}
/**
* Returns true iff the refresh setting exists or in other words is explicitly set.
*/
public boolean isExplicitRefresh() {
return INDEX_REFRESH_INTERVAL_SETTING.exists(settings);
}
/**
* Returns the time that an index shard becomes search idle unless it's accessed in between
*/
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
} }

View File

@ -71,18 +71,12 @@ class CombinedDeletionPolicy extends IndexDeletionPolicy {
} }
private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException { private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException {
// We need to keep translog since the smallest translog generation of un-deleted commits. // when opening an existing lucene index, we currently always open the last commit.
// However, there are commits that are not deleted just because they are being snapshotted (rather than being kept by the policy). // we therefore use the translog gen as the one that will be required for recovery
// TODO: We need to distinguish those commits and skip them in calculating the minimum required translog generation. final IndexCommit indexCommit = commits.get(commits.size() - 1);
long minRequiredGen = Long.MAX_VALUE; assert indexCommit.isDeleted() == false : "last commit is deleted";
for (IndexCommit indexCommit : commits) { long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
if (indexCommit.isDeleted() == false) { translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen);
long translogGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
minRequiredGen = Math.min(translogGen, minRequiredGen);
}
}
assert minRequiredGen != Long.MAX_VALUE : "All commits are deleted";
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
} }
public SnapshotDeletionPolicy getIndexDeletionPolicy() { public SnapshotDeletionPolicy getIndexDeletionPolicy() {

View File

@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
@ -143,27 +142,12 @@ public abstract class Engine implements Closeable {
return a.ramBytesUsed(); return a.ramBytesUsed();
} }
/**
* Tries to extract a segment reader from the given index reader.
* If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
*/
protected static SegmentReader segmentReader(LeafReader reader) {
if (reader instanceof SegmentReader) {
return (SegmentReader) reader;
} else if (reader instanceof FilterLeafReader) {
final FilterLeafReader fReader = (FilterLeafReader) reader;
return segmentReader(FilterLeafReader.unwrap(fReader));
}
// hard fail - we can't get a SegmentReader
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
}
/** /**
* Returns whether a leaf reader comes from a merge (versus flush or addIndexes). * Returns whether a leaf reader comes from a merge (versus flush or addIndexes).
*/ */
protected static boolean isMergedSegment(LeafReader reader) { protected static boolean isMergedSegment(LeafReader reader) {
// We expect leaves to be segment readers // We expect leaves to be segment readers
final Map<String, String> diagnostics = segmentReader(reader).getSegmentInfo().info.getDiagnostics(); final Map<String, String> diagnostics = Lucene.segmentReader(reader).getSegmentInfo().info.getDiagnostics();
final String source = diagnostics.get(IndexWriter.SOURCE); final String source = diagnostics.get(IndexWriter.SOURCE);
assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH, assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH,
IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source; IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source;
@ -611,7 +595,7 @@ public abstract class Engine implements Closeable {
try (Searcher searcher = acquireSearcher("segments_stats")) { try (Searcher searcher = acquireSearcher("segments_stats")) {
SegmentsStats stats = new SegmentsStats(); SegmentsStats stats = new SegmentsStats();
for (LeafReaderContext reader : searcher.reader().leaves()) { for (LeafReaderContext reader : searcher.reader().leaves()) {
final SegmentReader segmentReader = segmentReader(reader.reader()); final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
stats.add(1, segmentReader.ramBytesUsed()); stats.add(1, segmentReader.ramBytesUsed());
stats.addTermsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPostingsReader())); stats.addTermsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPostingsReader()));
stats.addStoredFieldsMemoryInBytes(guardedRamBytesUsed(segmentReader.getFieldsReader())); stats.addStoredFieldsMemoryInBytes(guardedRamBytesUsed(segmentReader.getFieldsReader()));
@ -718,7 +702,7 @@ public abstract class Engine implements Closeable {
// first, go over and compute the search ones... // first, go over and compute the search ones...
try (Searcher searcher = acquireSearcher("segments")){ try (Searcher searcher = acquireSearcher("segments")){
for (LeafReaderContext reader : searcher.reader().leaves()) { for (LeafReaderContext reader : searcher.reader().leaves()) {
final SegmentReader segmentReader = segmentReader(reader.reader()); final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
SegmentCommitInfo info = segmentReader.getSegmentInfo(); SegmentCommitInfo info = segmentReader.getSegmentInfo();
assert !segments.containsKey(info.info.name); assert !segments.containsKey(info.info.name);
Segment segment = new Segment(info.info.name); Segment segment = new Segment(info.info.name);

View File

@ -21,10 +21,14 @@ package org.elasticsearch.index.shard;
import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.ObjectLongMap;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.ReferenceManager;
@ -58,7 +62,6 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -233,6 +236,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/ */
private final RefreshListeners refreshListeners; private final RefreshListeners refreshListeners;
private final AtomicLong lastSearcherAccess = new AtomicLong();
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
public IndexShard( public IndexShard(
ShardRouting shardRouting, ShardRouting shardRouting,
IndexSettings indexSettings, IndexSettings indexSettings,
@ -297,6 +303,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
searcherWrapper = indexSearcherWrapper; searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners(); refreshListeners = buildRefreshListeners();
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
persistMetadata(path, indexSettings, shardRouting, null, logger); persistMetadata(path, indexSettings, shardRouting, null, logger);
} }
@ -855,15 +862,30 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
public DocsStats docStats() { public DocsStats docStats() {
// we calculate the doc stats based on the internal reader that is more up-to-date and not subject
// to external refreshes. For instance we don't refresh an external reader if we flush and indices with
// index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics
// when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are
// safe here.
long numDocs = 0; long numDocs = 0;
long numDeletedDocs = 0; long numDeletedDocs = 0;
long sizeInBytes = 0; long sizeInBytes = 0;
List<Segment> segments = segments(false); try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
for (Segment segment : segments) { // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause
if (segment.search) { // the next scheduled refresh to go through and refresh the stats as well
numDocs += segment.getNumDocs(); markSearcherAccessed();
numDeletedDocs += segment.getDeletedDocs(); for (LeafReaderContext reader : searcher.reader().leaves()) {
sizeInBytes += segment.getSizeInBytes(); // we go on the segment level here to get accurate numbers
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
SegmentCommitInfo info = segmentReader.getSegmentInfo();
numDocs += reader.reader().numDocs();
numDeletedDocs += reader.reader().numDeletedDocs();
try {
sizeInBytes += info.sizeInBytes();
} catch (IOException e) {
logger.trace((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
}
} }
} }
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes); return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
@ -948,6 +970,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public CompletionStats completionStats(String... fields) { public CompletionStats completionStats(String... fields) {
CompletionStats completionStats = new CompletionStats(); CompletionStats completionStats = new CompletionStats();
try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats")) { try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats")) {
// we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause
// the next scheduled refresh to go through and refresh the stats as well
markSearcherAccessed();
completionStats.add(CompletionFieldStats.completionStats(currentSearcher.reader(), fields)); completionStats.add(CompletionFieldStats.completionStats(currentSearcher.reader(), fields));
} }
return completionStats; return completionStats;
@ -1117,6 +1142,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); return acquireSearcher(source, Engine.SearcherScope.EXTERNAL);
} }
private void markSearcherAccessed() {
lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis());
}
private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
readAllowed(); readAllowed();
final Engine engine = getEngine(); final Engine engine = getEngine();
@ -2418,14 +2447,74 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
/** /**
* Returns <code>true</code> iff one or more changes to the engine are not visible to via the current searcher *or* there are pending * Executes a scheduled refresh if necessary.
* refresh listeners.
* Otherwise <code>false</code>.
* *
* @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed * @return <code>true</code> iff the engine got refreshed otherwise <code>false</code>
*/ */
public boolean isRefreshNeeded() { public boolean scheduledRefresh() {
return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded()); boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdle() && indexSettings.isExplicitRefresh() == false) {
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
// cause the next schedule to refresh.
setRefreshPending();
return false;
} else {
refresh("schedule");
return true;
}
}
return false;
}
/**
* Returns true if this shards is search idle
*/
final boolean isSearchIdle() {
return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
}
/**
* Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds.
*/
final long getLastSearcherAccess() {
return lastSearcherAccess.get();
}
private void setRefreshPending() {
Engine engine = getEngine();
Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation();
Translog.Location location;
do {
location = this.pendingRefreshLocation.get();
if (location != null && lastWriteLocation.compareTo(location) <= 0) {
break;
}
} while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false);
}
/**
* Registers the given listener and invokes it once the shard is active again and all
* pending refresh translog location has been refreshed. If there is no pending refresh location registered the listener will be
* invoked immediately.
* @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with
* <code>true</code> if the listener was registered to wait for a refresh.
*/
public final void awaitShardSearchActive(Consumer<Boolean> listener) {
if (isSearchIdle()) {
markSearcherAccessed(); // move the shard into non-search idle
}
final Translog.Location location = pendingRefreshLocation.get();
if (location != null) {
addRefreshListener(location, (b) -> {
pendingRefreshLocation.compareAndSet(location, null);
listener.accept(true);
});
} else {
listener.accept(false);
}
} }
/** /**

View File

@ -19,6 +19,12 @@
package org.elasticsearch.index.translog; package org.elasticsearch.index.translog;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongObjectHashMap;
import com.carrotsearch.hppc.LongSet;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -30,32 +36,44 @@ final class MultiSnapshot implements Translog.Snapshot {
private final TranslogSnapshot[] translogs; private final TranslogSnapshot[] translogs;
private final int totalOperations; private final int totalOperations;
private int overriddenOperations;
private final Closeable onClose; private final Closeable onClose;
private int index; private int index;
private final SeqNoSet seenSeqNo;
/** /**
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
*/ */
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) { MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
this.translogs = translogs; this.translogs = translogs;
totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
this.overriddenOperations = 0;
this.onClose = onClose; this.onClose = onClose;
index = 0; this.seenSeqNo = new SeqNoSet();
this.index = translogs.length - 1;
} }
@Override @Override
public int totalOperations() { public int totalOperations() {
return totalOperations; return totalOperations;
} }
@Override
public int overriddenOperations() {
return overriddenOperations;
}
@Override @Override
public Translog.Operation next() throws IOException { public Translog.Operation next() throws IOException {
for (; index < translogs.length; index++) { for (; index >= 0; index--) {
final TranslogSnapshot current = translogs[index]; final TranslogSnapshot current = translogs[index];
Translog.Operation op = current.next(); Translog.Operation op;
if (op != null) { // if we are null we move to the next snapshot while ((op = current.next()) != null) {
return op; if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
return op;
} else {
overriddenOperations++;
}
} }
} }
return null; return null;
@ -65,4 +83,76 @@ final class MultiSnapshot implements Translog.Snapshot {
public void close() throws IOException { public void close() throws IOException {
onClose.close(); onClose.close();
} }
/**
* A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1).
*/
private static final class CountedBitSet {
private short onBits;
private final FixedBitSet bitset;
CountedBitSet(short numBits) {
assert numBits > 0;
this.onBits = 0;
this.bitset = new FixedBitSet(numBits);
}
boolean getAndSet(int index) {
assert index >= 0;
boolean wasOn = bitset.getAndSet(index);
if (wasOn == false) {
onBits++;
}
return wasOn;
}
boolean hasAllBitsOn() {
return onBits == bitset.length();
}
}
/**
* Sequence numbers from translog are likely to form contiguous ranges,
* thus collapsing a completed bitset into a single entry will reduce memory usage.
*/
static final class SeqNoSet {
static final short BIT_SET_SIZE = 1024;
private final LongSet completedSets = new LongHashSet();
private final LongObjectHashMap<CountedBitSet> ongoingSets = new LongObjectHashMap<>();
/**
* Marks this sequence number and returns <tt>true</tt> if it is seen before.
*/
boolean getAndSet(long value) {
assert value >= 0;
final long key = value / BIT_SET_SIZE;
if (completedSets.contains(key)) {
return true;
}
CountedBitSet bitset = ongoingSets.get(key);
if (bitset == null) {
bitset = new CountedBitSet(BIT_SET_SIZE);
ongoingSets.put(key, bitset);
}
final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE));
if (bitset.hasAllBitsOn()) {
ongoingSets.remove(key);
completedSets.add(key);
}
return wasOn;
}
// For testing
long completeSetsSize() {
return completedSets.size();
}
// For testing
long ongoingSetsSize() {
return ongoingSets.size();
}
}
} }

View File

@ -845,10 +845,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public interface Snapshot extends Closeable { public interface Snapshot extends Closeable {
/** /**
* The total number of operations in the translog. * The total estimated number of operations in the snapshot.
*/ */
int totalOperations(); int totalOperations();
/**
* The number of operations have been overridden (eg. superseded) in the snapshot so far.
* If two operations have the same sequence number, the operation with a lower term will be overridden by the operation
* with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called.
*/
default int overriddenOperations() {
return 0;
}
/** /**
* Returns the next operation in the snapshot or <code>null</code> if we reached the end. * Returns the next operation in the snapshot or <code>null</code> if we reached the end.
*/ */

View File

@ -19,13 +19,14 @@
package org.elasticsearch.index.translog; package org.elasticsearch.index.translog;
import org.elasticsearch.cli.LoggingAwareMultiCommand;
import org.elasticsearch.cli.MultiCommand; import org.elasticsearch.cli.MultiCommand;
import org.elasticsearch.cli.Terminal; import org.elasticsearch.cli.Terminal;
/** /**
* Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool * Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool
*/ */
public class TranslogToolCli extends MultiCommand { public class TranslogToolCli extends LoggingAwareMultiCommand {
private TranslogToolCli() { private TranslogToolCli() {
super("A CLI tool for various Elasticsearch translog actions"); super("A CLI tool for various Elasticsearch translog actions");

View File

@ -66,6 +66,7 @@ import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -567,8 +568,9 @@ public class RecoverySourceHandler {
cancellableThreads.executeIO(sendBatch); cancellableThreads.executeIO(sendBatch);
} }
assert expectedTotalOps == skippedOps + totalSentOps assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
: "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]"; : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);

View File

@ -582,6 +582,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
throws IOException { throws IOException {
return createSearchContext(request, timeout, true); return createSearchContext(request, timeout, true);
} }
private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout,
boolean assertAsyncActions) boolean assertAsyncActions)
throws IOException { throws IOException {
@ -979,22 +980,31 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
* The action listener is guaranteed to be executed on the search thread-pool * The action listener is guaranteed to be executed on the search thread-pool
*/ */
private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) { private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
listener.onResponse(request);
}
}), listener::onFailure);
IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId());
if (shardOrNull != null) {
// now we need to check if there is a pending refresh and register
ActionListener<Rewriteable> finalListener = actionListener;
actionListener = ActionListener.wrap(r ->
shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure);
}
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
// AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
// adding a lot of overhead // adding a lot of overhead
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener);
ActionListener.wrap(r ->
threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
listener.onResponse(request);
}
}), listener::onFailure));
} }
/** /**
@ -1003,4 +1013,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) { public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
return indicesService.getRewriteContext(nowInMillis); return indicesService.getRewriteContext(nowInMillis);
} }
public IndicesService getIndicesService() {
return indicesService;
}
} }

View File

@ -20,21 +20,16 @@
package org.elasticsearch.search.fetch.subphase; package org.elasticsearch.search.fetch.subphase;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup; import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.contentBuilder;
public final class FetchSourceSubPhase implements FetchSubPhase { public final class FetchSourceSubPhase implements FetchSubPhase {
@Override @Override
@ -65,7 +60,17 @@ public final class FetchSourceSubPhase implements FetchSubPhase {
final int initialCapacity = nestedHit ? 1024 : Math.min(1024, source.internalSourceRef().length()); final int initialCapacity = nestedHit ? 1024 : Math.min(1024, source.internalSourceRef().length());
BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity); BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity);
XContentBuilder builder = new XContentBuilder(source.sourceContentType().xContent(), streamOutput); XContentBuilder builder = new XContentBuilder(source.sourceContentType().xContent(), streamOutput);
builder.value(value); if (value != null) {
builder.value(value);
} else {
// This happens if the source filtering could not find the specified in the _source.
// Just doing `builder.value(null)` is valid, but the xcontent validation can't detect what format
// it is. In certain cases, for example response serialization we fail if no xcontent type can't be
// detected. So instead we just return an empty top level object. Also this is in inline with what was
// being return in this situation in 5.x and earlier.
builder.startObject();
builder.endObject();
}
hitContext.hit().sourceRef(builder.bytes()); hitContext.hit().sourceRef(builder.bytes());
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("Error filtering source", e); throw new ElasticsearchException("Error filtering source", e);

View File

@ -28,7 +28,7 @@ public class CommandTests extends ESTestCase {
static class UserErrorCommand extends Command { static class UserErrorCommand extends Command {
UserErrorCommand() { UserErrorCommand() {
super("Throws a user error"); super("Throws a user error", () -> {});
} }
@Override @Override
@ -46,7 +46,7 @@ public class CommandTests extends ESTestCase {
static class UsageErrorCommand extends Command { static class UsageErrorCommand extends Command {
UsageErrorCommand() { UsageErrorCommand() {
super("Throws a usage error"); super("Throws a usage error", () -> {});
} }
@Override @Override
@ -66,7 +66,7 @@ public class CommandTests extends ESTestCase {
boolean executed = false; boolean executed = false;
NoopCommand() { NoopCommand() {
super("Does nothing"); super("Does nothing", () -> {});
} }
@Override @Override

View File

@ -26,13 +26,13 @@ public class MultiCommandTests extends CommandTestCase {
static class DummyMultiCommand extends MultiCommand { static class DummyMultiCommand extends MultiCommand {
DummyMultiCommand() { DummyMultiCommand() {
super("A dummy multi command"); super("A dummy multi command", () -> {});
} }
} }
static class DummySubCommand extends Command { static class DummySubCommand extends Command {
DummySubCommand() { DummySubCommand() {
super("A dummy subcommand"); super("A dummy subcommand", () -> {});
} }
@Override @Override
protected void execute(Terminal terminal, OptionSet options) throws Exception { protected void execute(Terminal terminal, OptionSet options) throws Exception {

View File

@ -60,23 +60,20 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
List<IndexCommit> commitList = new ArrayList<>(); List<IndexCommit> commitList = new ArrayList<>();
long count = randomIntBetween(10, 20); long count = randomIntBetween(10, 20);
long minGen = Long.MAX_VALUE; long lastGen = 0;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
long lastGen = randomIntBetween(10, 20000); lastGen += randomIntBetween(10, 20000);
minGen = Math.min(minGen, lastGen);
commitList.add(mockIndexCommitWithTranslogGen(lastGen)); commitList.add(mockIndexCommitWithTranslogGen(lastGen));
} }
combinedDeletionPolicy.onInit(commitList); combinedDeletionPolicy.onInit(commitList);
verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(minGen); verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen);
commitList.clear(); commitList.clear();
minGen = Long.MAX_VALUE;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
long lastGen = randomIntBetween(10, 20000); lastGen += randomIntBetween(10, 20000);
minGen = Math.min(minGen, lastGen);
commitList.add(mockIndexCommitWithTranslogGen(lastGen)); commitList.add(mockIndexCommitWithTranslogGen(lastGen));
} }
combinedDeletionPolicy.onCommit(commitList); combinedDeletionPolicy.onCommit(commitList);
verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(minGen); verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen);
} }
IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException { IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException {

View File

@ -46,16 +46,23 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase { public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
@ -299,6 +306,68 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
} }
} }
public void testSeqNoCollision() throws Exception {
try (ReplicationGroup shards = createGroup(2)) {
shards.startAll();
int initDocs = shards.indexDocs(randomInt(10));
List<IndexShard> replicas = shards.getReplicas();
IndexShard replica1 = replicas.get(0);
IndexShard replica2 = replicas.get(1);
shards.syncGlobalCheckpoint();
logger.info("--> Isolate replica1");
IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON);
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
indexOnReplica(replicationRequest, replica2);
final Translog.Operation op1;
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
for (int i = 0; i < initDocs; i++) {
Translog.Operation op = snapshot.next();
assertThat(op, is(notNullValue()));
initOperations.add(op);
}
op1 = snapshot.next();
assertThat(op1, notNullValue());
assertThat(snapshot.next(), nullValue());
assertThat(snapshot.overriddenOperations(), equalTo(0));
}
// Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1).
logger.info("--> Promote replica1 as the primary");
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
final Translog.Operation op2;
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 2));
op2 = snapshot.next();
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
assertThat(snapshot.overriddenOperations(), equalTo(1));
}
// Make sure that peer-recovery transfers all but non-overridden operations.
IndexShard replica3 = shards.addReplica();
logger.info("--> Promote replica2 as the primary");
shards.promoteReplicaToPrimary(replica2);
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2);
try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
assertThat(snapshot.next(), equalTo(op2));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0));
}
// TODO: We should assert the content of shards in the ReplicationGroup.
// Without rollback replicas(current implementation), we don't have the same content across shards:
// - replica1 has {doc1}
// - replica2 has {doc1, doc2}
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery
}
}
/** Throws <code>documentFailure</code> on every indexing operation */ /** Throws <code>documentFailure</code> on every indexing operation */
static class ThrowingDocumentFailureEngineFactory implements EngineFactory { static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
final String documentFailureMessage; final String documentFailureMessage;

View File

@ -18,14 +18,14 @@
*/ */
package org.elasticsearch.index.shard; package org.elasticsearch.index.shard;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
@ -41,11 +41,11 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -56,11 +56,6 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -82,8 +77,10 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -97,6 +94,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public class IndexShardIT extends ESSingleNodeTestCase { public class IndexShardIT extends ESSingleNodeTestCase {
@ -106,21 +104,6 @@ public class IndexShardIT extends ESSingleNodeTestCase {
return pluginList(InternalSettingsPlugin.class); return pluginList(InternalSettingsPlugin.class);
} }
private ParsedDocument testParsedDocument(String id, String type, String routing, long seqNo,
ParseContext.Document document, BytesReference source, XContentType xContentType,
Mapping mappingUpdate) {
Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", 0);
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
return new ParsedDocument(versionField, seqID, id, type, routing,
Collections.singletonList(document), source, xContentType, mappingUpdate);
}
public void testLockTryingToDelete() throws Exception { public void testLockTryingToDelete() throws Exception {
createIndex("test"); createIndex("test");
ensureGreen(); ensureGreen();
@ -550,4 +533,96 @@ public class IndexShardIT extends ESSingleNodeTestCase {
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE); RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE);
return shardRouting; return shardRouting;
} }
public void testAutomaticRefresh() throws InterruptedException {
TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000)));
Settings.Builder builder = Settings.builder();
if (randomTimeValue != null) {
builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), randomTimeValue);
}
IndexService indexService = createIndex("test", builder.build());
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
ensureGreen();
AtomicInteger totalNumDocs = new AtomicInteger(Integer.MAX_VALUE);
CountDownLatch started = new CountDownLatch(1);
Thread t = new Thread(() -> {
SearchResponse searchResponse;
started.countDown();
do {
searchResponse = client().prepareSearch().get();
} while (searchResponse.getHits().totalHits != totalNumDocs.get());
});
t.start();
started.await();
assertNoSearchHits(client().prepareSearch().get());
int numDocs = scaledRandomIntBetween(25, 100);
totalNumDocs.set(numDocs);
CountDownLatch indexingDone = new CountDownLatch(numDocs);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
indexingDone.countDown(); // one doc is indexed above blocking
IndexShard shard = indexService.getShard(0);
boolean hasRefreshed = shard.scheduledRefresh();
if (randomTimeValue == TimeValue.ZERO) {
// with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background
assertFalse(hasRefreshed);
assertTrue(shard.isSearchIdle());
} else if (randomTimeValue == null){
// with null we are guaranteed to see the doc since do execute the refresh.
// we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently
assertFalse(shard.isSearchIdle());
}
assertHitCount(client().prepareSearch().get(), 1);
for (int i = 1; i < numDocs; i++) {
client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
indexingDone.countDown();
}
@Override
public void onFailure(Exception e) {
indexingDone.countDown();
throw new AssertionError(e);
}
});
}
indexingDone.await();
t.join();
}
public void testPendingRefreshWithIntervalChange() throws InterruptedException {
Settings.Builder builder = Settings.builder();
builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO);
IndexService indexService = createIndex("test", builder.build());
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
ensureGreen();
assertNoSearchHits(client().prepareSearch().get());
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
IndexShard shard = indexService.getShard(0);
assertFalse(shard.scheduledRefresh());
assertTrue(shard.isSearchIdle());
CountDownLatch refreshLatch = new CountDownLatch(1);
client().admin().indices().prepareRefresh()
.execute(ActionListener.wrap(refreshLatch::countDown));// async on purpose to make sure it happens concurrently
assertHitCount(client().prepareSearch().get(), 1);
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertFalse(shard.scheduledRefresh());
// now disable background refresh and make sure the refresh happens
CountDownLatch updateSettingsLatch = new CountDownLatch(1);
client().admin().indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build())
.execute(ActionListener.wrap(updateSettingsLatch::countDown));
assertHitCount(client().prepareSearch().get(), 2);
// wait for both to ensure we don't have in-flight operations
updateSettingsLatch.await();
refreshLatch.await();
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertTrue(shard.scheduledRefresh());
assertTrue(shard.isSearchIdle());
assertHitCount(client().prepareSearch().get(), 3);
}
} }

View File

@ -62,7 +62,9 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -70,6 +72,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
@ -2271,11 +2274,17 @@ public class IndexShardTests extends IndexShardTestCase {
final String id = Integer.toString(i); final String id = Integer.toString(i);
indexDoc(indexShard, "test", id); indexDoc(indexShard, "test", id);
} }
if (randomBoolean()) {
indexShard.refresh("test"); indexShard.refresh("test");
} else {
indexShard.flush(new FlushRequest());
}
{ {
final DocsStats docsStats = indexShard.docStats(); final DocsStats docsStats = indexShard.docStats();
assertThat(docsStats.getCount(), equalTo(numDocs)); assertThat(docsStats.getCount(), equalTo(numDocs));
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
assertTrue(searcher.reader().numDocs() <= docsStats.getCount());
}
assertThat(docsStats.getDeleted(), equalTo(0L)); assertThat(docsStats.getDeleted(), equalTo(0L));
assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L)); assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L));
} }
@ -2295,9 +2304,14 @@ public class IndexShardTests extends IndexShardTestCase {
flushRequest.waitIfOngoing(false); flushRequest.waitIfOngoing(false);
indexShard.flush(flushRequest); indexShard.flush(flushRequest);
indexShard.refresh("test"); if (randomBoolean()) {
indexShard.refresh("test");
}
{ {
final DocsStats docStats = indexShard.docStats(); final DocsStats docStats = indexShard.docStats();
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
assertTrue(searcher.reader().numDocs() <= docStats.getCount());
}
assertThat(docStats.getCount(), equalTo(numDocs)); assertThat(docStats.getCount(), equalTo(numDocs));
// Lucene will delete a segment if all docs are deleted from it; this means that we lose the deletes when deleting all docs // Lucene will delete a segment if all docs are deleted from it; this means that we lose the deletes when deleting all docs
assertThat(docStats.getDeleted(), equalTo(numDocsToDelete == numDocs ? 0 : numDocsToDelete)); assertThat(docStats.getDeleted(), equalTo(numDocsToDelete == numDocs ? 0 : numDocsToDelete));
@ -2309,7 +2323,11 @@ public class IndexShardTests extends IndexShardTestCase {
forceMergeRequest.maxNumSegments(1); forceMergeRequest.maxNumSegments(1);
indexShard.forceMerge(forceMergeRequest); indexShard.forceMerge(forceMergeRequest);
indexShard.refresh("test"); if (randomBoolean()) {
indexShard.refresh("test");
} else {
indexShard.flush(new FlushRequest());
}
{ {
final DocsStats docStats = indexShard.docStats(); final DocsStats docStats = indexShard.docStats();
assertThat(docStats.getCount(), equalTo(numDocs)); assertThat(docStats.getCount(), equalTo(numDocs));
@ -2340,8 +2358,11 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat("Without flushing, segment sizes should be zero", assertThat("Without flushing, segment sizes should be zero",
indexShard.docStats().getTotalSizeInBytes(), equalTo(0L)); indexShard.docStats().getTotalSizeInBytes(), equalTo(0L));
indexShard.flush(new FlushRequest()); if (randomBoolean()) {
indexShard.refresh("test"); indexShard.flush(new FlushRequest());
} else {
indexShard.refresh("test");
}
{ {
final DocsStats docsStats = indexShard.docStats(); final DocsStats docsStats = indexShard.docStats();
final StoreStats storeStats = indexShard.storeStats(); final StoreStats storeStats = indexShard.storeStats();
@ -2361,9 +2382,11 @@ public class IndexShardTests extends IndexShardTestCase {
indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}"); indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}");
} }
} }
if (randomBoolean()) {
indexShard.flush(new FlushRequest()); indexShard.flush(new FlushRequest());
indexShard.refresh("test"); } else {
indexShard.refresh("test");
}
{ {
final DocsStats docsStats = indexShard.docStats(); final DocsStats docsStats = indexShard.docStats();
final StoreStats storeStats = indexShard.storeStats(); final StoreStats storeStats = indexShard.storeStats();
@ -2567,4 +2590,137 @@ public class IndexShardTests extends IndexShardTestCase {
public void verify(String verificationToken, DiscoveryNode localNode) { public void verify(String verificationToken, DiscoveryNode localNode) {
} }
} }
public void testIsSearchIdle() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
assertTrue(primary.scheduledRefresh());
assertFalse(primary.isSearchIdle());
IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings();
settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build();
scopedSettings.applySettings(settings);
assertTrue(primary.isSearchIdle());
settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMinutes(1))
.build();
scopedSettings.applySettings(settings);
assertFalse(primary.isSearchIdle());
settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(10))
.build();
scopedSettings.applySettings(settings);
assertBusy(() -> assertFalse(primary.isSearchIdle()));
do {
// now loop until we are fast enough... shouldn't take long
primary.acquireSearcher("test").close();
} while (primary.isSearchIdle());
closeShards(primary);
}
public void testScheduledRefresh() throws IOException, InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
assertTrue(primary.scheduledRefresh());
IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings();
settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build();
scopedSettings.applySettings(settings);
assertFalse(primary.getEngine().refreshNeeded());
indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
long lastSearchAccess = primary.getLastSearcherAccess();
assertFalse(primary.scheduledRefresh());
assertEquals(lastSearchAccess, primary.getLastSearcherAccess());
// wait until the thread-pool has moved the timestamp otherwise we can't assert on this below
awaitBusy(() -> primary.getThreadPool().relativeTimeInMillis() > lastSearchAccess);
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
primary.awaitShardSearchActive(refreshed -> {
assertTrue(refreshed);
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(2, searcher.reader().numDocs());
} finally {
latch.countDown();
}
});
}
assertNotEquals("awaitShardSearchActive must access a searcher to remove search idle state", lastSearchAccess,
primary.getLastSearcherAccess());
assertTrue(lastSearchAccess < primary.getLastSearcherAccess());
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(1, searcher.reader().numDocs());
}
assertTrue(primary.getEngine().refreshNeeded());
assertTrue(primary.scheduledRefresh());
latch.await();
CountDownLatch latch1 = new CountDownLatch(1);
primary.awaitShardSearchActive(refreshed -> {
assertFalse(refreshed);
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(2, searcher.reader().numDocs());
} finally {
latch1.countDown();
}
});
latch1.await();
closeShards(primary);
}
public void testRefreshIsNeededWithRefreshListeners() throws IOException, InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
assertTrue(primary.scheduledRefresh());
Engine.IndexResult doc = indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}");
CountDownLatch latch = new CountDownLatch(1);
primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown());
assertEquals(1, latch.getCount());
assertTrue(primary.getEngine().refreshNeeded());
assertTrue(primary.scheduledRefresh());
latch.await();
IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings();
settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build();
scopedSettings.applySettings(settings);
doc = indexDoc(primary, "test", "2", "{\"foo\" : \"bar\"}");
CountDownLatch latch1 = new CountDownLatch(1);
primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown());
assertEquals(1, latch1.getCount());
assertTrue(primary.getEngine().refreshNeeded());
assertTrue(primary.scheduledRefresh());
latch1.await();
closeShards(primary);
}
} }

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.test.ESTestCase;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class MultiSnapshotTests extends ESTestCase {
public void testTrackSeqNoSimpleRange() throws Exception {
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
final List<Long> values = LongStream.range(0, 1024).boxed().collect(Collectors.toList());
Randomness.shuffle(values);
for (int i = 0; i < 1023; i++) {
assertThat(bitSet.getAndSet(values.get(i)), equalTo(false));
assertThat(bitSet.ongoingSetsSize(), equalTo(1L));
assertThat(bitSet.completeSetsSize(), equalTo(0L));
}
assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false));
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
assertThat(bitSet.completeSetsSize(), equalTo(1L));
assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true));
assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false));
}
public void testTrackSeqNoDenseRanges() throws Exception {
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
final LongSet normalSet = new LongHashSet();
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
long seq = between(0, 5000);
boolean existed = normalSet.add(seq) == false;
assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed));
assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L));
});
}
public void testTrackSeqNoSparseRanges() throws Exception {
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
final LongSet normalSet = new LongHashSet();
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
long seq = between(i * 10_000, i * 30_000);
boolean existed = normalSet.add(seq) == false;
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
});
}
public void testTrackSeqNoMimicTranslogRanges() throws Exception {
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
final LongSet normalSet = new LongHashSet();
long currentSeq = between(10_000_000, 1_000_000_000);
final int iterations = scaledRandomIntBetween(100, 2000);
assertThat(bitSet.completeSetsSize(), equalTo(0L));
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
long totalDocs = 0;
for (long i = 0; i < iterations; i++) {
int batchSize = between(1, 1500);
totalDocs += batchSize;
currentSeq -= batchSize;
List<Long> batch = LongStream.range(currentSeq, currentSeq + batchSize)
.boxed()
.collect(Collectors.toList());
Randomness.shuffle(batch);
batch.forEach(seq -> {
boolean existed = normalSet.add(seq) == false;
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L));
});
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
}
assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024));
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
}
}

View File

@ -26,6 +26,9 @@ import org.hamcrest.TypeSafeMatcher;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
public final class SnapshotMatchers { public final class SnapshotMatchers {
@ -50,10 +53,14 @@ public final class SnapshotMatchers {
/** /**
* Consumes a snapshot and make sure it's content is as expected * Consumes a snapshot and make sure it's content is as expected
*/ */
public static Matcher<Translog.Snapshot> equalsTo(ArrayList<Translog.Operation> ops) { public static Matcher<Translog.Snapshot> equalsTo(List<Translog.Operation> ops) {
return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()])); return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()]));
} }
public static Matcher<Translog.Snapshot> containsOperationsInAnyOrder(Collection<Translog.Operation> expectedOperations) {
return new ContainingInAnyOrderMatcher(expectedOperations);
}
public static class SizeMatcher extends TypeSafeMatcher<Translog.Snapshot> { public static class SizeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
private final int size; private final int size;
@ -127,5 +134,60 @@ public final class SnapshotMatchers {
} }
} }
public static class ContainingInAnyOrderMatcher extends TypeSafeMatcher<Translog.Snapshot> {
private final Collection<Translog.Operation> expectedOps;
private List<Translog.Operation> notFoundOps;
private List<Translog.Operation> notExpectedOps;
static List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOException {
final List<Translog.Operation> actualOps = new ArrayList<>();
Translog.Operation op;
while ((op = snapshot.next()) != null) {
actualOps.add(op);
}
return actualOps;
}
public ContainingInAnyOrderMatcher(Collection<Translog.Operation> expectedOps) {
this.expectedOps = expectedOps;
}
@Override
protected boolean matchesSafely(Translog.Snapshot snapshot) {
try {
List<Translog.Operation> actualOps = drainAll(snapshot);
notFoundOps = expectedOps.stream()
.filter(o -> actualOps.contains(o) == false)
.collect(Collectors.toList());
notExpectedOps = actualOps.stream()
.filter(o -> expectedOps.contains(o) == false)
.collect(Collectors.toList());
return notFoundOps.isEmpty() && notExpectedOps.isEmpty();
} catch (IOException ex) {
throw new ElasticsearchException("failed to read snapshot content", ex);
}
}
@Override
protected void describeMismatchSafely(Translog.Snapshot snapshot, Description mismatchDescription) {
if (notFoundOps.isEmpty() == false) {
mismatchDescription
.appendText("not found ").appendValueList("[", ", ", "]", notFoundOps);
}
if (notExpectedOps.isEmpty() == false) {
if (notFoundOps.isEmpty() == false) {
mismatchDescription.appendText("; ");
}
mismatchDescription
.appendText("not expected ").appendValueList("[", ", ", "]", notExpectedOps);
}
}
@Override
public void describeTo(Description description) {
description.appendText("snapshot contains ")
.appendValueList("[", ", ", "]", expectedOps)
.appendText(" in any order.");
}
}
} }

View File

@ -75,7 +75,6 @@ import org.junit.Before;
import java.io.Closeable; import java.io.Closeable;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -84,11 +83,13 @@ import java.nio.file.Files;
import java.nio.file.InvalidPathException; import java.nio.file.InvalidPathException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -107,10 +108,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.LongStream; import java.util.stream.LongStream;
import java.util.stream.Stream;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
@ -221,7 +222,7 @@ public class TranslogTests extends ESTestCase {
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
} }
private void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException { private void addToTranslogAndList(Translog translog, List<Translog.Operation> list, Translog.Operation op) throws IOException {
list.add(op); list.add(op);
translog.add(op); translog.add(op);
} }
@ -524,7 +525,7 @@ public class TranslogTests extends ESTestCase {
Translog.Snapshot snapshot2 = translog.newSnapshot(); Translog.Snapshot snapshot2 = translog.newSnapshot();
toClose.add(snapshot2); toClose.add(snapshot2);
markCurrentGenAsCommitted(translog); markCurrentGenAsCommitted(translog);
assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot2, containsOperationsInAnyOrder(ops));
assertThat(snapshot2.totalOperations(), equalTo(ops.size())); assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
} finally { } finally {
IOUtils.closeWhileHandlingException(toClose); IOUtils.closeWhileHandlingException(toClose);
@ -1032,7 +1033,7 @@ public class TranslogTests extends ESTestCase {
} }
assertEquals(max.generation, translog.currentFileGeneration()); assertEquals(max.generation, translog.currentFileGeneration());
try (Translog.Snapshot snap = translog.newSnapshot()) { try (Translog.Snapshot snap = new SortedSnapshot(translog.newSnapshot())) {
Translog.Operation next; Translog.Operation next;
Translog.Operation maxOp = null; Translog.Operation maxOp = null;
while ((next = snap.next()) != null) { while ((next = snap.next()) != null) {
@ -1256,7 +1257,7 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) { try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
int upTo = sync ? translogOperations : prepareOp; int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) { for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation next = snapshot.next();
@ -1270,7 +1271,7 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) { try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
int upTo = sync ? translogOperations : prepareOp; int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) { for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation next = snapshot.next();
@ -1314,7 +1315,7 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) { try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
int upTo = sync ? translogOperations : prepareOp; int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) { for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation next = snapshot.next();
@ -1329,7 +1330,7 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) { try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
int upTo = sync ? translogOperations : prepareOp; int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) { for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation next = snapshot.next();
@ -1378,7 +1379,7 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) { try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
int upTo = sync ? translogOperations : prepareOp; int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) { for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation next = snapshot.next();
@ -2065,7 +2066,7 @@ public class TranslogTests extends ESTestCase {
} }
public void testRecoverWithUnbackedNextGen() throws IOException { public void testRecoverWithUnbackedNextGen() throws IOException {
translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
translog.close(); translog.close();
TranslogConfig config = translog.getConfig(); TranslogConfig config = translog.getConfig();
@ -2076,21 +2077,25 @@ public class TranslogTests extends ESTestCase {
try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
Translog.Snapshot snapshot = tlog.newSnapshot()) { Translog.Snapshot snapshot = tlog.newSnapshot()) {
assertFalse(tlog.syncNeeded()); assertFalse(tlog.syncNeeded());
for (int i = 0; i < 1; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation op = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next); assertNotNull("operation 1 must be non-null", op);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.getSource().source.utf8ToString()));
}
tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(2).getBytes(Charset.forName("UTF-8"))));
} }
try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
Translog.Snapshot snapshot = tlog.newSnapshot()) { Translog.Snapshot snapshot = tlog.newSnapshot()) {
assertFalse(tlog.syncNeeded()); assertFalse(tlog.syncNeeded());
for (int i = 0; i < 2; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation secondOp = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next); assertNotNull("operation 2 must be non-null", secondOp);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); assertEquals("payload mismatch for operation 2", Integer.parseInt(secondOp.getSource().source.utf8ToString()), 2);
}
Translog.Operation firstOp = snapshot.next();
assertNotNull("operation 1 must be non-null", firstOp);
assertEquals("payload mismatch for operation 1", Integer.parseInt(firstOp.getSource().source.utf8ToString()), 1);
} }
} }
@ -2493,6 +2498,7 @@ public class TranslogTests extends ESTestCase {
assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos)); assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos));
readFromSnapshot++; readFromSnapshot++;
} }
readFromSnapshot += snapshot.overriddenOperations();
} }
assertThat(readFromSnapshot, equalTo(expectedSnapshotOps)); assertThat(readFromSnapshot, equalTo(expectedSnapshotOps));
final long seqNoLowerBound = seqNo; final long seqNoLowerBound = seqNo;
@ -2570,4 +2576,84 @@ public class TranslogTests extends ESTestCase {
} }
} }
} }
public void testSnapshotReadOperationInReverse() throws Exception {
final Deque<List<Translog.Operation>> views = new ArrayDeque<>();
views.push(new ArrayList<>());
final AtomicLong seqNo = new AtomicLong();
final int generations = randomIntBetween(2, 20);
for (int gen = 0; gen < generations; gen++) {
final int operations = randomIntBetween(1, 100);
for (int i = 0; i < operations; i++) {
Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), new byte[]{1});
translog.add(op);
views.peek().add(op);
}
if (frequently()) {
translog.rollGeneration();
views.push(new ArrayList<>());
}
}
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
final List<Translog.Operation> expectedSeqNo = new ArrayList<>();
while (views.isEmpty() == false) {
expectedSeqNo.addAll(views.pop());
}
assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo));
}
}
public void testSnapshotDedupOperations() throws Exception {
final Map<Long, Translog.Operation> latestOperations = new HashMap<>();
final int generations = between(2, 20);
for (int gen = 0; gen < generations; gen++) {
List<Long> batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList());
Randomness.shuffle(batch);
for (Long seqNo : batch) {
Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, new byte[]{1});
translog.add(op);
latestOperations.put(op.seqNo(), op);
}
translog.rollGeneration();
}
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertThat(snapshot, containsOperationsInAnyOrder(latestOperations.values()));
}
}
static class SortedSnapshot implements Translog.Snapshot {
private final Translog.Snapshot snapshot;
private List<Translog.Operation> operations = null;
SortedSnapshot(Translog.Snapshot snapshot) {
this.snapshot = snapshot;
}
@Override
public int totalOperations() {
return snapshot.totalOperations();
}
@Override
public Translog.Operation next() throws IOException {
if (operations == null) {
operations = new ArrayList<>();
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operations.add(op);
}
operations.sort(Comparator.comparing(Translog.Operation::seqNo));
}
if (operations.isEmpty()) {
return null;
}
return operations.remove(0);
}
@Override
public void close() throws IOException {
snapshot.close();
}
}
} }

View File

@ -56,7 +56,6 @@ import java.util.function.Function;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
@ -635,6 +634,18 @@ public class InnerHitsIT extends ESIntegTestCase {
assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(0).getSourceAsMap().size(), equalTo(2)); assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(0).getSourceAsMap().size(), equalTo(2));
assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(1).getSourceAsMap().get("message"), assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(1).getSourceAsMap().get("message"),
equalTo("fox ate rabbit x y z")); equalTo("fox ate rabbit x y z"));
// Source filter on a field that does not exist inside the nested document and just check that we do not fail and
// return an empty _source:
response = client().prepareSearch()
.setQuery(nestedQuery("comments", matchQuery("comments.message", "away"), ScoreMode.None)
.innerHit(new InnerHitBuilder().setFetchSourceContext(new FetchSourceContext(true,
new String[]{"comments.missing_field"}, null))))
.get();
assertNoFailures(response);
assertHitCount(response, 1);
assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getTotalHits(), equalTo(1L));
assertThat(response.getHits().getAt(0).getInnerHits().get("comments").getAt(0).getSourceAsMap().size(), equalTo(0));
} }
public void testInnerHitsWithIgnoreUnmapped() throws Exception { public void testInnerHitsWithIgnoreUnmapped() throws Exception {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.plugins;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cli.Command; import org.elasticsearch.cli.Command;
import org.elasticsearch.cli.LoggingAwareMultiCommand;
import org.elasticsearch.cli.MultiCommand; import org.elasticsearch.cli.MultiCommand;
import org.elasticsearch.cli.Terminal; import org.elasticsearch.cli.Terminal;
@ -31,7 +32,7 @@ import java.util.Collections;
/** /**
* A cli tool for adding, removing and listing plugins for elasticsearch. * A cli tool for adding, removing and listing plugins for elasticsearch.
*/ */
public class PluginCli extends MultiCommand { public class PluginCli extends LoggingAwareMultiCommand {
private final Collection<Command> commands; private final Collection<Command> commands;

View File

@ -107,11 +107,22 @@ specific index module:
Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all` Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all`
for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled). for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled).
`index.search.idle.after`::
How long a shard can not receive a search or get request until it's considered
search idle. (default is `30s`)
`index.refresh_interval`:: `index.refresh_interval`::
How often to perform a refresh operation, which makes recent changes to the How often to perform a refresh operation, which makes recent changes to the
index visible to search. Defaults to `1s`. Can be set to `-1` to disable index visible to search. Defaults to `1s`. Can be set to `-1` to disable
refresh. refresh. If this setting is not explicitly set, shards that haven't seen
search traffic for at least `index.search.idle.after` seconds will not receive
background refreshes until they receive a search request. Searches that hit an
idle shard where a refresh is pending will wait for the next background
refresh (within `1s`). This behavior aims to automatically optimize bulk
indexing in the default case when no searches are performed. In order to opt
out of this behavior an explicit value of `1s` should set as the refresh
interval.
`index.max_result_window`:: `index.max_result_window`::

View File

@ -35,10 +35,10 @@ way to reindex old indices is to use the `reindex` API.
include::migrate_7_0/aggregations.asciidoc[] include::migrate_7_0/aggregations.asciidoc[]
include::migrate_7_0/analysis.asciidoc[]
include::migrate_7_0/cluster.asciidoc[] include::migrate_7_0/cluster.asciidoc[]
include::migrate_7_0/indices.asciidoc[] include::migrate_7_0/indices.asciidoc[]
include::migrate_7_0/mappings.asciidoc[] include::migrate_7_0/mappings.asciidoc[]
include::migrate_7_0/search.asciidoc[] include::migrate_7_0/search.asciidoc[]
include::migrate_7_0/plugins.asciidoc[] include::migrate_7_0/plugins.asciidoc[]
include::migrate_7_0/api.asciidoc[] include::migrate_7_0/api.asciidoc[]

View File

@ -44,4 +44,14 @@ Indices created with version `7.0.0` onwards will have an automatic `index.numbe
value set. This might change how documents are distributed across shards depending on how many value set. This might change how documents are distributed across shards depending on how many
shards the index has. In order to maintain the exact same distribution as a pre `7.0.0` index, the shards the index has. In order to maintain the exact same distribution as a pre `7.0.0` index, the
`index.number_of_routing_shards` must be set to the `index.number_of_shards` at index creation time. `index.number_of_routing_shards` must be set to the `index.number_of_shards` at index creation time.
Note: if the number of routing shards equals the number of shards `_split` operations are not supported. Note: if the number of routing shards equals the number of shards `_split` operations are not supported.
==== Skipped background refresh on search idle shards.
Shards belonging to an index that does not have an explicit
`index.refresh_interval` configured will no longer refresh in the background
once the shard becomes "search idle", ie the shard hasn't seen any search
traffic for `index.search.idle.after` seconds (defaults to `30s`). Searches
that access a search idle shard will be "parked" until the next refresh
happens. Indexing requests with `wait_for_refresh` will also trigger
a background refresh.

View File

@ -244,20 +244,9 @@ public class PercolatorFieldMapper extends FieldMapper {
Query percolateQuery(String name, PercolateQuery.QueryStore queryStore, List<BytesReference> documents, Query percolateQuery(String name, PercolateQuery.QueryStore queryStore, List<BytesReference> documents,
IndexSearcher searcher, Version indexVersion) throws IOException { IndexSearcher searcher, Version indexVersion) throws IOException {
IndexReader indexReader = searcher.getIndexReader(); IndexReader indexReader = searcher.getIndexReader();
Tuple<List<Query>, Boolean> t = createCandidateQueryClauses(indexReader); Tuple<BooleanQuery, Boolean> t = createCandidateQuery(indexReader, indexVersion);
BooleanQuery.Builder candidateQuery = new BooleanQuery.Builder(); Query candidateQuery = t.v1();
if (t.v2() && indexVersion.onOrAfter(Version.V_6_1_0)) { boolean canUseMinimumShouldMatchField = t.v2();
LongValuesSource valuesSource = LongValuesSource.fromIntField(minimumShouldMatchField.name());
candidateQuery.add(new CoveringQuery(t.v1(), valuesSource), BooleanClause.Occur.SHOULD);
} else {
for (Query query : t.v1()) {
candidateQuery.add(query, BooleanClause.Occur.SHOULD);
}
}
// include extractionResultField:failed, because docs with this term have no extractedTermsField
// and otherwise we would fail to return these docs. Docs that failed query term extraction
// always need to be verified by MemoryIndex:
candidateQuery.add(new TermQuery(new Term(extractionResultField.name(), EXTRACTION_FAILED)), BooleanClause.Occur.SHOULD);
Query verifiedMatchesQuery; Query verifiedMatchesQuery;
// We can only skip the MemoryIndex verification when percolating a single non nested document. We cannot // We can only skip the MemoryIndex verification when percolating a single non nested document. We cannot
@ -265,15 +254,55 @@ public class PercolatorFieldMapper extends FieldMapper {
// ranges are extracted from IndexReader backed by a RamDirectory holding multiple documents we do // ranges are extracted from IndexReader backed by a RamDirectory holding multiple documents we do
// not know to which document the terms belong too and for certain queries we incorrectly emit candidate // not know to which document the terms belong too and for certain queries we incorrectly emit candidate
// matches as actual match. // matches as actual match.
if (t.v2() && indexReader.maxDoc() == 1) { if (canUseMinimumShouldMatchField && indexReader.maxDoc() == 1) {
verifiedMatchesQuery = new TermQuery(new Term(extractionResultField.name(), EXTRACTION_COMPLETE)); verifiedMatchesQuery = new TermQuery(new Term(extractionResultField.name(), EXTRACTION_COMPLETE));
} else { } else {
verifiedMatchesQuery = new MatchNoDocsQuery("multiple or nested docs or CoveringQuery could not be used"); verifiedMatchesQuery = new MatchNoDocsQuery("multiple or nested docs or CoveringQuery could not be used");
} }
return new PercolateQuery(name, queryStore, documents, candidateQuery.build(), searcher, verifiedMatchesQuery); return new PercolateQuery(name, queryStore, documents, candidateQuery, searcher, verifiedMatchesQuery);
} }
Tuple<List<Query>, Boolean> createCandidateQueryClauses(IndexReader indexReader) throws IOException { Tuple<BooleanQuery, Boolean> createCandidateQuery(IndexReader indexReader, Version indexVersion) throws IOException {
Tuple<List<BytesRef>, Map<String, List<byte[]>>> t = extractTermsAndRanges(indexReader);
List<BytesRef> extractedTerms = t.v1();
Map<String, List<byte[]>> encodedPointValuesByField = t.v2();
// `1 + ` is needed to take into account the EXTRACTION_FAILED should clause
boolean canUseMinimumShouldMatchField = 1 + extractedTerms.size() + encodedPointValuesByField.size() <=
BooleanQuery.getMaxClauseCount();
List<Query> subQueries = new ArrayList<>();
for (Map.Entry<String, List<byte[]>> entry : encodedPointValuesByField.entrySet()) {
String rangeFieldName = entry.getKey();
List<byte[]> encodedPointValues = entry.getValue();
byte[] min = encodedPointValues.get(0);
byte[] max = encodedPointValues.get(1);
Query query = BinaryRange.newIntersectsQuery(rangeField.name(), encodeRange(rangeFieldName, min, max));
subQueries.add(query);
}
BooleanQuery.Builder candidateQuery = new BooleanQuery.Builder();
if (canUseMinimumShouldMatchField && indexVersion.onOrAfter(Version.V_6_1_0)) {
LongValuesSource valuesSource = LongValuesSource.fromIntField(minimumShouldMatchField.name());
for (BytesRef extractedTerm : extractedTerms) {
subQueries.add(new TermQuery(new Term(queryTermsField.name(), extractedTerm)));
}
candidateQuery.add(new CoveringQuery(subQueries, valuesSource), BooleanClause.Occur.SHOULD);
} else {
candidateQuery.add(new TermInSetQuery(queryTermsField.name(), extractedTerms), BooleanClause.Occur.SHOULD);
for (Query subQuery : subQueries) {
candidateQuery.add(subQuery, BooleanClause.Occur.SHOULD);
}
}
// include extractionResultField:failed, because docs with this term have no extractedTermsField
// and otherwise we would fail to return these docs. Docs that failed query term extraction
// always need to be verified by MemoryIndex:
candidateQuery.add(new TermQuery(new Term(extractionResultField.name(), EXTRACTION_FAILED)), BooleanClause.Occur.SHOULD);
return new Tuple<>(candidateQuery.build(), canUseMinimumShouldMatchField);
}
// This was extracted the method above, because otherwise it is difficult to test what terms are included in
// the query in case a CoveringQuery is used (it does not have a getter to retrieve the clauses)
Tuple<List<BytesRef>, Map<String, List<byte[]>>> extractTermsAndRanges(IndexReader indexReader) throws IOException {
List<BytesRef> extractedTerms = new ArrayList<>(); List<BytesRef> extractedTerms = new ArrayList<>();
Map<String, List<byte[]>> encodedPointValuesByField = new HashMap<>(); Map<String, List<byte[]>> encodedPointValuesByField = new HashMap<>();
@ -299,28 +328,7 @@ public class PercolatorFieldMapper extends FieldMapper {
encodedPointValuesByField.put(info.name, encodedPointValues); encodedPointValuesByField.put(info.name, encodedPointValues);
} }
} }
return new Tuple<>(extractedTerms, encodedPointValuesByField);
final boolean canUseMinimumShouldMatchField;
final List<Query> queries = new ArrayList<>();
if (extractedTerms.size() + encodedPointValuesByField.size() <= BooleanQuery.getMaxClauseCount()) {
canUseMinimumShouldMatchField = true;
for (BytesRef extractedTerm : extractedTerms) {
queries.add(new TermQuery(new Term(queryTermsField.name(), extractedTerm)));
}
} else {
canUseMinimumShouldMatchField = false;
queries.add(new TermInSetQuery(queryTermsField.name(), extractedTerms));
}
for (Map.Entry<String, List<byte[]>> entry : encodedPointValuesByField.entrySet()) {
String rangeFieldName = entry.getKey();
List<byte[]> encodedPointValues = entry.getValue();
byte[] min = encodedPointValues.get(0);
byte[] max = encodedPointValues.get(1);
Query query = BinaryRange.newIntersectsQuery(rangeField.name(), encodeRange(rangeFieldName, min, max));
queries.add(query);
}
return new Tuple<>(queries, canUseMinimumShouldMatchField);
} }
} }

View File

@ -45,6 +45,7 @@ import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.ConstantScoreScorer; import org.apache.lucene.search.ConstantScoreScorer;
import org.apache.lucene.search.CoveringQuery;
import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation; import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FilterScorer; import org.apache.lucene.search.FilterScorer;
@ -505,16 +506,17 @@ public class CandidateQueryTests extends ESSingleNodeTestCase {
} }
try (IndexReader ir = DirectoryReader.open(directory)){ try (IndexReader ir = DirectoryReader.open(directory)){
IndexSearcher percolateSearcher = new IndexSearcher(ir); IndexSearcher percolateSearcher = new IndexSearcher(ir);
Query query = PercolateQuery query = (PercolateQuery)
fieldType.percolateQuery("_name", queryStore, Collections.singletonList(new BytesArray("{}")), percolateSearcher, v); fieldType.percolateQuery("_name", queryStore, Collections.singletonList(new BytesArray("{}")), percolateSearcher, v);
BooleanQuery candidateQuery = (BooleanQuery) query.getCandidateMatchesQuery();
assertThat(candidateQuery.clauses().get(0).getQuery(), instanceOf(CoveringQuery.class));
TopDocs topDocs = shardSearcher.search(query, 10); TopDocs topDocs = shardSearcher.search(query, 10);
assertEquals(2L, topDocs.totalHits); assertEquals(2L, topDocs.totalHits);
assertEquals(2, topDocs.scoreDocs.length); assertEquals(2, topDocs.scoreDocs.length);
assertEquals(0, topDocs.scoreDocs[0].doc); assertEquals(0, topDocs.scoreDocs[0].doc);
assertEquals(2, topDocs.scoreDocs[1].doc); assertEquals(2, topDocs.scoreDocs[1].doc);
query = new ConstantScoreQuery(query); topDocs = shardSearcher.search(new ConstantScoreQuery(query), 10);
topDocs = shardSearcher.search(query, 10);
assertEquals(2L, topDocs.totalHits); assertEquals(2L, topDocs.totalHits);
assertEquals(2, topDocs.scoreDocs.length); assertEquals(2, topDocs.scoreDocs.length);
assertEquals(0, topDocs.scoreDocs[0].doc); assertEquals(0, topDocs.scoreDocs[0].doc);
@ -526,7 +528,7 @@ public class CandidateQueryTests extends ESSingleNodeTestCase {
try (RAMDirectory directory = new RAMDirectory()) { try (RAMDirectory directory = new RAMDirectory()) {
try (IndexWriter iw = new IndexWriter(directory, newIndexWriterConfig())) { try (IndexWriter iw = new IndexWriter(directory, newIndexWriterConfig())) {
Document document = new Document(); Document document = new Document();
for (int i = 0; i < 1025; i++) { for (int i = 0; i < 1024; i++) {
int fieldNumber = 2 + i; int fieldNumber = 2 + i;
document.add(new StringField("field", "value" + fieldNumber, Field.Store.NO)); document.add(new StringField("field", "value" + fieldNumber, Field.Store.NO));
} }

View File

@ -53,6 +53,7 @@ import org.elasticsearch.test.AbstractQueryTestCase;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64; import java.util.Base64;
@ -340,7 +341,7 @@ public class PercolateQueryBuilderTests extends AbstractQueryTestCase<PercolateQ
xContent.map(source); xContent.map(source);
return xContent.bytes(); return xContent.bytes();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new UncheckedIOException(e);
} }
} }

View File

@ -32,6 +32,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.index.memory.MemoryIndex; import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.CoveringQuery;
import org.apache.lucene.search.PhraseQuery; import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermInSetQuery; import org.apache.lucene.search.TermInSetQuery;
@ -39,6 +40,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TermRangeQuery; import org.apache.lucene.search.TermRangeQuery;
import org.apache.lucene.search.join.ScoreMode; import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
@ -114,7 +116,6 @@ import static org.elasticsearch.percolator.PercolatorFieldMapper.EXTRACTION_PART
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
public class PercolatorFieldMapperTests extends ESSingleNodeTestCase { public class PercolatorFieldMapperTests extends ESSingleNodeTestCase {
@ -304,7 +305,7 @@ public class PercolatorFieldMapperTests extends ESSingleNodeTestCase {
assertThat(document.getField(fieldType.extractionResultField.name()).stringValue(), equalTo(EXTRACTION_PARTIAL)); assertThat(document.getField(fieldType.extractionResultField.name()).stringValue(), equalTo(EXTRACTION_PARTIAL));
} }
public void testCreateCandidateQuery() throws Exception { public void testExtractTermsAndRanges() throws Exception {
addQueryFieldMappings(); addQueryFieldMappings();
MemoryIndex memoryIndex = new MemoryIndex(false); MemoryIndex memoryIndex = new MemoryIndex(false);
@ -316,60 +317,87 @@ public class PercolatorFieldMapperTests extends ESSingleNodeTestCase {
IndexReader indexReader = memoryIndex.createSearcher().getIndexReader(); IndexReader indexReader = memoryIndex.createSearcher().getIndexReader();
Tuple<List<Query>, Boolean> t = fieldType.createCandidateQueryClauses(indexReader); Tuple<List<BytesRef>, Map<String, List<byte[]>>> t = fieldType.extractTermsAndRanges(indexReader);
assertTrue(t.v2()); assertEquals(1, t.v2().size());
List<Query> clauses = t.v1(); Map<String, List<byte[]>> rangesMap = t.v2();
clauses.sort(Comparator.comparing(Query::toString)); assertEquals(1, rangesMap.size());
assertEquals(15, clauses.size());
assertEquals(fieldType.queryTermsField.name() + ":_field3\u0000me", clauses.get(0).toString()); List<byte[]> range = rangesMap.get("number_field2");
assertEquals(fieldType.queryTermsField.name() + ":_field3\u0000unhide", clauses.get(1).toString()); assertNotNull(range);
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000brown", clauses.get(2).toString()); assertEquals(10, LongPoint.decodeDimension(range.get(0), 0));
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000dog", clauses.get(3).toString()); assertEquals(10, LongPoint.decodeDimension(range.get(1), 0));
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000fox", clauses.get(4).toString());
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000jumps", clauses.get(5).toString()); List<BytesRef> terms = t.v1();
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000lazy", clauses.get(6).toString()); terms.sort(BytesRef::compareTo);
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000over", clauses.get(7).toString()); assertEquals(14, terms.size());
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000quick", clauses.get(8).toString()); assertEquals("_field3\u0000me", terms.get(0).utf8ToString());
assertEquals(fieldType.queryTermsField.name() + ":field1\u0000the", clauses.get(9).toString()); assertEquals("_field3\u0000unhide", terms.get(1).utf8ToString());
assertEquals(fieldType.queryTermsField.name() + ":field2\u0000more", clauses.get(10).toString()); assertEquals("field1\u0000brown", terms.get(2).utf8ToString());
assertEquals(fieldType.queryTermsField.name() + ":field2\u0000some", clauses.get(11).toString()); assertEquals("field1\u0000dog", terms.get(3).utf8ToString());
assertEquals(fieldType.queryTermsField.name() + ":field2\u0000text", clauses.get(12).toString()); assertEquals("field1\u0000fox", terms.get(4).utf8ToString());
assertEquals(fieldType.queryTermsField.name() + ":field4\u0000123", clauses.get(13).toString()); assertEquals("field1\u0000jumps", terms.get(5).utf8ToString());
assertThat(clauses.get(14).toString(), containsString(fieldName + ".range_field:<ranges:")); assertEquals("field1\u0000lazy", terms.get(6).utf8ToString());
assertEquals("field1\u0000over", terms.get(7).utf8ToString());
assertEquals("field1\u0000quick", terms.get(8).utf8ToString());
assertEquals("field1\u0000the", terms.get(9).utf8ToString());
assertEquals("field2\u0000more", terms.get(10).utf8ToString());
assertEquals("field2\u0000some", terms.get(11).utf8ToString());
assertEquals("field2\u0000text", terms.get(12).utf8ToString());
assertEquals("field4\u0000123", terms.get(13).utf8ToString());
} }
public void testCreateCandidateQuery_largeDocument() throws Exception { public void testCreateCandidateQuery() throws Exception {
addQueryFieldMappings(); addQueryFieldMappings();
MemoryIndex memoryIndex = new MemoryIndex(false); MemoryIndex memoryIndex = new MemoryIndex(false);
StringBuilder text = new StringBuilder(); StringBuilder text = new StringBuilder();
for (int i = 0; i < 1023; i++) { for (int i = 0; i < 1022; i++) {
text.append(i).append(' '); text.append(i).append(' ');
} }
memoryIndex.addField("field1", text.toString(), new WhitespaceAnalyzer()); memoryIndex.addField("field1", text.toString(), new WhitespaceAnalyzer());
memoryIndex.addField(new LongPoint("field2", 10L), new WhitespaceAnalyzer()); memoryIndex.addField(new LongPoint("field2", 10L), new WhitespaceAnalyzer());
IndexReader indexReader = memoryIndex.createSearcher().getIndexReader(); IndexReader indexReader = memoryIndex.createSearcher().getIndexReader();
Tuple<List<Query>, Boolean> t = fieldType.createCandidateQueryClauses(indexReader); Tuple<BooleanQuery, Boolean> t = fieldType.createCandidateQuery(indexReader, Version.CURRENT);
assertTrue(t.v2()); assertTrue(t.v2());
List<Query> clauses = t.v1(); assertEquals(2, t.v1().clauses().size());
assertEquals(1024, clauses.size()); assertThat(t.v1().clauses().get(0).getQuery(), instanceOf(CoveringQuery.class));
assertThat(clauses.get(1023).toString(), containsString(fieldName + ".range_field:<ranges:")); assertThat(t.v1().clauses().get(1).getQuery(), instanceOf(TermQuery.class));
// Now push it over the edge, so that it falls back using TermInSetQuery // Now push it over the edge, so that it falls back using TermInSetQuery
memoryIndex.addField("field2", "value", new WhitespaceAnalyzer()); memoryIndex.addField("field2", "value", new WhitespaceAnalyzer());
indexReader = memoryIndex.createSearcher().getIndexReader(); indexReader = memoryIndex.createSearcher().getIndexReader();
t = fieldType.createCandidateQueryClauses(indexReader); t = fieldType.createCandidateQuery(indexReader, Version.CURRENT);
assertFalse(t.v2()); assertFalse(t.v2());
clauses = t.v1(); assertEquals(3, t.v1().clauses().size());
assertEquals(2, clauses.size()); TermInSetQuery terms = (TermInSetQuery) t.v1().clauses().get(0).getQuery();
TermInSetQuery termInSetQuery = (TermInSetQuery) clauses.get(0); assertEquals(1023, terms.getTermData().size());
assertEquals(1024, termInSetQuery.getTermData().size()); assertThat(t.v1().clauses().get(1).getQuery().toString(), containsString(fieldName + ".range_field:<ranges:"));
assertThat(clauses.get(1).toString(), containsString(fieldName + ".range_field:<ranges:")); assertThat(t.v1().clauses().get(2).getQuery().toString(), containsString(fieldName + ".extraction_result:failed"));
} }
public void testCreateCandidateQuery_numberFields() throws Exception { public void testCreateCandidateQuery_oldIndex() throws Exception {
addQueryFieldMappings();
MemoryIndex memoryIndex = new MemoryIndex(false);
memoryIndex.addField("field1", "value1", new WhitespaceAnalyzer());
IndexReader indexReader = memoryIndex.createSearcher().getIndexReader();
Tuple<BooleanQuery, Boolean> t = fieldType.createCandidateQuery(indexReader, Version.CURRENT);
assertTrue(t.v2());
assertEquals(2, t.v1().clauses().size());
assertThat(t.v1().clauses().get(0).getQuery(), instanceOf(CoveringQuery.class));
assertThat(t.v1().clauses().get(1).getQuery(), instanceOf(TermQuery.class));
t = fieldType.createCandidateQuery(indexReader, Version.V_6_0_0);
assertTrue(t.v2());
assertEquals(2, t.v1().clauses().size());
assertThat(t.v1().clauses().get(0).getQuery(), instanceOf(TermInSetQuery.class));
assertThat(t.v1().clauses().get(1).getQuery(), instanceOf(TermQuery.class));
}
public void testExtractTermsAndRanges_numberFields() throws Exception {
addQueryFieldMappings(); addQueryFieldMappings();
MemoryIndex memoryIndex = new MemoryIndex(false); MemoryIndex memoryIndex = new MemoryIndex(false);
@ -385,17 +413,45 @@ public class PercolatorFieldMapperTests extends ESSingleNodeTestCase {
IndexReader indexReader = memoryIndex.createSearcher().getIndexReader(); IndexReader indexReader = memoryIndex.createSearcher().getIndexReader();
Tuple<List<Query>, Boolean> t = fieldType.createCandidateQueryClauses(indexReader); Tuple<List<BytesRef>, Map<String, List<byte[]>>> t = fieldType.extractTermsAndRanges(indexReader);
assertThat(t.v2(), is(true)); assertEquals(0, t.v1().size());
List<Query> clauses = t.v1(); Map<String, List<byte[]>> rangesMap = t.v2();
assertEquals(7, clauses.size()); assertEquals(7, rangesMap.size());
assertThat(clauses.get(0).toString(), containsString(fieldName + ".range_field:<ranges:[["));
assertThat(clauses.get(1).toString(), containsString(fieldName + ".range_field:<ranges:[[")); List<byte[]> range = rangesMap.get("number_field1");
assertThat(clauses.get(2).toString(), containsString(fieldName + ".range_field:<ranges:[[")); assertNotNull(range);
assertThat(clauses.get(3).toString(), containsString(fieldName + ".range_field:<ranges:[[")); assertEquals(10, IntPoint.decodeDimension(range.get(0), 0));
assertThat(clauses.get(4).toString(), containsString(fieldName + ".range_field:<ranges:[[")); assertEquals(10, IntPoint.decodeDimension(range.get(1), 0));
assertThat(clauses.get(5).toString(), containsString(fieldName + ".range_field:<ranges:[["));
assertThat(clauses.get(6).toString(), containsString(fieldName + ".range_field:<ranges:[[")); range = rangesMap.get("number_field2");
assertNotNull(range);
assertEquals(20L, LongPoint.decodeDimension(range.get(0), 0));
assertEquals(20L, LongPoint.decodeDimension(range.get(1), 0));
range = rangesMap.get("number_field3");
assertNotNull(range);
assertEquals(30L, LongPoint.decodeDimension(range.get(0), 0));
assertEquals(30L, LongPoint.decodeDimension(range.get(1), 0));
range = rangesMap.get("number_field4");
assertNotNull(range);
assertEquals(30F, HalfFloatPoint.decodeDimension(range.get(0), 0), 0F);
assertEquals(30F, HalfFloatPoint.decodeDimension(range.get(1), 0), 0F);
range = rangesMap.get("number_field5");
assertNotNull(range);
assertEquals(40F, FloatPoint.decodeDimension(range.get(0), 0), 0F);
assertEquals(40F, FloatPoint.decodeDimension(range.get(1), 0), 0F);
range = rangesMap.get("number_field6");
assertNotNull(range);
assertEquals(50D, DoublePoint.decodeDimension(range.get(0), 0), 0D);
assertEquals(50D, DoublePoint.decodeDimension(range.get(1), 0), 0D);
range = rangesMap.get("number_field7");
assertNotNull(range);
assertEquals(InetAddresses.forString("192.168.1.12"), InetAddressPoint.decode(range.get(0)));
assertEquals(InetAddresses.forString("192.168.1.24"), InetAddressPoint.decode(range.get(1)));
} }
public void testPercolatorFieldMapper() throws Exception { public void testPercolatorFieldMapper() throws Exception {

View File

@ -33,7 +33,7 @@ public class EvilCommandTests extends ESTestCase {
public void testCommandShutdownHook() throws Exception { public void testCommandShutdownHook() throws Exception {
final AtomicBoolean closed = new AtomicBoolean(); final AtomicBoolean closed = new AtomicBoolean();
final boolean shouldThrow = randomBoolean(); final boolean shouldThrow = randomBoolean();
final Command command = new Command("test-command-shutdown-hook") { final Command command = new Command("test-command-shutdown-hook", () -> {}) {
@Override @Override
protected void execute(Terminal terminal, OptionSet options) throws Exception { protected void execute(Terminal terminal, OptionSet options) throws Exception {

View File

@ -3,12 +3,9 @@ setup:
- do: - do:
indices.create: indices.create:
index: test index: test
body: - do:
settings: cluster.health:
index: wait_for_no_initializing_shards: true
# initializing replicas maintain the translog causing the test to fail.
# remove once https://github.com/elastic/elasticsearch/issues/25623 is fixed.
number_of_replicas: 0
--- ---
"Translog retention": "Translog retention":

View File

@ -347,15 +347,15 @@ public class ElasticsearchAssertions {
public static void assertAllSuccessful(BroadcastResponse response) { public static void assertAllSuccessful(BroadcastResponse response) {
assertNoFailures(response); assertNoFailures(response);
assertThat("Expected all shards successful but got successful [" + response.getSuccessfulShards() + "] total [" + response.getTotalShards() + "]", assertThat("Expected all shards successful",
response.getTotalShards(), equalTo(response.getSuccessfulShards())); response.getSuccessfulShards(), equalTo(response.getTotalShards()));
assertVersionSerializable(response); assertVersionSerializable(response);
} }
public static void assertAllSuccessful(SearchResponse response) { public static void assertAllSuccessful(SearchResponse response) {
assertNoFailures(response); assertNoFailures(response);
assertThat("Expected all shards successful but got successful [" + response.getSuccessfulShards() + "] total [" + response.getTotalShards() + "]", assertThat("Expected all shards successful",
response.getTotalShards(), equalTo(response.getSuccessfulShards())); response.getSuccessfulShards(), equalTo(response.getTotalShards()));
assertVersionSerializable(response); assertVersionSerializable(response);
} }