Merge branch 'master' into feature/search-request-refactoring

This commit is contained in:
Colin Goodheart-Smithe 2015-09-29 18:50:52 +02:00
commit 117d8d2606
29 changed files with 790 additions and 498 deletions

View File

@ -43,6 +43,7 @@ import org.elasticsearch.node.internal.InternalSettingsPreparer;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.file.Path;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
@ -82,7 +83,7 @@ final class Bootstrap {
}
/** initialize native resources */
public static void initializeNatives(boolean mlockAll, boolean seccomp, boolean ctrlHandler) {
public static void initializeNatives(Path tmpFile, boolean mlockAll, boolean seccomp, boolean ctrlHandler) {
final ESLogger logger = Loggers.getLogger(Bootstrap.class);
// check if the user is running as root, and bail
@ -96,7 +97,7 @@ final class Bootstrap {
// enable secure computing mode
if (seccomp) {
Natives.trySeccomp();
Natives.trySeccomp(tmpFile);
}
// mlockall if requested
@ -141,7 +142,8 @@ final class Bootstrap {
}
private void setup(boolean addShutdownHook, Settings settings, Environment environment) throws Exception {
initializeNatives(settings.getAsBoolean("bootstrap.mlockall", false),
initializeNatives(environment.tmpFile(),
settings.getAsBoolean("bootstrap.mlockall", false),
settings.getAsBoolean("bootstrap.seccomp", true),
settings.getAsBoolean("bootstrap.ctrlhandler", true));

View File

@ -21,11 +21,14 @@ package org.elasticsearch.bootstrap;
import com.sun.jna.Native;
import com.sun.jna.Pointer;
import org.apache.lucene.util.Constants;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.nio.file.Path;
import static org.elasticsearch.bootstrap.JNAKernel32Library.SizeT;
/**
@ -172,19 +175,17 @@ class JNANatives {
}
}
static void trySeccomp() {
if (Constants.LINUX && "amd64".equals(Constants.OS_ARCH)) {
try {
Seccomp.installFilter();
LOCAL_SECCOMP = true;
} catch (Exception e) {
// this is likely to happen unless the kernel is newish, its a best effort at the moment
// so we log stacktrace at debug for now...
if (logger.isDebugEnabled()) {
logger.debug("unable to install seccomp filter", e);
}
logger.warn("unable to install seccomp filter: " + e.getMessage());
static void trySeccomp(Path tmpFile) {
try {
Seccomp.init(tmpFile);
LOCAL_SECCOMP = true;
} catch (Throwable t) {
// this is likely to happen unless the kernel is newish, its a best effort at the moment
// so we log stacktrace at debug for now...
if (logger.isDebugEnabled()) {
logger.debug("unable to install syscall filter", t);
}
logger.warn("unable to install syscall filter: " + t.getMessage());
}
}
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.bootstrap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import java.nio.file.Path;
/**
* The Natives class is a wrapper class that checks if the classes necessary for calling native methods are available on
* startup. If they are not available, this class will avoid calling code that loads these classes.
@ -89,12 +91,12 @@ final class Natives {
return JNANatives.LOCAL_MLOCKALL;
}
static void trySeccomp() {
static void trySeccomp(Path tmpFile) {
if (!JNA_AVAILABLE) {
logger.warn("cannot install seccomp filters because JNA is not available");
logger.warn("cannot install syscall filters because JNA is not available");
return;
}
JNANatives.trySeccomp();
JNANatives.trySeccomp(tmpFile);
}
static boolean isSeccompInstalled() {

View File

@ -24,46 +24,65 @@ import com.sun.jna.Memory;
import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.Structure;
import com.sun.jna.ptr.PointerByReference;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Installs a limited form of Linux secure computing mode (filter mode).
* This filters system calls to block process execution.
* Installs a limited form of secure computing mode,
* to filters system calls to block process execution.
* <p>
* This is only supported on the amd64 architecture, on Linux kernels 3.5 or above, and requires
* This is only supported on the Linux and Mac OS X operating systems.
* <p>
* On Linux it currently supports on the amd64 architecture, on Linux kernels 3.5 or above, and requires
* {@code CONFIG_SECCOMP} and {@code CONFIG_SECCOMP_FILTER} compiled into the kernel.
* <p>
* Filters are installed using either {@code seccomp(2)} (3.17+) or {@code prctl(2)} (3.5+). {@code seccomp(2)}
* On Linux BPF Filters are installed using either {@code seccomp(2)} (3.17+) or {@code prctl(2)} (3.5+). {@code seccomp(2)}
* is preferred, as it allows filters to be applied to any existing threads in the process, and one motivation
* here is to protect against bugs in the JVM. Otherwise, code will fall back to the {@code prctl(2)} method
* which will at least protect elasticsearch application threads.
* <p>
* The filters will return {@code EACCES} (Access Denied) for the following system calls:
* Linux BPF filters will return {@code EACCES} (Access Denied) for the following system calls:
* <ul>
* <li>{@code execve}</li>
* <li>{@code fork}</li>
* <li>{@code vfork}</li>
* <li>{@code execveat}</li>
* </ul>
* <p>
* On Mac OS X Leopard or above, a custom {@code sandbox(7)} ("Seatbelt") profile is installed that
* denies the following rules:
* <ul>
* <li>{@code process-fork}</li>
* <li>{@code process-exec}</li>
* </ul>
* <p>
* This is not intended as a sandbox. It is another level of security, mostly intended to annoy
* security researchers and make their lives more difficult in achieving "remote execution" exploits.
* @see <a href="http://www.kernel.org/doc/Documentation/prctl/seccomp_filter.txt">
* http://www.kernel.org/doc/Documentation/prctl/seccomp_filter.txt</a>
* @see <a href="https://reverse.put.as/wp-content/uploads/2011/06/The-Apple-Sandbox-BHDC2011-Paper.pdf">
* https://reverse.put.as/wp-content/uploads/2011/06/The-Apple-Sandbox-BHDC2011-Paper.pdf</a>
*/
// only supported on linux/amd64
// not an example of how to write code!!!
final class Seccomp {
private static final ESLogger logger = Loggers.getLogger(Seccomp.class);
/** we use an explicit interface for native methods, for varargs support */
// Linux implementation, based on seccomp(2) or prctl(2) with bpf filtering
/** Access to non-standard Linux libc methods */
static interface LinuxLibrary extends Library {
/**
* maps to prctl(2)
@ -76,17 +95,19 @@ final class Seccomp {
long syscall(long number, Object... args);
};
// null if something goes wrong.
static final LinuxLibrary libc;
// null if unavailable or something goes wrong.
static final LinuxLibrary linux_libc;
static {
LinuxLibrary lib = null;
try {
lib = (LinuxLibrary) Native.loadLibrary("c", LinuxLibrary.class);
} catch (UnsatisfiedLinkError e) {
logger.warn("unable to link C library. native methods (seccomp) will be disabled.", e);
if (Constants.LINUX) {
try {
lib = (LinuxLibrary) Native.loadLibrary("c", LinuxLibrary.class);
} catch (UnsatisfiedLinkError e) {
logger.warn("unable to link C library. native methods (seccomp) will be disabled.", e);
}
}
libc = lib;
linux_libc = lib;
}
/** the preferred method is seccomp(2), since we can apply to all threads of the process */
@ -176,34 +197,35 @@ final class Seccomp {
static final int SECCOMP_DATA_NR_OFFSET = 0x00;
static final int SECCOMP_DATA_ARCH_OFFSET = 0x04;
// currently this range is blocked (inclusive):
// currently these ranges are blocked (inclusive):
// execve is really the only one needed but why let someone fork a 30G heap? (not really what happens)
// ...
// 57: fork
// 58: vfork
// 59: execve
// ...
static final int BLACKLIST_START = 57;
static final int BLACKLIST_END = 59;
// TODO: execveat()? its less of a risk since the jvm does not use it...
// 322: execveat
// ...
static final int NR_SYSCALL_FORK = 57;
static final int NR_SYSCALL_EXECVE = 59;
static final int NR_SYSCALL_EXECVEAT = 322; // since Linux 3.19
/** try to install our filters */
static void installFilter() {
/** try to install our BPF filters via seccomp() or prctl() to block execution */
private static void linuxImpl() {
// first be defensive: we can give nice errors this way, at the very least.
// also, some of these security features get backported to old versions, checking kernel version here is a big no-no!
boolean supported = Constants.LINUX && "amd64".equals(Constants.OS_ARCH);
if (supported == false) {
throw new IllegalStateException("bug: should not be trying to initialize seccomp for an unsupported architecture");
throw new UnsupportedOperationException("seccomp unavailable: '" + Constants.OS_ARCH + "' architecture unsupported");
}
// we couldn't link methods, could be some really ancient kernel (e.g. < 2.1.57) or some bug
if (libc == null) {
if (linux_libc == null) {
throw new UnsupportedOperationException("seccomp unavailable: could not link methods. requires kernel 3.5+ with CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
}
// check for kernel version
if (libc.prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0) < 0) {
if (linux_libc.prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0) < 0) {
int errno = Native.getLastError();
switch (errno) {
case ENOSYS: throw new UnsupportedOperationException("seccomp unavailable: requires kernel 3.5+ with CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
@ -211,7 +233,7 @@ final class Seccomp {
}
}
// check for SECCOMP
if (libc.prctl(PR_GET_SECCOMP, 0, 0, 0, 0) < 0) {
if (linux_libc.prctl(PR_GET_SECCOMP, 0, 0, 0, 0) < 0) {
int errno = Native.getLastError();
switch (errno) {
case EINVAL: throw new UnsupportedOperationException("seccomp unavailable: CONFIG_SECCOMP not compiled into kernel, CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed");
@ -219,7 +241,7 @@ final class Seccomp {
}
}
// check for SECCOMP_MODE_FILTER
if (libc.prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, 0, 0, 0) < 0) {
if (linux_libc.prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, 0, 0, 0) < 0) {
int errno = Native.getLastError();
switch (errno) {
case EFAULT: break; // available
@ -229,19 +251,20 @@ final class Seccomp {
}
// ok, now set PR_SET_NO_NEW_PRIVS, needed to be able to set a seccomp filter as ordinary user
if (libc.prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) < 0) {
if (linux_libc.prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) < 0) {
throw new UnsupportedOperationException("prctl(PR_SET_NO_NEW_PRIVS): " + JNACLibrary.strerror(Native.getLastError()));
}
// BPF installed to check arch, then syscall range. See https://www.kernel.org/doc/Documentation/prctl/seccomp_filter.txt for details.
SockFilter insns[] = {
/* 1 */ BPF_STMT(BPF_LD + BPF_W + BPF_ABS, SECCOMP_DATA_ARCH_OFFSET), // if (arch != amd64) goto fail;
/* 2 */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, AUDIT_ARCH_X86_64, 0, 3), //
/* 3 */ BPF_STMT(BPF_LD + BPF_W + BPF_ABS, SECCOMP_DATA_NR_OFFSET), // if (syscall < BLACKLIST_START) goto pass;
/* 4 */ BPF_JUMP(BPF_JMP + BPF_JGE + BPF_K, BLACKLIST_START, 0, 2), //
/* 5 */ BPF_JUMP(BPF_JMP + BPF_JGT + BPF_K, BLACKLIST_END, 1, 0), // if (syscall > BLACKLIST_END) goto pass;
/* 6 */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ERRNO | (EACCES & SECCOMP_RET_DATA)), // fail: return EACCES;
/* 7 */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ALLOW) // pass: return OK;
/* 1 */ BPF_STMT(BPF_LD + BPF_W + BPF_ABS, SECCOMP_DATA_ARCH_OFFSET), //
/* 2 */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, AUDIT_ARCH_X86_64, 0, 4), // if (arch != amd64) goto fail;
/* 3 */ BPF_STMT(BPF_LD + BPF_W + BPF_ABS, SECCOMP_DATA_NR_OFFSET), //
/* 4 */ BPF_JUMP(BPF_JMP + BPF_JGE + BPF_K, NR_SYSCALL_FORK, 0, 3), // if (syscall < SYSCALL_FORK) goto pass;
/* 5 */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, NR_SYSCALL_EXECVEAT, 1, 0), // if (syscall == SYSCALL_EXECVEAT) goto fail;
/* 6 */ BPF_JUMP(BPF_JMP + BPF_JGT + BPF_K, NR_SYSCALL_EXECVE, 1, 0), // if (syscall > SYSCALL_EXECVE) goto pass;
/* 7 */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ERRNO | (EACCES & SECCOMP_RET_DATA)), // fail: return EACCES;
/* 8 */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ALLOW) // pass: return OK;
};
// seccomp takes a long, so we pass it one explicitly to keep the JNA simple
@ -251,12 +274,12 @@ final class Seccomp {
// install filter, if this works, after this there is no going back!
// first try it with seccomp(SECCOMP_SET_MODE_FILTER), falling back to prctl()
if (libc.syscall(SECCOMP_SYSCALL_NR, SECCOMP_SET_MODE_FILTER, SECCOMP_FILTER_FLAG_TSYNC, pointer) != 0) {
if (linux_libc.syscall(SECCOMP_SYSCALL_NR, SECCOMP_SET_MODE_FILTER, SECCOMP_FILTER_FLAG_TSYNC, pointer) != 0) {
int errno1 = Native.getLastError();
if (logger.isDebugEnabled()) {
logger.debug("seccomp(SECCOMP_SET_MODE_FILTER): " + JNACLibrary.strerror(errno1) + ", falling back to prctl(PR_SET_SECCOMP)...");
}
if (libc.prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, pointer, 0, 0) < 0) {
if (linux_libc.prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, pointer, 0, 0) < 0) {
int errno2 = Native.getLastError();
throw new UnsupportedOperationException("seccomp(SECCOMP_SET_MODE_FILTER): " + JNACLibrary.strerror(errno1) +
", prctl(PR_SET_SECCOMP): " + JNACLibrary.strerror(errno2));
@ -264,8 +287,99 @@ final class Seccomp {
}
// now check that the filter was really installed, we should be in filter mode.
if (libc.prctl(PR_GET_SECCOMP, 0, 0, 0, 0) != 2) {
if (linux_libc.prctl(PR_GET_SECCOMP, 0, 0, 0, 0) != 2) {
throw new UnsupportedOperationException("seccomp filter installation did not really succeed. seccomp(PR_GET_SECCOMP): " + JNACLibrary.strerror(Native.getLastError()));
}
logger.debug("Linux seccomp filter installation successful");
}
// OS X implementation via sandbox(7)
/** Access to non-standard OS X libc methods */
static interface MacLibrary extends Library {
/**
* maps to sandbox_init(3), since Leopard
*/
int sandbox_init(String profile, long flags, PointerByReference errorbuf);
/**
* releases memory when an error occurs during initialization (e.g. syntax bug)
*/
void sandbox_free_error(Pointer errorbuf);
}
// null if unavailable, or something goes wrong.
static final MacLibrary libc_mac;
static {
MacLibrary lib = null;
if (Constants.MAC_OS_X) {
try {
lib = (MacLibrary) Native.loadLibrary("c", MacLibrary.class);
} catch (UnsatisfiedLinkError e) {
logger.warn("unable to link C library. native methods (seatbelt) will be disabled.", e);
}
}
libc_mac = lib;
}
/** The only supported flag... */
static final int SANDBOX_NAMED = 1;
/** Allow everything except process fork and execution */
static final String SANDBOX_RULES = "(version 1) (allow default) (deny process-fork) (deny process-exec)";
/** try to install our custom rule profile into sandbox_init() to block execution */
private static void macImpl(Path tmpFile) throws IOException {
// first be defensive: we can give nice errors this way, at the very least.
boolean supported = Constants.MAC_OS_X;
if (supported == false) {
throw new IllegalStateException("bug: should not be trying to initialize seccomp for an unsupported OS");
}
// we couldn't link methods, could be some really ancient OS X (< Leopard) or some bug
if (libc_mac == null) {
throw new UnsupportedOperationException("seatbelt unavailable: could not link methods. requires Leopard or above.");
}
// write rules to a temporary file, which will be passed to sandbox_init()
Path rules = Files.createTempFile(tmpFile, "es", "sb");
Files.write(rules, Collections.singleton(SANDBOX_RULES));
boolean success = false;
try {
PointerByReference errorRef = new PointerByReference();
int ret = libc_mac.sandbox_init(rules.toAbsolutePath().toString(), SANDBOX_NAMED, errorRef);
// if sandbox_init() fails, add the message from the OS (e.g. syntax error) and free the buffer
if (ret != 0) {
Pointer errorBuf = errorRef.getValue();
RuntimeException e = new UnsupportedOperationException("sandbox_init(): " + errorBuf.getString(0));
libc_mac.sandbox_free_error(errorBuf);
throw e;
}
logger.debug("OS X seatbelt initialization successful");
success = true;
} finally {
if (success) {
Files.delete(rules);
} else {
IOUtils.deleteFilesIgnoringExceptions(rules);
}
}
}
/**
* Attempt to drop the capability to execute for the process.
* <p>
* This is best effort and OS and architecture dependent. It may throw any Throwable.
*/
static void init(Path tmpFile) throws Throwable {
if (Constants.LINUX) {
linuxImpl();
} else if (Constants.MAC_OS_X) {
macImpl(tmpFile);
} else {
logger.debug("syscall filtering not supported for OS {}", Constants.OS_NAME);
}
}
}

View File

@ -166,6 +166,8 @@ final class Security {
m.put("repository-s3", "org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin");
m.put("discovery-ec2", "org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin");
m.put("cloud-gce", "org.elasticsearch.plugin.cloud.gce.CloudGcePlugin");
m.put("lang-expression", "org.elasticsearch.script.expression.ExpressionPlugin");
m.put("lang-groovy", "org.elasticsearch.script.groovy.GroovyPlugin");
m.put("lang-javascript", "org.elasticsearch.plugin.javascript.JavaScriptPlugin");
m.put("lang-python", "org.elasticsearch.plugin.python.PythonPlugin");
SPECIAL_PLUGINS = Collections.unmodifiableMap(m);

View File

@ -21,7 +21,7 @@ package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -31,7 +31,12 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.Nullable;
@ -52,7 +57,11 @@ import org.elasticsearch.discovery.local.LocalDiscovery;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import java.io.IOException;
import java.util.*;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
/**
* Represents the current state of the cluster.

View File

@ -16,13 +16,19 @@
package org.elasticsearch.common.inject;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.inject.internal.*;
import org.elasticsearch.common.inject.internal.BindingImpl;
import org.elasticsearch.common.inject.internal.Errors;
import org.elasticsearch.common.inject.internal.ErrorsException;
import org.elasticsearch.common.inject.internal.InternalContext;
import org.elasticsearch.common.inject.internal.InternalFactory;
import org.elasticsearch.common.inject.internal.Scoping;
import org.elasticsearch.common.inject.internal.ToStringBuilder;
import org.elasticsearch.common.inject.spi.BindingTargetVisitor;
import org.elasticsearch.common.inject.spi.ConstructorBinding;
import org.elasticsearch.common.inject.spi.Dependency;
import org.elasticsearch.common.inject.spi.InjectionPoint;
import java.util.HashSet;
import java.util.Set;
class ConstructorBindingImpl<T> extends BindingImpl<T> implements ConstructorBinding<T> {
@ -74,10 +80,10 @@ class ConstructorBindingImpl<T> extends BindingImpl<T> implements ConstructorBin
@Override
public Set<Dependency<?>> getDependencies() {
return Dependency.forInjectionPoints(new ImmutableSet.Builder<InjectionPoint>()
.add(getConstructor())
.addAll(getInjectableMembers())
.build());
Set<InjectionPoint> dependencies = new HashSet<>();
dependencies.add(getConstructor());
dependencies.addAll(getInjectableMembers());
return Dependency.forInjectionPoints(dependencies);
}
@Override

View File

@ -16,9 +16,21 @@
package org.elasticsearch.common.inject;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.inject.internal.*;
import org.elasticsearch.common.inject.spi.*;
import org.elasticsearch.common.inject.internal.Errors;
import org.elasticsearch.common.inject.internal.ErrorsException;
import org.elasticsearch.common.inject.internal.InternalContext;
import org.elasticsearch.common.inject.internal.InternalFactory;
import org.elasticsearch.common.inject.internal.PrivateElementsImpl;
import org.elasticsearch.common.inject.internal.ProviderInstanceBindingImpl;
import org.elasticsearch.common.inject.internal.Scoping;
import org.elasticsearch.common.inject.internal.SourceProvider;
import org.elasticsearch.common.inject.internal.Stopwatch;
import org.elasticsearch.common.inject.spi.Dependency;
import org.elasticsearch.common.inject.spi.Element;
import org.elasticsearch.common.inject.spi.Elements;
import org.elasticsearch.common.inject.spi.InjectionPoint;
import org.elasticsearch.common.inject.spi.PrivateElements;
import org.elasticsearch.common.inject.spi.TypeListenerBinding;
import java.util.ArrayList;
import java.util.List;

View File

@ -16,8 +16,6 @@
package org.elasticsearch.common.inject.spi;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.inject.ConfigurationException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Key;

View File

@ -135,7 +135,8 @@ public final class Modules {
private final Set<Module> baseModules;
private RealOverriddenModuleBuilder(Iterable<? extends Module> baseModules) {
this.baseModules = unmodifiableSet(newHashSet(baseModules));
HashSet<? extends Module> modules = newHashSet(baseModules);
this.baseModules = unmodifiableSet(modules);
}
@Override

View File

@ -53,7 +53,6 @@ import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.StoreRecoveryService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
@ -442,9 +441,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
// ignore
}
}
closeInjectorResource(sId, shardInjector,
StoreRecoveryService.class);
// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId, indexShard, indexSettings);
}

View File

@ -87,6 +87,7 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -155,7 +156,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final TranslogConfig translogConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndicesQueryCache indicesQueryCache;
private final StoreRecoveryService storeRecoveryService;
private TimeValue refreshInterval;
@ -200,7 +200,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService,
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
IndicesQueryCache indicesQueryCache, CodecService codecService,
TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
@ -217,7 +217,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
this.store = store;
this.storeRecoveryService = storeRecoveryService;
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
this.threadPool = threadPool;
this.mapperService = mapperService;
@ -844,13 +843,12 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* After the store has been recovered, we need to start the engine in order to apply operations
*/
public Map<String, Mapping> performTranslogRecovery(boolean indexExists) {
final Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(false, indexExists);
public void performTranslogRecovery(boolean indexExists) {
internalPerformTranslogRecovery(false, indexExists);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
return recoveredTypes;
}
private Map<String, Mapping> internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) {
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
@ -869,7 +867,7 @@ public class IndexShard extends AbstractIndexShardComponent {
engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);
createNewEngine(skipTranslogRecovery, engineConfig);
return engineConfig.getTranslogRecoveryPerformer().getRecoveredTypes();
}
/**
@ -879,8 +877,7 @@ public class IndexShard extends AbstractIndexShardComponent {
*/
public void skipTranslogRecovery() throws IOException {
assert engineUnsafe() == null : "engine was already created";
Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(true, true);
assert recoveredTypes.isEmpty();
internalPerformTranslogRecovery(true, true);
assert recoveryState.getTranslog().recoveredOperations() == 0;
}
@ -1063,12 +1060,19 @@ public class IndexShard extends AbstractIndexShardComponent {
return path;
}
public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryListener recoveryListener) {
public boolean recoverFromStore(ShardRouting shard) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
final boolean shouldExist = shard.allocatedPostIndexCreate();
storeRecoveryService.recover(this, shouldExist, recoveryListener);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
}
public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository) {
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
}
/**

View File

@ -66,7 +66,6 @@ public class IndexShardModule extends AbstractModule {
}
bind(EngineFactory.class).to(engineFactoryImpl);
bind(StoreRecoveryService.class).asEagerSingleton();
bind(IndexSearcherWrappingService.class).asEagerSingleton();
// this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService
Multibinder<IndexSearcherWrapper> multibinder

View File

@ -55,7 +55,7 @@ public final class ShadowIndexShard extends IndexShard {
@Inject
public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService,
IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService,
IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService,
IndexQueryParserService queryParserService, IndexCache indexCache,
IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache,
@ -64,7 +64,7 @@ public final class ShadowIndexShard extends IndexShard {
SimilarityService similarityService,
EngineFactory factory, ClusterService clusterService,
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService,
super(shardId, indexSettingsService, indicesLifecycle, store,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indicesQueryCache, codecService,
termVectorsService, indexFieldDataService,

View File

@ -0,0 +1,278 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
* This package private utility class encapsulates the logic to recover an index shard from either an existing index on
* disk or from a snapshot in a repository.
*/
final class StoreRecovery {
private final ESLogger logger;
private final ShardId shardId;
StoreRecovery(ShardId shardId, ESLogger logger) {
this.logger = logger;
this.shardId = shardId;
}
/**
* Recovers a shard from it's local file system store. This method required pre-knowledge about if the shard should
* exist on disk ie. has been previously allocated or if the shard is a brand new allocation without pre-existing index
* files / transaction logs. This
* @param indexShard the index shard instance to recovery the shard into
* @param indexShouldExists <code>true</code> iff the index should exist on disk ie. has the shard been allocated previously on the shards store.
* @param localNode the reference to the local node
* @return <code>true</code> if the the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
* @see Store
*/
boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists, DiscoveryNode localNode) {
if (canRecover(indexShard)) {
if (indexShard.routingEntry().restoreSource() != null) {
throw new IllegalStateException("can't recover - restore source is not null");
}
try {
indexShard.recovering("from store", RecoveryState.Type.STORE, localNode);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
}
return executeRecovery(indexShard, () -> {
logger.debug("starting recovery from store ...");
internalRecoverFromStore(indexShard, indexShouldExists);
});
}
return false;
}
/**
* Recovers an index from a given {@link IndexShardRepository}. This method restores a
* previously created index snapshot into an existing initializing shard.
* @param indexShard the index shard instance to recovery the snapshot from
* @param repository the repository holding the physical files the shard should be recovered from
* @return <code>true</code> if the the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
*/
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) {
if (canRecover(indexShard)) {
if (indexShard.routingEntry().restoreSource() == null) {
throw new IllegalStateException("can't restore - restore source is null");
}
try {
indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
}
return executeRecovery(indexShard, () -> {
logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
restore(indexShard, repository);
});
}
return false;
}
private boolean canRecover(IndexShard indexShard) {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
return false;
}
if (!indexShard.routingEntry().primary()) {
throw new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null);
}
return true;
}
/**
* Recovers the state of the shard from the store.
*/
private boolean executeRecovery(final IndexShard indexShard, Runnable recoveryRunnable) throws IndexShardRecoveryException {
try {
recoveryRunnable.run();
// Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway
// to call post recovery.
final IndexShardState shardState = indexShard.state();
final RecoveryState recoveryState = indexShard.recoveryState();
assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]";
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append("shard_store").append(", took [").append(timeValueMillis(recoveryState.getTimer().time())).append("]\n");
RecoveryState.Index index = recoveryState.getIndex();
sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.totalBytes())).append("], took[")
.append(TimeValue.timeValueMillis(index.time())).append("]\n");
sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.recoveredBytes())).append("]\n");
sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.reusedBytes())).append("]\n");
sb.append(" verify_index : took [").append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [")
.append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations())
.append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time()));
}
return true;
} catch (IndexShardRecoveryException e) {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
return false;
}
if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) {
// got closed on us, just ignore this recovery
return false;
}
throw e;
} catch (IndexShardClosedException | IndexShardNotStartedException e) {
} catch (Exception e) {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
return false;
}
throw new IndexShardRecoveryException(shardId, "failed recovery", e);
}
return false;
}
/**
* Recovers the state of the shard from the store.
*/
private void internalRecoverFromStore(IndexShard indexShard, boolean indexShouldExists) throws IndexShardRecoveryException {
final RecoveryState recoveryState = indexShard.recoveryState();
indexShard.prepareForIndexRecovery();
long version = -1;
SegmentInfos si = null;
final Store store = indexShard.store();
store.incRef();
try {
try {
store.failIfCorrupted();
try {
si = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
String files = "_unknown_";
try {
files = Arrays.toString(store.directory().listAll());
} catch (Throwable e1) {
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
}
if (indexShouldExists) {
throw new IndexShardRecoveryException(shardId, "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
}
}
if (si != null) {
if (indexShouldExists) {
version = si.getVersion();
} else {
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close();
recoveryState.getTranslog().totalOperations(0);
}
}
} catch (Throwable e) {
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
index.addFileDetail(name, length, true);
}
}
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
if (indexShouldExists == false) {
recoveryState.getTranslog().totalOperations(0);
recoveryState.getTranslog().totalOperationsOnStart(0);
}
indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
} catch (EngineException e) {
throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e);
} finally {
store.decRef();
}
}
/**
* Restores shard from {@link RestoreSource} associated with this shard in routing table
*/
private void restore(final IndexShard indexShard, final IndexShardRepository indexShardRepository) {
RestoreSource restoreSource = indexShard.routingEntry().restoreSource();
final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
if (restoreSource == null) {
throw new IndexShardRestoreFailedException(shardId, "empty restore source");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}] restoring shard [{}]", restoreSource.snapshotId(), shardId);
}
try {
translogState.totalOperations(0);
translogState.totalOperationsOnStart(0);
indexShard.prepareForIndexRecovery();
ShardId snapshotShardId = shardId;
if (!shardId.getIndex().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), shardId.id());
}
indexShardRepository.restore(restoreSource.snapshotId(), restoreSource.version(), shardId, snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
} catch (Throwable t) {
throw new IndexShardRestoreFailedException(shardId, "restore failed", t);
}
}
}

View File

@ -1,337 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
*
*/
public class StoreRecoveryService extends AbstractIndexShardComponent implements Closeable {
private final MappingUpdatedAction mappingUpdatedAction;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TimeValue waitForMappingUpdatePostRecovery;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private static final String SETTING_MAPPING_UPDATE_WAIT_LEGACY = "index.gateway.wait_for_mapping_update_post_recovery";
private static final String SETTING_MAPPING_UPDATE_WAIT = "index.shard.wait_for_mapping_update_post_recovery";
private final RestoreService restoreService;
private final RepositoriesService repositoriesService;
@Inject
public StoreRecoveryService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
MappingUpdatedAction mappingUpdatedAction, ClusterService clusterService, RepositoriesService repositoriesService, RestoreService restoreService) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.mappingUpdatedAction = mappingUpdatedAction;
this.restoreService = restoreService;
this.repositoriesService = repositoriesService;
this.clusterService = clusterService;
this.waitForMappingUpdatePostRecovery = indexSettings.getAsTime(SETTING_MAPPING_UPDATE_WAIT, indexSettings.getAsTime(SETTING_MAPPING_UPDATE_WAIT_LEGACY, TimeValue.timeValueSeconds(15)));
}
public interface RecoveryListener {
void onRecoveryDone();
void onIgnoreRecovery(String reason);
void onRecoveryFailed(IndexShardRecoveryException e);
}
/**
* Recovers the state of the shard from the gateway.
*/
public void recover(final IndexShard indexShard, final boolean indexShouldExists, final RecoveryListener listener) throws IndexShardRecoveryException {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed");
return;
}
if (!indexShard.routingEntry().primary()) {
listener.onRecoveryFailed(new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null));
return;
}
try {
if (indexShard.routingEntry().restoreSource() != null) {
indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
} else {
indexShard.recovering("from store", RecoveryState.Type.STORE, clusterService.localNode());
}
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery("already in recovering process, " + e.getMessage());
return;
}
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {
final RecoveryState recoveryState = indexShard.recoveryState();
if (indexShard.routingEntry().restoreSource() != null) {
logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
restore(indexShard, recoveryState);
} else {
logger.debug("starting recovery from shard_store ...");
recoverFromStore(indexShard, indexShouldExists, recoveryState);
}
// Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway
// to call post recovery.
IndexShardState shardState = indexShard.state();
assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]";
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append("shard_store").append(", took [").append(timeValueMillis(recoveryState.getTimer().time())).append("]\n");
RecoveryState.Index index = recoveryState.getIndex();
sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.totalBytes())).append("], took[")
.append(TimeValue.timeValueMillis(index.time())).append("]\n");
sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.recoveredBytes())).append("]\n");
sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.reusedBytes())).append("]\n");
sb.append(" verify_index : took [").append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [")
.append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations())
.append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time()));
}
listener.onRecoveryDone();
} catch (IndexShardRecoveryException e) {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed");
return;
}
if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) {
// got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed");
return;
}
listener.onRecoveryFailed(e);
} catch (IndexShardClosedException e) {
listener.onIgnoreRecovery("shard closed");
} catch (IndexShardNotStartedException e) {
listener.onIgnoreRecovery("shard closed");
} catch (Exception e) {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed");
return;
}
listener.onRecoveryFailed(new IndexShardRecoveryException(shardId, "failed recovery", e));
}
}
});
}
/**
* Recovers the state of the shard from the store.
*/
private void recoverFromStore(IndexShard indexShard, boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardRecoveryException {
indexShard.prepareForIndexRecovery();
long version = -1;
final Map<String, Mapping> typesToUpdate;
SegmentInfos si = null;
final Store store = indexShard.store();
store.incRef();
try {
try {
store.failIfCorrupted();
try {
si = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
String files = "_unknown_";
try {
files = Arrays.toString(store.directory().listAll());
} catch (Throwable e1) {
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
}
if (indexShouldExists) {
throw new IndexShardRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
}
}
if (si != null) {
if (indexShouldExists) {
version = si.getVersion();
} else {
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close();
recoveryState.getTranslog().totalOperations(0);
}
}
} catch (Throwable e) {
throw new IndexShardRecoveryException(shardId(), "failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
index.addFileDetail(name, length, true);
}
}
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
if (indexShouldExists == false) {
recoveryState.getTranslog().totalOperations(0);
recoveryState.getTranslog().totalOperationsOnStart(0);
}
typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
String indexName = indexShard.shardId().index().name();
for (Map.Entry<String, Mapping> entry : typesToUpdate.entrySet()) {
validateMappingUpdate(indexName, entry.getKey(), entry.getValue());
}
indexShard.postRecovery("post recovery from shard_store");
} catch (EngineException e) {
throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e);
} finally {
store.decRef();
}
}
private void validateMappingUpdate(final String indexName, final String type, Mapping update) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>();
mappingUpdatedAction.updateMappingOnMaster(indexName, type, update, waitForMappingUpdatePostRecovery, new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
latch.countDown();
error.set(t);
}
});
cancellableThreads.execute(new CancellableThreads.Interruptable() {
@Override
public void run() throws InterruptedException {
try {
if (latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("waited for mapping update on master for [{}], yet timed out", type);
} else {
if (error.get() != null) {
throw new IndexShardRecoveryException(shardId, "Failed to propagate mappings on master post recovery", error.get());
}
}
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping update");
throw e;
}
}
});
}
/**
* Restores shard from {@link RestoreSource} associated with this shard in routing table
*
* @param recoveryState recovery state
*/
private void restore(final IndexShard indexShard, final RecoveryState recoveryState) {
RestoreSource restoreSource = indexShard.routingEntry().restoreSource();
if (restoreSource == null) {
throw new IndexShardRestoreFailedException(shardId, "empty restore source");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}] restoring shard [{}]", restoreSource.snapshotId(), shardId);
}
try {
recoveryState.getTranslog().totalOperations(0);
recoveryState.getTranslog().totalOperationsOnStart(0);
indexShard.prepareForIndexRecovery();
IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
ShardId snapshotShardId = shardId;
if (!shardId.getIndex().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), shardId.id());
}
indexShardRepository.restore(restoreSource.snapshotId(), restoreSource.version(), shardId, snapshotShardId, recoveryState);
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId);
} catch (Throwable t) {
if (Lucene.isCorruptionException(t)) {
restoreService.failRestore(restoreSource.snapshotId(), shardId());
}
throw new IndexShardRestoreFailedException(shardId, "restore failed", t);
}
}
@Override
public void close() {
cancellableThreads.cancel("closed");
}
}

View File

@ -32,16 +32,13 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -52,17 +49,14 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.StoreRecoveryService;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
@ -91,6 +85,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
private final RestoreService restoreService;
private final RepositoriesService repositoriesService;
static class FailedShard {
public final long version;
@ -112,7 +108,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
ThreadPool threadPool, RecoveryTarget recoveryTarget,
ShardStateAction shardStateAction,
NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingRefreshAction nodeMappingRefreshAction) {
NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
@ -121,7 +117,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.shardStateAction = shardStateAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.restoreService = restoreService;
this.repositoriesService = repositoriesService;
this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}
@ -675,18 +672,33 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
handleRecoveryFailure(indexService, shardRouting, true, e);
}
} else {
indexService.shard(shardId).recoverFromStore(shardRouting, new StoreRecoveryService.RecoveryListener() {
@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
}
@Override
public void onIgnoreRecovery(String reason) {
}
@Override
public void onRecoveryFailed(IndexShardRecoveryException e) {
threadPool.generic().execute(() -> {
final RestoreSource restoreSource = shardRouting.restoreSource();
try {
final boolean success;
final IndexShard shard = indexService.shard(shardId);
if (restoreSource == null) {
// recover from filesystem store
success = shard.recoverFromStore(shardRouting);
} else {
// restore
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
try {
success = shard.restoreFromRepository(shardRouting, indexShardRepository);
} catch (Throwable t) {
if (Lucene.isCorruptionException(t)) {
restoreService.failRestore(restoreSource.snapshotId(), shard.shardId());
}
throw t;
}
if (success) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shard.shardId());
}
}
if (success) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
}
} catch (Throwable e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
}
});

View File

@ -65,7 +65,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.StoreRecoveryService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
@ -115,11 +115,9 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link StoreRecoveryService#recover(IndexShard, boolean, StoreRecoveryService.RecoveryListener)}
* {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository)}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. If this property is not null
* {@code recover} method uses {@link StoreRecoveryService#restore}
* method to start shard restore process.
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>
* At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)},
* which updates {@link RestoreInProgress} in cluster state or removes it when all shards are completed. In case of
@ -489,7 +487,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
}
/**
* This method is used by {@link StoreRecoveryService} to notify
* This method is used by {@link IndexShard} to notify
* {@code RestoreService} about shard restore completion.
*
* @param snapshotId snapshot id

View File

@ -57,6 +57,20 @@ grant codeBase "${es.security.plugin.cloud-gce}" {
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
};
grant codeBase "${es.security.plugin.lang-expression}" {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";
};
grant codeBase "${es.security.plugin.lang-groovy}" {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";
// needed by groovy engine
permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect";
// needed by GroovyScriptEngineService to close its classloader (why?)
permission java.lang.RuntimePermission "closeClassLoader";
};
grant codeBase "${es.security.plugin.lang-javascript}" {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";
@ -106,6 +120,8 @@ grant codeBase "${es.security.jar.randomizedtesting.junit4}" {
grant {
// Allow executing groovy scripts with codesource of /groovy/script
// TODO: make our own general ScriptServicePermission we check instead and
// check-before-createClassLoader for all scripting engines.
permission groovy.security.GroovyCodeSourcePermission "/groovy/script";
// Allow connecting to the internet anywhere
@ -114,15 +130,9 @@ grant {
// Allow read/write to all system properties
permission java.util.PropertyPermission "*", "read,write";
// needed by scripting engines, etc
permission java.lang.RuntimePermission "createClassLoader";
// needed by lucene SPI currently
permission java.lang.RuntimePermission "getClassLoader";
// needed by GroovyScriptEngineService
permission java.lang.RuntimePermission "closeClassLoader";
// needed by Settings
permission java.lang.RuntimePermission "getenv.*";
@ -130,12 +140,10 @@ grant {
// otherwise can be provided only to test libraries
permission java.lang.RuntimePermission "modifyThread";
// needed by groovy scripting
// needed by ExceptionSerializationTests and RestTestCase for
// some hackish things they do. otherwise only needed by groovy
// (TODO: clean this up?)
permission java.lang.RuntimePermission "getProtectionDomain";
// reflection hacks:
// needed by groovy engine
permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect";
// likely not low hanging fruit...
permission java.lang.RuntimePermission "accessDeclaredMembers";

View File

@ -51,8 +51,17 @@ public class BootstrapForTesting {
// without making things complex???
static {
// make sure java.io.tmpdir exists always (in case code uses it in a static initializer)
Path javaTmpDir = PathUtils.get(Objects.requireNonNull(System.getProperty("java.io.tmpdir"),
"please set ${java.io.tmpdir} in pom.xml"));
try {
Security.ensureDirectoryExists(javaTmpDir);
} catch (Exception e) {
throw new RuntimeException("unable to create test temp directory", e);
}
// just like bootstrap, initialize natives, then SM
Bootstrap.initializeNatives(true, true, true);
Bootstrap.initializeNatives(javaTmpDir, true, true, true);
// initialize probes
Bootstrap.initializeProbes();
@ -64,15 +73,6 @@ public class BootstrapForTesting {
throw new RuntimeException("found jar hell in test classpath", e);
}
// make sure java.io.tmpdir exists always (in case code uses it in a static initializer)
Path javaTmpDir = PathUtils.get(Objects.requireNonNull(System.getProperty("java.io.tmpdir"),
"please set ${java.io.tmpdir} in pom.xml"));
try {
Security.ensureDirectoryExists(javaTmpDir);
} catch (Exception e) {
throw new RuntimeException("unable to create test temp directory", e);
}
// install security manager if requested
if (systemPropertyAsBoolean("tests.security.manager", true)) {
try {

View File

@ -43,4 +43,16 @@ public class ShardRoutingHelper {
public static void initialize(ShardRouting routing, String nodeId, long expectedSize) {
routing.initialize(nodeId, expectedSize);
}
public static void reinit(ShardRouting routing) {
routing.reinitializeShard();
}
public static void moveToUnassigned(ShardRouting routing, UnassignedInfo info) {
routing.moveToUnassigned(info);
}
public static ShardRouting newWithRestoreSource(ShardRouting routing, RestoreSource restoreSource) {
return new ShardRouting(routing.index(), routing.shardId().id(), routing.currentNodeId(), routing.relocatingNodeId(), restoreSource, routing.primary(), routing.state(), routing.version(), routing.unassignedInfo(), routing.allocationId(), true, routing.getExpectedShardSize());
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.gateway;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -32,7 +31,10 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Test;
import java.util.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;

View File

@ -435,7 +435,9 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
* Allows to override boost assertions for queries that don't have the default behaviour
*/
protected void assertBoost(QB queryBuilder, Query query) throws IOException {
assertThat(query.getBoost(), equalTo(queryBuilder.boost()));
// workaround https://bugs.openjdk.java.net/browse/JDK-8056984
float boost = queryBuilder.boost();
assertThat(query.getBoost(), equalTo(boost));
}
/**

View File

@ -21,7 +21,9 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
@ -37,14 +39,14 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -64,10 +66,13 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.VersionUtils;
@ -90,7 +95,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.equalTo;
@ -767,4 +771,135 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(total + 1, shard.flushStats().getTotal());
}
public void testRecoverFromStore() {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.shard(0);
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
if (randomBoolean()) {
client().admin().indices().prepareFlush().get();
}
ShardRouting routing = new ShardRouting(shard.routingEntry());
test.removeShard(0, "b/c simon says so");
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
assertTrue(newShard.recoverFromStore(routing));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 1);
}
public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.shard(0);
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
if (randomBoolean()) {
client().admin().indices().prepareFlush().get();
}
final ShardRouting origRouting = shard.routingEntry();
ShardRouting routing = new ShardRouting(origRouting);
Store store = shard.store();
store.incRef();
test.removeShard(0, "b/c simon says so");
Lucene.cleanLuceneIndex(store.directory());
store.decRef();
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
try {
newShard.recoverFromStore(routing);
fail("index not there!");
} catch (IndexShardRecoveryException ex) {
assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over"));
}
ShardRoutingHelper.moveToUnassigned(routing, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so"));
ShardRoutingHelper.initialize(routing, origRouting.currentNodeId());
assertFalse("it's already recovering", newShard.recoverFromStore(routing));
test.removeShard(0, "I broken it");
newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 0);
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(true).get();
assertHitCount(client().prepareSearch().get(), 1);
}
public void testRestoreShard() throws IOException {
createIndex("test");
createIndex("test_target");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
IndexService test_target = indicesService.indexService("test_target");
final IndexShard test_shard = test.shard(0);
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
client().prepareIndex("test_target", "test", "1").setSource("{}").setRefresh(true).get();
assertHitCount(client().prepareSearch("test_target").get(), 1);
assertSearchHits(client().prepareSearch("test_target").get(), "1");
client().admin().indices().prepareFlush("test").get(); // only flush test
final ShardRouting origRouting = test_target.shard(0).routingEntry();
ShardRouting routing = new ShardRouting(origRouting);
ShardRoutingHelper.reinit(routing);
routing = ShardRoutingHelper.newWithRestoreSource(routing, new RestoreSource(new SnapshotId("foo", "bar"), Version.CURRENT, "test"));
test_target.removeShard(0, "just do it man!");
final IndexShard test_target_shard = test_target.createShard(0, routing);
Store sourceStore = test_shard.store();
Store targetStore = test_target_shard.store();
test_target_shard.updateRoutingEntry(routing, false);
assertTrue(test_target_shard.restoreFromRepository(routing, new IndexShardRepository() {
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
}
@Override
public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
Lucene.cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
}
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
return null;
}
@Override
public void verify(String verificationToken) {
}
}));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
test_target_shard.updateRoutingEntry(routing, true);
assertHitCount(client().prepareSearch("test_target").get(), 1);
assertSearchHits(client().prepareSearch("test_target").get(), "0");
}
}

View File

@ -128,6 +128,7 @@ com.google.common.collect.FluentIterable
com.google.common.io.Files
com.google.common.primitives.Ints
com.google.common.collect.ImmutableSet
com.google.common.collect.ImmutableSet$Builder
@defaultMessage Do not violate java's access system
java.lang.reflect.AccessibleObject#setAccessible(boolean)

View File

@ -43,6 +43,8 @@ import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.lookup.SearchLookup;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Map;
@ -91,12 +93,18 @@ public class ExpressionScriptEngineService extends AbstractComponent implements
@Override
public Object compile(String script) {
try {
// NOTE: validation is delayed to allow runtime vars, and we don't have access to per index stuff here
return JavascriptCompiler.compile(script);
} catch (ParseException e) {
throw new ScriptException("Failed to parse expression: " + script, e);
}
// classloader created here
return AccessController.doPrivileged(new PrivilegedAction<Expression>() {
@Override
public Expression run() {
try {
// NOTE: validation is delayed to allow runtime vars, and we don't have access to per index stuff here
return JavascriptCompiler.compile(script);
} catch (ParseException e) {
throw new ScriptException("Failed to parse expression: " + script, e);
}
}
});
}
@Override

View File

@ -423,6 +423,7 @@ public class MoreExpressionTests extends ESIntegTestCase {
// series of unit test for using expressions as executable scripts
public void testExecutableScripts() throws Exception {
assumeTrue("test creates classes directly, cannot run with security manager", System.getSecurityManager() == null);
Map<String, Object> vars = new HashMap<>();
vars.put("a", 2.5);
vars.put("b", 3);

View File

@ -20,10 +20,13 @@
package org.elasticsearch.script.groovy;
import java.nio.charset.StandardCharsets;
import com.google.common.hash.Hashing;
import groovy.lang.Binding;
import groovy.lang.GroovyClassLoader;
import groovy.lang.Script;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Scorer;
import org.codehaus.groovy.ast.ClassCodeExpressionTransformer;
@ -49,6 +52,8 @@ import org.elasticsearch.search.lookup.SearchLookup;
import java.io.IOException;
import java.math.BigDecimal;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
@ -99,17 +104,30 @@ public class GroovyScriptEngineService extends AbstractComponent implements Scri
}
// Groovy class loader to isolate Groovy-land code
this.loader = new GroovyClassLoader(getClass().getClassLoader(), config);
// classloader created here
this.loader = AccessController.doPrivileged(new PrivilegedAction<GroovyClassLoader>() {
@Override
public GroovyClassLoader run() {
return new GroovyClassLoader(getClass().getClassLoader(), config);
}
});
}
@Override
public void close() {
loader.clearCache();
try {
loader.close();
} catch (IOException e) {
logger.warn("Unable to close Groovy loader", e);
}
// close classloader here (why do we do this?)
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
try {
loader.close();
} catch (IOException e) {
logger.warn("Unable to close Groovy loader", e);
}
return null;
}
});
}
@Override

View File

@ -195,6 +195,10 @@ fi
install_and_check_plugin lang expression
}
@test "[$GROUP] install lang-groovy plugin" {
install_and_check_plugin lang groovy
}
@test "[$GROUP] install javascript plugin" {
install_and_check_plugin lang javascript rhino-*.jar
}
@ -295,6 +299,10 @@ fi
remove_plugin lang-expression
}
@test "[$GROUP] remove lang-groovy plugin" {
remove_plugin lang-groovy
}
@test "[$GROUP] remove javascript plugin" {
remove_plugin lang-javascript
}