From 88842f33190ab940149464729400980b0ab20a41 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 4 Aug 2015 15:55:00 +0200 Subject: [PATCH 01/16] PluginManager: Fix elastic.co download URLs, add snapshot ones The URL to download the main elasticsearch plugins did not match what the S3 wageon is supposed to write to. In addition this PR adds support for snapshots to access the snapshot S3 bucket, so we can possibly download snapshot versions of plugins. The format of the URLs stems from #12270 Closes #12632 --- .../elasticsearch/plugins/PluginManager.java | 5 ++- .../plugins/PluginManagerUnitTests.java | 43 ++++++++++++++----- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java index 7c0f2bfcc8c..5a03dcc61cc 100644 --- a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java +++ b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java @@ -425,7 +425,10 @@ public class PluginManager { // Elasticsearch new download service uses groupId org.elasticsearch.plugins from 2.0.0 if (user == null) { // TODO Update to https - addUrl(urls, String.format(Locale.ROOT, "http://download.elastic.co/org.elasticsearch.plugins/%1$s/%1$s-%2$s.zip", repo, version)); + if (Version.CURRENT.snapshot()) { + addUrl(urls, String.format(Locale.ROOT, "http://download.elastic.co/elasticsearch/snapshot/org/elasticsearch/plugin/%s/%s-SNAPSHOT/%s-%s-SNAPSHOT.zip", repo, version, repo, version)); + } + addUrl(urls, String.format(Locale.ROOT, "http://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/%s/%s/%s-%s.zip", repo, version, repo, version)); } else { // Elasticsearch old download service // TODO Update to https diff --git a/core/src/test/java/org/elasticsearch/plugins/PluginManagerUnitTests.java b/core/src/test/java/org/elasticsearch/plugins/PluginManagerUnitTests.java index 75d680c1abd..519c003ac69 100644 --- a/core/src/test/java/org/elasticsearch/plugins/PluginManagerUnitTests.java +++ b/core/src/test/java/org/elasticsearch/plugins/PluginManagerUnitTests.java @@ -29,8 +29,11 @@ import org.junit.Test; import java.io.IOException; import java.net.URL; import java.nio.file.Path; +import java.util.Iterator; +import java.util.Locale; import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -62,22 +65,40 @@ public class PluginManagerUnitTests extends ESTestCase { public void testSimplifiedNaming() throws IOException { String pluginName = randomAsciiOfLength(10); PluginManager.PluginHandle handle = PluginManager.PluginHandle.parse(pluginName); - assertThat(handle.urls(), hasSize(1)); - URL expected = new URL("http", "download.elastic.co", "/org.elasticsearch.plugins/" + pluginName + "/" + + + assertThat(handle.urls(), hasSize(Version.CURRENT.snapshot() ? 2 : 1)); + + Iterator iterator = handle.urls().iterator(); + + if (Version.CURRENT.snapshot()) { + String expectedSnapshotUrl = String.format(Locale.ROOT, "http://download.elastic.co/elasticsearch/snapshot/org/elasticsearch/plugin/%s/%s-SNAPSHOT/%s-%s-SNAPSHOT.zip", + pluginName, Version.CURRENT.number(), pluginName, Version.CURRENT.number()); + assertThat(iterator.next(), is(new URL(expectedSnapshotUrl))); + } + + URL expected = new URL("http", "download.elastic.co", "/elasticsearch/release/org/elasticsearch/plugin/" + pluginName + "/" + Version.CURRENT.number() + "/" + pluginName + "-" + Version.CURRENT.number() + ".zip"); - assertThat(handle.urls().get(0), is(expected)); + assertThat(iterator.next(), is(expected)); } @Test public void testTrimmingElasticsearchFromOfficialPluginName() throws IOException { - String randomName = randomAsciiOfLength(10); - String pluginName = randomFrom("elasticsearch-", "es-") + randomName; - PluginManager.PluginHandle handle = PluginManager.PluginHandle.parse(pluginName); - assertThat(handle.name, is(randomName)); - assertThat(handle.urls(), hasSize(1)); - URL expected = new URL("http", "download.elastic.co", "/org.elasticsearch.plugins/" + pluginName + "/" + - pluginName + "-" + Version.CURRENT.number() + ".zip"); - assertThat(handle.urls().get(0), is(expected)); + String randomPluginName = randomFrom(PluginManager.OFFICIAL_PLUGINS.asList()); + PluginManager.PluginHandle handle = PluginManager.PluginHandle.parse(randomPluginName); + assertThat(handle.name, is(randomPluginName.replaceAll("^elasticsearch-", ""))); + + assertThat(handle.urls(), hasSize(Version.CURRENT.snapshot() ? 2 : 1)); + Iterator iterator = handle.urls().iterator(); + + if (Version.CURRENT.snapshot()) { + String expectedSnapshotUrl = String.format(Locale.ROOT, "http://download.elastic.co/elasticsearch/snapshot/org/elasticsearch/plugin/%s/%s-SNAPSHOT/%s-%s-SNAPSHOT.zip", + randomPluginName, Version.CURRENT.number(), randomPluginName, Version.CURRENT.number()); + assertThat(iterator.next(), is(new URL(expectedSnapshotUrl))); + } + + String releaseUrl = String.format(Locale.ROOT, "http://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/%s/%s/%s-%s.zip", + randomPluginName, Version.CURRENT.number(), randomPluginName, Version.CURRENT.number()); + assertThat(iterator.next(), is(new URL(releaseUrl))); } @Test From d7d25fe6b595422c38426ed1b39ab8f93f5b5919 Mon Sep 17 00:00:00 2001 From: Robert Muir Date: Wed, 5 Aug 2015 06:45:52 -0400 Subject: [PATCH 02/16] Add path.scripts directory Today this is "unofficial" as conf/scripts, but some people want to share scripts across different nodes and so on. Because they cannot configure it, they are forced to use dirty hacks like symbolic links, which isnt going to work: we aren't going to recursively scan conf/ and add permissions to all link targets underneath it, thats crazy. I really hate adding yet another configuration knob here, but users resorting to using symlinks are going to be frustrated, and do things in a more insecure way. --- .../org/elasticsearch/bootstrap/Security.java | 1 + .../java/org/elasticsearch/env/Environment.java | 15 +++++++++++++++ .../org/elasticsearch/script/ScriptService.java | 2 +- .../elasticsearch/bootstrap/SecurityTests.java | 3 +++ docs/reference/modules/scripting.asciidoc | 10 ++++++---- 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/bootstrap/Security.java b/core/src/main/java/org/elasticsearch/bootstrap/Security.java index ca6fb9f0eb0..7bd25ce2fc8 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/Security.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/Security.java @@ -122,6 +122,7 @@ final class Security { addPath(policy, environment.libFile(), "read,readlink"); addPath(policy, environment.pluginsFile(), "read,readlink"); addPath(policy, environment.configFile(), "read,readlink"); + addPath(policy, environment.scriptsFile(), "read,readlink"); // read-write dirs addPath(policy, environment.tmpFile(), "read,readlink,write,delete"); addPath(policy, environment.logsFile(), "read,readlink,write,delete"); diff --git a/core/src/main/java/org/elasticsearch/env/Environment.java b/core/src/main/java/org/elasticsearch/env/Environment.java index 113caa0003a..fccf767ea71 100644 --- a/core/src/main/java/org/elasticsearch/env/Environment.java +++ b/core/src/main/java/org/elasticsearch/env/Environment.java @@ -53,6 +53,8 @@ public class Environment { private final Path configFile; + private final Path scriptsFile; + private final Path pluginsFile; /** location of bin/, used by plugin manager */ @@ -100,6 +102,12 @@ public class Environment { configFile = homeFile.resolve("config"); } + if (settings.get("path.scripts") != null) { + scriptsFile = PathUtils.get(cleanPath(settings.get("path.scripts"))); + } else { + scriptsFile = configFile.resolve("scripts"); + } + if (settings.get("path.plugins") != null) { pluginsFile = PathUtils.get(cleanPath(settings.get("path.plugins"))); } else { @@ -233,6 +241,13 @@ public class Environment { return configFile; } + /** + * Location of on-disk scripts + */ + public Path scriptsFile() { + return scriptsFile; + } + public Path pluginsFile() { return pluginsFile; } diff --git a/core/src/main/java/org/elasticsearch/script/ScriptService.java b/core/src/main/java/org/elasticsearch/script/ScriptService.java index e683e5d66fd..f6d8132d6c6 100644 --- a/core/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/core/src/main/java/org/elasticsearch/script/ScriptService.java @@ -171,7 +171,7 @@ public class ScriptService extends AbstractComponent implements Closeable { this.scriptModes = new ScriptModes(this.scriptEnginesByLang, scriptContextRegistry, settings); // add file watcher for static scripts - scriptsDirectory = env.configFile().resolve("scripts"); + scriptsDirectory = env.scriptsFile(); if (logger.isTraceEnabled()) { logger.trace("Using scripts directory [{}] ", scriptsDirectory); } diff --git a/core/src/test/java/org/elasticsearch/bootstrap/SecurityTests.java b/core/src/test/java/org/elasticsearch/bootstrap/SecurityTests.java index 664b677fe7f..17b74404528 100644 --- a/core/src/test/java/org/elasticsearch/bootstrap/SecurityTests.java +++ b/core/src/test/java/org/elasticsearch/bootstrap/SecurityTests.java @@ -74,6 +74,7 @@ public class SecurityTests extends ESTestCase { Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.put("path.home", esHome.resolve("home").toString()); settingsBuilder.put("path.conf", esHome.resolve("conf").toString()); + settingsBuilder.put("path.scripts", esHome.resolve("scripts").toString()); settingsBuilder.put("path.plugins", esHome.resolve("plugins").toString()); settingsBuilder.putArray("path.data", esHome.resolve("data1").toString(), esHome.resolve("data2").toString()); settingsBuilder.put("path.logs", esHome.resolve("logs").toString()); @@ -109,6 +110,8 @@ public class SecurityTests extends ESTestCase { assertExactPermissions(new FilePermission(environment.libFile().toString(), "read,readlink"), permissions); // config file: ro assertExactPermissions(new FilePermission(environment.configFile().toString(), "read,readlink"), permissions); + // scripts file: ro + assertExactPermissions(new FilePermission(environment.scriptsFile().toString(), "read,readlink"), permissions); // plugins: ro assertExactPermissions(new FilePermission(environment.pluginsFile().toString(), "read,readlink"), permissions); diff --git a/docs/reference/modules/scripting.asciidoc b/docs/reference/modules/scripting.asciidoc index 69748f09ed1..ea31b2d010c 100644 --- a/docs/reference/modules/scripting.asciidoc +++ b/docs/reference/modules/scripting.asciidoc @@ -85,10 +85,12 @@ supported scripting languages: To increase security, Elasticsearch does not allow you to specify scripts for non-sandboxed languages with a request. Instead, scripts must be placed in the `scripts` directory inside the configuration directory (the directory where -elasticsearch.yml is). Scripts placed into this directory will automatically be -picked up and be available to be used. Once a script has been placed in this -directory, it can be referenced by name. For example, a script called -`calculate-score.groovy` can be referenced in a request like this: +elasticsearch.yml is). The default location of this `scripts` directory can be +changed by setting `path.scripts` in elasticsearch.yml. Scripts placed into +this directory will automatically be picked up and be available to be used. +Once a script has been placed in this directory, it can be referenced by name. +For example, a script called `calculate-score.groovy` can be referenced in a +request like this: [source,sh] -------------------------------------------------- From da31fbe702fd2b0d3d3daada82af19796e528c89 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 5 Aug 2015 13:47:28 +0200 Subject: [PATCH 03/16] Testing: Removing leftover @AwaitsFix annotation Since the update to HDRHistogram 2.1.6 this test was forgotten to get the annotation removed in #12554. --- .../search/aggregations/metrics/HDRPercentileRanksIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/HDRPercentileRanksIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/HDRPercentileRanksIT.java index 5a25aa27252..6b8c39e9214 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/HDRPercentileRanksIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/HDRPercentileRanksIT.java @@ -256,7 +256,6 @@ public class HDRPercentileRanksIT extends AbstractNumericTestCase { @Override @Test - @AwaitsFix(bugUrl="Fails with seed: B75FCDC119D90BBE, Colin to fix") public void testSingleValuedField_WithValueScript_WithParams() throws Exception { int sigDigits = randomSignificantDigits(); Map params = new HashMap<>(); From 60f8fa4451942e4a7ef6236bdda2ac3281f2ea12 Mon Sep 17 00:00:00 2001 From: Clinton Gormley Date: Wed, 5 Aug 2015 13:58:09 +0200 Subject: [PATCH 04/16] Added pure perl fallback for Digest::SHA for old windows boxes --- .../license-check/check_license_and_sha.pl | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/dev-tools/src/main/resources/license-check/check_license_and_sha.pl b/dev-tools/src/main/resources/license-check/check_license_and_sha.pl index 5af5b6b18ed..cc5f5b02773 100755 --- a/dev-tools/src/main/resources/license-check/check_license_and_sha.pl +++ b/dev-tools/src/main/resources/license-check/check_license_and_sha.pl @@ -8,11 +8,22 @@ use lib "$RealBin/lib"; use File::Spec(); use File::Temp(); use File::Find(); -use Digest::SHA qw(sha1); use File::Basename qw(basename); use Archive::Extract(); $Archive::Extract::PREFER_BIN = 1; +our $SHA_CLASS = 'Digest::SHA'; +if ( eval { require Digest::SHA } ) { + $SHA_CLASS = 'Digest::SHA'; +} +else { + + print STDERR "Digest::SHA not available. " + . "Falling back to Digest::SHA::PurePerl\n"; + require Digest::SHA::PurePerl; + $SHA_CLASS = 'Digest::SHA::PurePerl'; +} + my $mode = shift(@ARGV) || ""; die usage() unless $mode =~ /^--(check|update)$/; @@ -230,7 +241,7 @@ sub calculate_shas { #=================================== my %shas; while ( my $file = shift() ) { - my $digest = eval { Digest::SHA->new(1)->addfile($file) } + my $digest = eval { $SHA_CLASS->new(1)->addfile($file) } or die "Error calculating SHA1 for <$file>: $!\n"; $shas{ basename($file) . ".sha1" } = $digest->hexdigest; } From 5a701367a7351709bd94229418670342e53f5380 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 5 Aug 2015 15:56:42 +0200 Subject: [PATCH 05/16] PluginManager: Add Support for basic auth In order to support the URL notation including a user/pass combination (like http://user:pass@host/plugin.zip) the auth info needs to be added manually. --- .../http/client/HttpDownloadHelper.java | 16 +++- .../elasticsearch/plugins/PluginManager.java | 7 +- .../plugins/PluginManagerIT.java | 89 +++++++++++++++++++ 3 files changed, 109 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java b/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java index b804cc6f37f..4d177d7f958 100644 --- a/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java +++ b/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java @@ -19,12 +19,13 @@ package org.elasticsearch.common.http.client; +import com.google.common.base.Charsets; +import com.google.common.base.Strings; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; +import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.cli.Terminal; import org.elasticsearch.common.unit.TimeValue; import java.io.*; @@ -266,6 +267,17 @@ public class HttpDownloadHelper { connection.setIfModifiedSince(timestamp); } + // in case the plugin manager is its own project, this can become an authenticator + boolean isSecureProcotol = "https".equalsIgnoreCase(aSource.getProtocol()); + boolean isAuthInfoSet = !Strings.isNullOrEmpty(aSource.getUserInfo()); + if (isAuthInfoSet) { + if (!isSecureProcotol) { + throw new IOException("Basic auth is only supported for HTTPS!"); + } + String basicAuth = Base64.encodeBytes(aSource.getUserInfo().getBytes(Charsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + basicAuth); + } + if (connection instanceof HttpURLConnection) { ((HttpURLConnection) connection).setInstanceFollowRedirects(false); ((HttpURLConnection) connection).setUseCaches(true); diff --git a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java index 5a03dcc61cc..44b7078e9af 100644 --- a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java +++ b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java @@ -22,7 +22,6 @@ package org.elasticsearch.plugins; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; - import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; @@ -132,6 +131,12 @@ public class PluginManager { // first, try directly from the URL provided if (url != null) { URL pluginUrl = new URL(url); + boolean isSecureProcotol = "https".equalsIgnoreCase(pluginUrl.getProtocol()); + boolean isAuthInfoSet = !Strings.isNullOrEmpty(pluginUrl.getUserInfo()); + if (isAuthInfoSet && !isSecureProcotol) { + throw new IOException("Basic auth is only supported for HTTPS!"); + } + terminal.println("Trying %s ...", pluginUrl.toExternalForm()); try { downloadHelper.download(pluginUrl, pluginFile, progress, this.timeout); diff --git a/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java b/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java index 31f088498bc..a96187e614b 100644 --- a/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java +++ b/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java @@ -18,9 +18,12 @@ */ package org.elasticsearch.plugins; +import com.google.common.base.Charsets; import org.apache.http.impl.client.HttpClients; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; +import org.elasticsearch.common.Base64; +import org.elasticsearch.common.cli.CliTool; import org.elasticsearch.common.cli.CliTool.ExitStatus; import org.elasticsearch.common.cli.CliToolTestCase.CaptureOutputTerminal; import org.elasticsearch.common.collect.Tuple; @@ -32,11 +35,23 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.junit.annotations.Network; import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; import org.elasticsearch.test.rest.client.http.HttpResponse; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.codec.http.*; +import org.jboss.netty.handler.ssl.SslContext; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.jboss.netty.handler.ssl.util.SelfSignedCertificate; import org.junit.After; import org.junit.Before; import org.junit.Test; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -46,6 +61,8 @@ import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.PosixFileAttributeView; import java.nio.file.attribute.PosixFileAttributes; import java.nio.file.attribute.PosixFilePermission; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -59,6 +76,7 @@ import static org.elasticsearch.test.ESIntegTestCase.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertDirectoryExists; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists; import static org.hamcrest.Matchers.*; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0.0) @LuceneTestCase.SuppressFileSystems("*") // TODO: clean up this test to allow extra files @@ -477,6 +495,77 @@ public class PluginManagerIT extends ESIntegTestCase { } } + @Test + public void testThatBasicAuthIsRejectedOnHttp() throws Exception { + assertStatus(String.format(Locale.ROOT, "install foo --url http://user:pass@localhost:12345/foo.zip --verbose"), CliTool.ExitStatus.IO_ERROR); + assertThat(terminal.getTerminalOutput(), hasItem(containsString("Basic auth is only supported for HTTPS!"))); + } + + @Test + public void testThatBasicAuthIsSupportedWithHttps() throws Exception { + assumeTrue("test requires security manager to be disabled", System.getSecurityManager() == null); + + SSLSocketFactory defaultSocketFactory = HttpsURLConnection.getDefaultSSLSocketFactory(); + ServerBootstrap serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory()); + SelfSignedCertificate ssc = new SelfSignedCertificate("localhost"); + + try { + + // Create a trust manager that does not validate certificate chains: + SSLContext sc = SSLContext.getInstance("SSL"); + sc.init(null, InsecureTrustManagerFactory.INSTANCE.getTrustManagers(), null); + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); + + final List requests = new ArrayList<>(); + final SslContext sslContext = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); + + serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline( + new SslHandler(sslContext.newEngine()), + new HttpRequestDecoder(), + new HttpResponseEncoder(), + new LoggingServerHandler(requests) + ); + } + }); + + Channel channel = serverBootstrap.bind(new InetSocketAddress("localhost", 0)); + int port = ((InetSocketAddress) channel.getLocalAddress()).getPort(); + // IO_ERROR because there is no real file delivered... + assertStatus(String.format(Locale.ROOT, "install foo --url https://user:pass@localhost:%s/foo.zip --verbose --timeout 1s", port), ExitStatus.IO_ERROR); + + assertThat(requests, hasSize(1)); + String msg = String.format(Locale.ROOT, "Request header did not contain Authorization header, terminal output was: %s", terminal.getTerminalOutput()); + assertThat(msg, requests.get(0).headers().contains("Authorization"), is(true)); + assertThat(msg, requests.get(0).headers().get("Authorization"), is("Basic " + Base64.encodeBytes("user:pass".getBytes(Charsets.UTF_8)))); + } finally { + HttpsURLConnection.setDefaultSSLSocketFactory(defaultSocketFactory); + serverBootstrap.releaseExternalResources(); + ssc.delete(); + } + } + + private static class LoggingServerHandler extends SimpleChannelUpstreamHandler { + + private List requests; + + public LoggingServerHandler(List requests) { + this.requests = requests; + } + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws InterruptedException { + final HttpRequest request = (HttpRequest) e.getMessage(); + requests.add(request); + final org.jboss.netty.handler.codec.http.HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST); + ctx.getChannel().write(response); + } + } + + + private Tuple buildInitialSettings() throws IOException { Settings settings = settingsBuilder() .put("discovery.zen.ping.multicast.enabled", false) From d46fdb1638b9ac8cf4a0f2b88797e825a0bde6d1 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 29 Jul 2015 10:05:12 -0400 Subject: [PATCH 06/16] [Tests] Cleanup EsExecutorsTests * names prefixed with test don't need @Test * Javadoc describing what it tests --- .../common/util/concurrent/EsExecutorsTests.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index eabac31974c..7c0891fac6e 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; /** + * Tests for EsExecutors and its components like EsAbortPolicy. */ public class EsExecutorsTests extends ESTestCase { @@ -38,7 +39,6 @@ public class EsExecutorsTests extends ESTestCase { return TimeUnit.values()[between(0, TimeUnit.values().length - 1)]; } - @Test public void testFixedForcedExecution() throws Exception { EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); @@ -101,7 +101,6 @@ public class EsExecutorsTests extends ESTestCase { executor.shutdownNow(); } - @Test public void testFixedRejected() throws Exception { EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); @@ -156,7 +155,6 @@ public class EsExecutorsTests extends ESTestCase { terminate(executor); } - @Test public void testScaleUp() throws Exception { final int min = between(1, 3); final int max = between(min + 1, 6); @@ -193,7 +191,6 @@ public class EsExecutorsTests extends ESTestCase { terminate(pool); } - @Test public void testScaleDown() throws Exception { final int min = between(1, 3); final int max = between(min + 1, 6); From ed7d84ca5f74ad63bf57e9a86e2fd63a202cd7e9 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 29 Jul 2015 12:02:46 -0400 Subject: [PATCH 07/16] Core: Improve toString on EsThreadPoolExecutor Improving the toString allows for nicer error reporting. Also cleaned up the way that EsRejectedExecutionException notices that it was rejected from a shutdown thread pool. I left javadocs about how its not 100% correct but good enough for most uses. The improved toString on EsThreadPoolExecutor mean every one of them needs a name. In most cases the name to use is obvious. In tests I use the name of the test method and in real thread pools I use the name of the thread pool. In non-ThreadPool executors I use the thread's name. Closes #9732 --- .../service/InternalClusterService.java | 2 +- .../common/util/concurrent/EsAbortPolicy.java | 13 +-- .../common/util/concurrent/EsExecutors.java | 16 ++-- .../EsRejectedExecutionException.java | 30 ++++++- .../util/concurrent/EsThreadPoolExecutor.java | 38 ++++++++- .../PrioritizedEsThreadPoolExecutor.java | 4 +- .../zen/ping/unicast/UnicastZenPing.java | 2 +- .../indices/recovery/RecoverySettings.java | 6 +- .../elasticsearch/threadpool/ThreadPool.java | 6 +- .../transport/local/LocalTransport.java | 2 +- .../util/concurrent/EsExecutorsTests.java | 83 ++++++++++++++++++- .../concurrent/PrioritizedExecutorsTests.java | 12 +-- .../org/elasticsearch/test/ESTestCase.java | 2 +- .../test/InternalTestCluster.java | 2 +- 14 files changed, 172 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 456d6dc0e65..70e993274ff 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -154,7 +154,7 @@ public class InternalClusterService extends AbstractLifecycleComponent nodeAttributes = discoveryNodeService.buildAttributes(); // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java index 8bb16869c47..2b19fa2096c 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java @@ -27,9 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor; /** */ public class EsAbortPolicy implements XRejectedExecutionHandler { - private final CounterMetric rejected = new CounterMetric(); - public static final String SHUTTING_DOWN_KEY = "(shutting down)"; @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { @@ -49,16 +47,7 @@ public class EsAbortPolicy implements XRejectedExecutionHandler { } } rejected.inc(); - StringBuilder sb = new StringBuilder("rejected execution "); - if (executor.isShutdown()) { - sb.append(SHUTTING_DOWN_KEY + " "); - } else { - if (executor.getQueue() instanceof SizeBlockingQueue) { - sb.append("(queue capacity ").append(((SizeBlockingQueue) executor.getQueue()).capacity()).append(") "); - } - } - sb.append("on ").append(r.toString()); - throw new EsRejectedExecutionException(sb.toString()); + throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 92e8d7d095f..c7cc07c3d45 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -54,30 +54,30 @@ public class EsExecutors { return settings.getAsInt(PROCESSORS, defaultValue); } - public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) { - return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); + public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory) { + return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); } - public static EsThreadPoolExecutor newScaling(int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue - EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); + EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); queue.executor = executor; return executor; } - public static EsThreadPoolExecutor newCached(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy()); + public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy()); } - public static EsThreadPoolExecutor newFixed(int size, int queueCapacity, ThreadFactory threadFactory) { + public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory) { BlockingQueue queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueCapacity); } - return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy()); + return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy()); } public static String threadName(Settings settings, String ... names) { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java index 2aec22c04ec..d75b3ffa8c2 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -28,17 +29,25 @@ import java.io.IOException; /** */ public class EsRejectedExecutionException extends ElasticsearchException { + private final boolean isExecutorShutdown; + + public EsRejectedExecutionException(String message, boolean isExecutorShutdown) { + super(message); + this.isExecutorShutdown = isExecutorShutdown; + } public EsRejectedExecutionException(String message) { - super(message); + this(message, false); } public EsRejectedExecutionException() { super((String)null); + this.isExecutorShutdown = false; } public EsRejectedExecutionException(Throwable e) { super(null, e); + this.isExecutorShutdown = false; } @Override @@ -48,5 +57,24 @@ public class EsRejectedExecutionException extends ElasticsearchException { public EsRejectedExecutionException(StreamInput in) throws IOException{ super(in); + isExecutorShutdown = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(isExecutorShutdown); + } + + /** + * Checks if the thread pool that rejected the execution was terminated + * shortly after the rejection. Its possible that this returns false and the + * thread pool has since been terminated but if this returns false then the + * termination wasn't a factor in this rejection. Conversely if this returns + * true the shutdown was probably a factor in this rejection but might have + * been triggered just after the action rejection. + */ + public boolean isExecutorShutdown() { + return isExecutorShutdown; } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 9cfb6875993..1e6743e9004 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -33,13 +33,18 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private volatile ShutdownListener listener; private final Object monitor = new Object(); + /** + * Name used in error reporting. + */ + private final String name; - EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); + EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); } - EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) { + EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + this.name = name; } public void shutdown(ShutdownListener listener) { @@ -93,4 +98,31 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { } } } + + @Override + public String toString() { + /* + * ThreadPoolExecutor has some nice information in its toString but we + * can't recreate it without nastier hacks than this. + */ + String tpeToString = super.toString(); + int startOfInfoInTpeToString = tpeToString.indexOf('['); + String tpeInfo; + if (startOfInfoInTpeToString >= 0) { + tpeInfo = tpeToString.substring(startOfInfoInTpeToString + 1); + } else { + assert false: "Unsupported ThreadPoolExecutor toString"; + tpeInfo = tpeToString; + } + StringBuilder b = new StringBuilder(); + b.append(getClass().getSimpleName()).append('['); + b.append(name).append(", "); + if (getQueue() instanceof SizeBlockingQueue) { + @SuppressWarnings("rawtypes") + SizeBlockingQueue queue = (SizeBlockingQueue) getQueue(); + b.append("queue capacity = ").append(queue.capacity()).append(", "); + } + b.append("state = ").append(tpeInfo); + return b.toString(); + } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index 65998b57cc7..38c0cb23234 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -41,8 +41,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private AtomicLong insertionOrder = new AtomicLong(); private Queue current = ConcurrentCollections.newQueue(); - PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(), threadFactory); + PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(), threadFactory); } public Pending[] getPending() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index f9cf98f86fa..b16b616515c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -136,7 +136,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest.class, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); - unicastConnectExecutor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); + unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 72d7c77e881..c3d167082ff 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -125,9 +125,11 @@ public class RecoverySettings extends AbstractComponent implements Closeable { this.concurrentStreams = settings.getAsInt("indices.recovery.concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3)); - this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); + this.concurrentStreamPool = EsExecutors.newScaling("recovery_stream", 0, concurrentStreams, 60, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); this.concurrentSmallFileStreams = settings.getAsInt("indices.recovery.concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2)); - this.concurrentSmallFileStreamPool = EsExecutors.newScaling(0, concurrentSmallFileStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); + this.concurrentSmallFileStreamPool = EsExecutors.newScaling("small_file_recovery_stream", 0, concurrentSmallFileStreams, 60, + TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); this.maxBytesPerSec = settings.getAsBytesSize("indices.recovery.max_bytes_per_sec", settings.getAsBytesSize("indices.recovery.max_size_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB))); if (maxBytesPerSec.bytes() <= 0) { diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index f6e359baafb..7c01367c016 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -336,7 +336,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); } - Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); @@ -371,7 +371,7 @@ public class ThreadPool extends AbstractComponent { int size = settings.getAsInt("size", defaultSize); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); - Executor executor = EsExecutors.newFixed(size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); + Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); } else if ("scaling".equals(type)) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); @@ -415,7 +415,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } - Executor executor = EsExecutors.newScaling(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); } throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index cc7a8fd81a8..2cd4168b28d 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -80,7 +80,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX); - this.workers = EsExecutors.newFixed(workerCount, queueSize, threadFactory); + this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory); } @Override diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 7c0891fac6e..7c9355e4191 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.test.ESTestCase; import org.junit.Test; @@ -27,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -40,7 +42,7 @@ public class EsExecutorsTests extends ESTestCase { } public void testFixedForcedExecution() throws Exception { - EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -102,7 +104,7 @@ public class EsExecutorsTests extends ESTestCase { } public void testFixedRejected() throws Exception { - EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -160,7 +162,7 @@ public class EsExecutorsTests extends ESTestCase { final int max = between(min + 1, 6); final ThreadBarrier barrier = new ThreadBarrier(max + 1); - ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test")); + ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -196,7 +198,7 @@ public class EsExecutorsTests extends ESTestCase { final int max = between(min + 1, 6); final ThreadBarrier barrier = new ThreadBarrier(max + 1); - final ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); + final ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -233,4 +235,77 @@ public class EsExecutorsTests extends ESTestCase { }); terminate(pool); } + + public void testRejectionMessageAndShuttingDownFlag() throws InterruptedException { + int pool = between(1, 10); + int queue = between(0, 100); + int actions = queue + pool; + final CountDownLatch latch = new CountDownLatch(1); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy")); + try { + for (int i = 0; i < actions; i++) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Doesn't matter is going to be rejected + } + + @Override + public String toString() { + return "dummy runnable"; + } + }); + fail("Didn't get a rejection when we expected one."); + } catch (EsRejectedExecutionException e) { + assertFalse("Thread pool registering as terminated when it isn't", e.isExecutorShutdown()); + String message = ExceptionsHelper.detailedMessage(e); + assertThat(message, containsString("of dummy runnable")); + assertThat(message, containsString("on EsThreadPoolExecutor[testRejectionMessage")); + assertThat(message, containsString("queue capacity = " + queue)); + assertThat(message, containsString("state = Running")); + assertThat(message, containsString("active threads = " + pool)); + assertThat(message, containsString("queued tasks = " + queue)); + assertThat(message, containsString("completed tasks = 0")); + } + } finally { + latch.countDown(); + terminate(executor); + } + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Doesn't matter is going to be rejected + } + + @Override + public String toString() { + return "dummy runnable"; + } + }); + fail("Didn't get a rejection when we expected one."); + } catch (EsRejectedExecutionException e) { + assertTrue("Thread pool not registering as terminated when it is", e.isExecutorShutdown()); + String message = ExceptionsHelper.detailedMessage(e); + assertThat(message, containsString("of dummy runnable")); + assertThat(message, containsString("on EsThreadPoolExecutor[" + getTestName())); + assertThat(message, containsString("queue capacity = " + queue)); + assertThat(message, containsString("state = Terminated")); + assertThat(message, containsString("active threads = 0")); + assertThat(message, containsString("queued tasks = 0")); + assertThat(message, containsString("completed tasks = " + actions)); + } + } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index 0620f2f91be..ef1c0a9d7ec 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -61,7 +61,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -91,7 +91,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testExecutePrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -121,7 +121,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithCallables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -151,7 +151,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithMixed() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -182,7 +182,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testTimeout() throws Exception { ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName())); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1); executor.execute(new Runnable() { @@ -246,7 +246,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { ThreadPool threadPool = new ThreadPool("test"); final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); final AtomicBoolean timeoutCalled = new AtomicBoolean(); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); final CountDownLatch invoked = new CountDownLatch(1); executor.execute(new Runnable() { @Override diff --git a/core/src/test/java/org/elasticsearch/test/ESTestCase.java b/core/src/test/java/org/elasticsearch/test/ESTestCase.java index 462a98b03e4..1694ecf2e4f 100644 --- a/core/src/test/java/org/elasticsearch/test/ESTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESTestCase.java @@ -537,7 +537,7 @@ public abstract class ESTestCase extends LuceneTestCase { @Override public void uncaughtException(Thread t, Throwable e) { if (e instanceof EsRejectedExecutionException) { - if (e.getMessage() != null && e.getMessage().contains(EsAbortPolicy.SHUTTING_DOWN_KEY)) { + if (e.getMessage() != null && ((EsRejectedExecutionException) e).isExecutorShutdown()) { return; // ignore the EsRejectedExecutionException when a node shuts down } } else if (e instanceof OutOfMemoryError) { diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index ee3071cce66..19fe03ce988 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -314,7 +314,7 @@ public final class InternalTestCluster extends TestCluster { // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50))); defaultSettings = builder.build(); - executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); + executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); } public static String nodeMode() { From 804f14c68eced5b6a827753dcb9b1bf5cb1f44cf Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 30 Jul 2015 13:52:58 -0400 Subject: [PATCH 08/16] Core: Remove nasty hack in toString This makes the output of EsThreadPoolExecutor#toString less pretty but we no longer have funky hacky that rely on the specific format of the toString produced by ThreadPoolExecutor which isn't part of its API and could change with any JVM version and break the output. --- .../util/concurrent/EsThreadPoolExecutor.java | 19 +++++-------------- .../util/concurrent/EsExecutorsTests.java | 4 ++-- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 1e6743e9004..4c02aab1fe8 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -101,19 +101,6 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { @Override public String toString() { - /* - * ThreadPoolExecutor has some nice information in its toString but we - * can't recreate it without nastier hacks than this. - */ - String tpeToString = super.toString(); - int startOfInfoInTpeToString = tpeToString.indexOf('['); - String tpeInfo; - if (startOfInfoInTpeToString >= 0) { - tpeInfo = tpeToString.substring(startOfInfoInTpeToString + 1); - } else { - assert false: "Unsupported ThreadPoolExecutor toString"; - tpeInfo = tpeToString; - } StringBuilder b = new StringBuilder(); b.append(getClass().getSimpleName()).append('['); b.append(name).append(", "); @@ -122,7 +109,11 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { SizeBlockingQueue queue = (SizeBlockingQueue) getQueue(); b.append("queue capacity = ").append(queue.capacity()).append(", "); } - b.append("state = ").append(tpeInfo); + /* + * ThreadPoolExecutor has some nice information in its toString but we + * can't get at it easily without just getting the toString. + */ + b.append(super.toString()).append(']'); return b.toString(); } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 7c9355e4191..c7406aa9511 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -274,7 +274,7 @@ public class EsExecutorsTests extends ESTestCase { assertThat(message, containsString("of dummy runnable")); assertThat(message, containsString("on EsThreadPoolExecutor[testRejectionMessage")); assertThat(message, containsString("queue capacity = " + queue)); - assertThat(message, containsString("state = Running")); + assertThat(message, containsString("[Running")); assertThat(message, containsString("active threads = " + pool)); assertThat(message, containsString("queued tasks = " + queue)); assertThat(message, containsString("completed tasks = 0")); @@ -302,7 +302,7 @@ public class EsExecutorsTests extends ESTestCase { assertThat(message, containsString("of dummy runnable")); assertThat(message, containsString("on EsThreadPoolExecutor[" + getTestName())); assertThat(message, containsString("queue capacity = " + queue)); - assertThat(message, containsString("state = Terminated")); + assertThat(message, containsString("[Terminated")); assertThat(message, containsString("active threads = 0")); assertThat(message, containsString("queued tasks = 0")); assertThat(message, containsString("completed tasks = " + actions)); From abf763c1c50d085e21fb1bd4ddde6d8f1ba53895 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 5 Aug 2015 12:28:47 +0200 Subject: [PATCH 09/16] Rethrow exception during recovery finalization even if source if not broken Today we miss to throw / rethrow an recovery exception if it happens during the finalization of phase 1 if the source files are not affected. Even worse this can cause some dataloss if the reason for this exception is a failure of deleting a corruption marker or similar pre-existing corruptions since we continue with the recovery and mark the target shared as started which will in-turn open an engine with an empty index. --- .../indices/recovery/RecoverySourceHandler.java | 1 + .../elasticsearch/indices/recovery/RecoveryTarget.java | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 572b784093e..295ab49ac7f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -435,6 +435,7 @@ public class RecoverySourceHandler { exception.addSuppressed(remoteException); logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK", corruptIndexException, shard.shardId(), request.targetNode()); + throw exception; } else { throw remoteException; } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 0388265e64c..a953206fa89 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -406,9 +406,13 @@ public class RecoveryTarget extends AbstractComponent { logger.debug("Failed to clean lucene index", e); ex.addSuppressed(e); } - throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex); + RecoveryFailedException rfe = new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex); + recoveryStatus.fail(rfe, true); + throw rfe; } catch (Exception ex) { - throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex); + RecoveryFailedException rfe = new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex); + recoveryStatus.fail(rfe, true); + throw rfe; } channel.sendResponse(TransportResponse.Empty.INSTANCE); } From 0b9729af5b1e40faf6f283ada2a90af20d879393 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 5 Aug 2015 16:01:07 +0200 Subject: [PATCH 10/16] Use explict flag if index should be created on engine creation Today we try to detect if there is an index existing in the directory and if not we create one. This can be tricky and errof prone since we rely on the filesystem without taking the context into account when the engine gets created. We know in all situations if the index should be created so we can just use this infromation and rely on the lucene index writer to barf if we hit a situations where we can't append to an index while we should. --- .../index/engine/EngineConfig.java | 18 +++++++++++++++++- .../index/engine/InternalEngine.java | 5 +---- .../elasticsearch/index/shard/IndexShard.java | 14 ++++++-------- .../index/shard/ShadowIndexShard.java | 1 + .../index/shard/StoreRecoveryService.java | 4 ++-- .../index/shard/TranslogRecoveryPerformer.java | 2 +- .../indices/recovery/RecoveryTarget.java | 2 +- .../index/engine/InternalEngineTests.java | 9 ++++++++- .../index/engine/ShadowEngineTests.java | 6 ++++++ 9 files changed, 43 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 778509a97dd..91725899c17 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -135,7 +135,7 @@ public final class EngineConfig { private static final String DEFAULT_CODEC_NAME = "default"; private TranslogConfig translogConfig; - + private boolean create = false; /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} @@ -433,4 +433,20 @@ public final class EngineConfig { public TranslogConfig getTranslogConfig() { return translogConfig; } + + /** + * Iff set to true the engine will create a new lucene index when opening the engine. + * Otherwise the lucene index writer is opened in append mode. The default is false + */ + public void setCreate(boolean create) { + this.create = create; + } + + /** + * Iff true the engine should create a new lucene index when opening the engine. + * Otherwise the lucene index writer should be opened in append mode. The default is false + */ + public boolean isCreate() { + return create; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fe8fdab090f..597cf9b8a05 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -118,14 +118,11 @@ public class InternalEngine extends Engine { for (int i = 0; i < dirtyLocks.length; i++) { dirtyLocks[i] = new Object(); } - throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); final Translog.TranslogGeneration translogGeneration; try { - // TODO: would be better if ES could tell us "from above" whether this shard was already here, instead of using Lucene's API - // (which relies on IO ops, directory listing, and has had scary bugs in the past): - boolean create = !Lucene.indexExists(store.directory()); + final boolean create = engineConfig.isCreate(); writer = createWriter(create); indexWriter = writer; translog = openTranslog(engineConfig, writer, create || skipInitialTranslogRecovery || engineConfig.forceNewTranslog()); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c4eecad452c..bb681586781 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -828,14 +828,13 @@ public class IndexShard extends AbstractIndexShardComponent { /** * After the store has been recovered, we need to start the engine in order to apply operations */ - public Map performTranslogRecovery() { - final Map recoveredTypes = internalPerformTranslogRecovery(false); + public Map performTranslogRecovery(boolean indexExists) { + final Map recoveredTypes = internalPerformTranslogRecovery(false, indexExists); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); return recoveredTypes; - } - private Map internalPerformTranslogRecovery(boolean skipTranslogRecovery) { + private Map internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -852,6 +851,7 @@ public class IndexShard extends AbstractIndexShardComponent { // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering engineConfig.setEnableGcDeletes(false); + engineConfig.setCreate(indexExists == false); createNewEngine(skipTranslogRecovery, engineConfig); return engineConfig.getTranslogRecoveryPerformer().getRecoveredTypes(); } @@ -860,12 +860,10 @@ public class IndexShard extends AbstractIndexShardComponent { * After the store has been recovered, we need to start the engine. This method starts a new engine but skips * the replay of the transaction log which is required in cases where we restore a previous index or recover from * a remote peer. - * - * @param wipeTranslogs if set to true all skipped / uncommitted translogs are removed. */ - public void skipTranslogRecovery(boolean wipeTranslogs) throws IOException { + public void skipTranslogRecovery() throws IOException { assert engineUnsafe() == null : "engine was already created"; - Map recoveredTypes = internalPerformTranslogRecovery(true); + Map recoveredTypes = internalPerformTranslogRecovery(true, true); assert recoveredTypes.isEmpty(); assert recoveryState.getTranslog().recoveredOperations() == 0; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 7224e701751..9e8776d1b1e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -104,6 +104,7 @@ public final class ShadowIndexShard extends IndexShard { protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig config) { assert this.shardRouting.primary() == false; assert skipInitialTranslogRecovery : "can not recover from gateway"; + config.setCreate(false); // hardcoded - we always expect an index to be present return engineFactory.newReadOnlyEngine(config); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java index 14b27efc8e9..e291589d614 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java @@ -246,7 +246,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements recoveryState.getTranslog().totalOperations(0); recoveryState.getTranslog().totalOperationsOnStart(0); } - typesToUpdate = indexShard.performTranslogRecovery(); + typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists); indexShard.finalizeRecovery(); String indexName = indexShard.shardId().index().name(); @@ -318,7 +318,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements snapshotShardId = new ShardId(restoreSource.index(), shardId.id()); } indexShardRepository.restore(restoreSource.snapshotId(), restoreSource.version(), shardId, snapshotShardId, recoveryState); - indexShard.skipTranslogRecovery(true); + indexShard.skipTranslogRecovery(); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId); diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index 8738e0abdf5..46c03de09ce 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -204,7 +204,7 @@ public class TranslogRecoveryPerformer { query = queryParserService.parseQuery(source).query(); } catch (QueryParsingException ex) { // for BWC we try to parse directly the query since pre 1.0.0.Beta2 we didn't require a top level query field - if ( queryParserService.getIndexCreatedVersion().onOrBefore(Version.V_1_0_0_Beta2)) { + if (queryParserService.getIndexCreatedVersion().onOrBefore(Version.V_1_0_0_Beta2)) { try { XContentParser parser = XContentHelper.createParser(source); ParsedQuery parse = queryParserService.parse(parser); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index a953206fa89..4e641b83362 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -274,7 +274,7 @@ public class RecoveryTarget extends AbstractComponent { try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) { final RecoveryStatus recoveryStatus = statusRef.status(); recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps()); - recoveryStatus.indexShard().skipTranslogRecovery(false); + recoveryStatus.indexShard().skipTranslogRecovery(); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 52de4859ffb..bad431f67e5 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -39,6 +39,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -256,7 +257,11 @@ public class InternalEngineTests extends ESTestCase { // we don't need to notify anybody in this test } }, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig); - + try { + config.setCreate(Lucene.indexExists(store.directory()) == false); + } catch (IOException e) { + throw new ElasticsearchException("can't find index?", e); + } return config; } @@ -775,6 +780,7 @@ public class InternalEngineTests extends ESTestCase { // this so we have to disable the check explicitly directory.setPreventDoubleWrite(false); } + config.setCreate(false); engine = new InternalEngine(config, false); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -1869,6 +1875,7 @@ public class InternalEngineTests extends ESTestCase { parser.mappingUpdate = dynamicUpdate(); engine.close(); + engine.config().setCreate(false); engine = new InternalEngine(engine.config(), false); // we need to reuse the engine config unless the parser.mappingModified won't work try (Engine.Searcher searcher = engine.acquireSearcher("test")) { diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 1634d21ee34..7b45a3b90cd 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -29,6 +29,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; @@ -226,6 +227,11 @@ public class ShadowEngineTests extends ESTestCase { public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); + try { + config.setCreate(Lucene.indexExists(store.directory()) == false); + } catch (IOException e) { + throw new ElasticsearchException("can't find index?", e); + } return config; } From 69be7f77fc54d3f46837b8a90e8d81fa29eef834 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 5 Aug 2015 15:30:13 -0400 Subject: [PATCH 11/16] Prevent DirectCandidateGenerator to reuse an unclosed analyzer When postFilter generates a token that is identical to the input term DirectCandidateGenerator should not preFilter this token. If postFilter and preFilter are the same analyzer instance it would fail with : "TokenStream contract violation: close() call missing" This is a forward port of @nomoa's #12670 --- .../search/suggest/phrase/DirectCandidateGenerator.java | 4 +++- .../suggest/phrase/NoisyChannelSpellCheckerTests.java | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/suggest/phrase/DirectCandidateGenerator.java b/core/src/main/java/org/elasticsearch/search/suggest/phrase/DirectCandidateGenerator.java index d97f7cf44e5..8af181f0e63 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/phrase/DirectCandidateGenerator.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/phrase/DirectCandidateGenerator.java @@ -151,7 +151,9 @@ public final class DirectCandidateGenerator extends CandidateGenerator { if (posIncAttr.getPositionIncrement() > 0 && result.get().bytesEquals(candidate.term)) { BytesRef term = result.toBytesRef(); - long freq = frequency(term); + // We should not use frequency(term) here because it will analyze the term again + // If preFilter and postFilter are the same analyzer it would fail. + long freq = internalFrequency(term); candidates.add(new Candidate(result.toBytesRef(), freq, candidate.stringDistance, score(candidate.frequency, candidate.stringDistance, dictSize), false)); } else { candidates.add(new Candidate(result.toBytesRef(), candidate.frequency, nonErrorLikelihood, score(candidate.frequency, candidate.stringDistance, dictSize), false)); diff --git a/core/src/test/java/org/elasticsearch/search/suggest/phrase/NoisyChannelSpellCheckerTests.java b/core/src/test/java/org/elasticsearch/search/suggest/phrase/NoisyChannelSpellCheckerTests.java index 574e1635ab6..878a9a35b96 100644 --- a/core/src/test/java/org/elasticsearch/search/suggest/phrase/NoisyChannelSpellCheckerTests.java +++ b/core/src/test/java/org/elasticsearch/search/suggest/phrase/NoisyChannelSpellCheckerTests.java @@ -278,8 +278,12 @@ public class NoisyChannelSpellCheckerTests extends ESTestCase { assertThat(corrections.length, equalTo(1)); assertThat(corrections[0].join(new BytesRef(" ")).utf8ToString(), equalTo("xorr the god jewel")); + // Test a special case where one of the suggest term is unchanged by the postFilter, 'II' here is unchanged by the reverse analyzer. + corrections = suggester.getCorrections(wrapper, new BytesRef("Quazar II"), generator, 1, 1, ir, "body", wordScorer, 1, 2).corrections; + assertThat(corrections.length, equalTo(1)); + assertThat(corrections[0].join(new BytesRef(" ")).utf8ToString(), equalTo("quasar ii")); } - + @Test public void testMarvelHerosTrigram() throws IOException { From 15e5247e03bbf7b69ac547c2bad3d17a89149c61 Mon Sep 17 00:00:00 2001 From: Robert Muir Date: Wed, 5 Aug 2015 20:24:36 -0400 Subject: [PATCH 12/16] Get plugin smoketester running in jenkins. We have a smoke_test_plugins.py, but its a bit slow, not integrated into our build, etc. I converted this into an integration test. It is definitely uglier but more robust and fast (e.g. 20 seconds time to verify). Also there is refactoring of existing integ tests logic, like printing out commands we execute and stuff --- dev-tools/smoke_test_plugins.py | 172 ---------- .../main/resources/ant/integration-tests.xml | 66 ++-- pom.xml | 1 + qa/pom.xml | 320 ++++++++++++++++++ qa/smoke-test-plugins/integration-tests.xml | 42 +++ qa/smoke-test-plugins/pom.xml | 238 +++++++++++++ .../test/smoke_test_plugins/10_basic.yaml | 13 + .../smoketest/SmokeTestPluginsIT.java | 41 +++ 8 files changed, 701 insertions(+), 192 deletions(-) delete mode 100644 dev-tools/smoke_test_plugins.py create mode 100644 qa/pom.xml create mode 100644 qa/smoke-test-plugins/integration-tests.xml create mode 100644 qa/smoke-test-plugins/pom.xml create mode 100644 qa/smoke-test-plugins/rest-api-spec/test/smoke_test_plugins/10_basic.yaml create mode 100644 qa/smoke-test-plugins/src/test/java/org/elasticsearch/smoketest/SmokeTestPluginsIT.java diff --git a/dev-tools/smoke_test_plugins.py b/dev-tools/smoke_test_plugins.py deleted file mode 100644 index da6c2c95209..00000000000 --- a/dev-tools/smoke_test_plugins.py +++ /dev/null @@ -1,172 +0,0 @@ -# Licensed to Elasticsearch under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on -# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, -# either express or implied. See the License for the specific -# language governing permissions and limitations under the License. - -import datetime -import traceback -import json -import os -import shutil -import signal -import socket -import subprocess -import tempfile -import threading -import time - -from http.client import HTTPConnection - -LOG = os.environ.get('ES_SMOKE_TEST_PLUGINS_LOG', '/tmp/elasticsearch_smoke_test_plugins.log') - -print('Logging to %s' % LOG) - -if os.path.exists(LOG): - raise RuntimeError('please remove old log %s first' % LOG) - -try: - JAVA_HOME = os.environ['JAVA7_HOME'] -except KeyError: - try: - JAVA_HOME = os.environ['JAVA_HOME'] - except KeyError: - raise RuntimeError(""" - Please set JAVA_HOME in the env before running release tool - On OSX use: export JAVA_HOME=`/usr/libexec/java_home -v '1.7*'`""") - -JAVA_ENV = 'export JAVA_HOME="%s" PATH="%s/bin:$PATH" JAVACMD="%s/bin/java"' % (JAVA_HOME, JAVA_HOME, JAVA_HOME) - -try: - # make sure mvn3 is used if mvn3 is available - # some systems use maven 2 as default - subprocess.check_output('mvn3 --version', shell=True, stderr=subprocess.STDOUT) - MVN = 'mvn3' -except subprocess.CalledProcessError: - MVN = 'mvn' - -def log(msg): - f = open(LOG, mode='ab') - f.write(('\n'+msg).encode('utf-8')) - f.close() - -def run(command, quiet=False): - log('%s: RUN: %s\n' % (datetime.datetime.now(), command)) - if os.system('%s >> %s 2>&1' % (command, LOG)): - msg = ' FAILED: %s [see log %s]' % (command, LOG) - if not quiet: - print(msg) - raise RuntimeError(msg) - -def readServerOutput(p, startupEvent, failureEvent): - try: - while True: - line = p.stdout.readline() - if len(line) == 0: - p.poll() - if not startupEvent.isSet(): - failureEvent.set() - startupEvent.set() - print('ES: **process exit**\n') - break - line = line.decode('utf-8').rstrip() - if line.endswith('started') and not startupEvent.isSet(): - startupEvent.set() - print('ES: %s' % line) - except: - print() - print('Exception reading Elasticsearch output:') - traceback.print_exc() - failureEvent.set() - startupEvent.set() - -if __name__ == '__main__': - print('Build release bits...') - - run('%s; %s clean package -DskipTests' % (JAVA_ENV, MVN)) - - for f in os.listdir('distribution/tar/target/releases/'): - if f.endswith('.tar.gz'): - artifact = f - break - else: - raise RuntimeError('could not find elasticsearch release under distribution/tar/target/releases/') - - tmp_dir = tempfile.mkdtemp() - p = None - try: - # Extract artifact: - run('tar -xzf distribution/tar/target/releases/%s -C %s' % (artifact, tmp_dir)) - es_install_dir = os.path.join(tmp_dir, artifact[:-7]) - es_plugin_path = os.path.join(es_install_dir, 'bin/plugin') - installed_plugin_names = set() - print('Find plugins:') - for name in os.listdir('plugins'): - if name not in ('target', 'pom.xml'): - url = 'file://%s/plugins/%s/target/releases/elasticsearch-%s-2.0.0-beta1-SNAPSHOT.zip' % (os.path.abspath('.'), name, name) - print(' install plugin %s...' % name) - run('%s; %s install %s --url %s' % (JAVA_ENV, es_plugin_path, name, url)) - installed_plugin_names.add(name) - - print('Start Elasticsearch') - - env = os.environ.copy() - env['JAVA_HOME'] = JAVA_HOME - env['PATH'] = '%s/bin:%s' % (JAVA_HOME, env['PATH']) - env['JAVA_CMD'] = '%s/bin/java' % JAVA_HOME - - startupEvent = threading.Event() - failureEvent = threading.Event() - p = subprocess.Popen(('%s/bin/elasticsearch' % es_install_dir, - '-Des.node.name=smoke_tester', - '-Des.cluster.name=smoke_tester_cluster' - '-Des.discovery.zen.ping.multicast.enabled=false', - '-Des.logger.level=debug', - '-Des.script.inline=on', - '-Des.script.indexed=on'), - stdout = subprocess.PIPE, - stderr = subprocess.STDOUT, - env = env) - thread = threading.Thread(target=readServerOutput, args=(p, startupEvent, failureEvent)) - thread.setDaemon(True) - thread.start() - - startupEvent.wait(1200) - if failureEvent.isSet(): - raise RuntimeError('ES failed to start') - - print('Confirm plugins are installed') - conn = HTTPConnection('127.0.0.1', 9200, 20); - conn.request('GET', '/_nodes?plugin=true&pretty=true') - res = conn.getresponse() - if res.status == 200: - nodes = json.loads(res.read().decode("utf-8"))['nodes'] - for _, node in nodes.items(): - node_plugins = node['plugins'] - for node_plugin in node_plugins: - plugin_name = node_plugin['name'] - if plugin_name not in installed_plugin_names: - raise RuntimeError('Unexpeced plugin %s' % plugin_name) - installed_plugin_names.remove(plugin_name) - if len(installed_plugin_names) > 0: - raise RuntimeError('Plugins not loaded %s' % installed_plugin_names) - else: - raise RuntimeError('Expected HTTP 200 but got %s' % res.status) - finally: - if p is not None: - try: - os.kill(p.pid, signal.SIGKILL) - except ProcessLookupError: - pass - shutil.rmtree(tmp_dir) - diff --git a/dev-tools/src/main/resources/ant/integration-tests.xml b/dev-tools/src/main/resources/ant/integration-tests.xml index 4cd6b4728f8..8378fff5012 100644 --- a/dev-tools/src/main/resources/ant/integration-tests.xml +++ b/dev-tools/src/main/resources/ant/integration-tests.xml @@ -15,13 +15,6 @@ - - - @@ -37,14 +30,19 @@ - + + + + execute: ${script.base} @{args} + + - + @@ -119,31 +117,60 @@ + + + + + + + + + + + + + Starting up external cluster... - + + + + + - + - - + - External cluster started PID ${integ.pid} + External cluster started PID @{es.pidfile} @@ -162,12 +189,11 @@ - - + + - - + + diff --git a/pom.xml b/pom.xml index 188ed85015e..1e7ca1a6c75 100644 --- a/pom.xml +++ b/pom.xml @@ -1463,5 +1463,6 @@ org.eclipse.jdt.ui.text.custom_code_templates=core distribution plugins + qa diff --git a/qa/pom.xml b/qa/pom.xml new file mode 100644 index 00000000000..a5d68c1beaf --- /dev/null +++ b/qa/pom.xml @@ -0,0 +1,320 @@ + + + + 4.0.0 + + org.elasticsearch.qa + elasticsearch-qa + 2.0.0-beta1-SNAPSHOT + pom + QA: Parent POM + 2015 + + + org.elasticsearch + elasticsearch-parent + 2.0.0-beta1-SNAPSHOT + + + + + + + + + org.hamcrest + hamcrest-all + test + + + org.apache.lucene + lucene-test-framework + test + + + org.elasticsearch + elasticsearch + test-jar + test + + + + + org.elasticsearch + elasticsearch + provided + + + org.apache.lucene + lucene-core + provided + + + org.apache.lucene + lucene-backward-codecs + provided + + + org.apache.lucene + lucene-analyzers-common + provided + + + org.apache.lucene + lucene-queries + provided + + + org.apache.lucene + lucene-memory + provided + + + org.apache.lucene + lucene-highlighter + provided + + + org.apache.lucene + lucene-queryparser + provided + + + org.apache.lucene + lucene-suggest + provided + + + org.apache.lucene + lucene-join + provided + + + org.apache.lucene + lucene-spatial + provided + + + org.apache.lucene + lucene-expressions + provided + + + com.spatial4j + spatial4j + provided + + + com.vividsolutions + jts + provided + + + com.github.spullara.mustache.java + compiler + provided + + + com.google.guava + guava + provided + + + com.carrotsearch + hppc + provided + + + joda-time + joda-time + provided + + + org.joda + joda-convert + provided + + + com.fasterxml.jackson.core + jackson-core + provided + + + com.fasterxml.jackson.dataformat + jackson-dataformat-smile + provided + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + provided + + + com.fasterxml.jackson.dataformat + jackson-dataformat-cbor + provided + + + io.netty + netty + provided + + + com.ning + compress-lzf + provided + + + com.tdunning + t-digest + provided + + + org.apache.commons + commons-lang3 + provided + + + commons-cli + commons-cli + provided + + + org.codehaus.groovy + groovy-all + indy + provided + + + log4j + log4j + provided + + + log4j + apache-log4j-extras + provided + + + org.slf4j + slf4j-api + provided + + + net.java.dev.jna + jna + provided + + + + + + org.apache.httpcomponents + httpclient + test + + + + + + + + src/main/resources + true + + **/*.properties + + + + + + + src/test/java + + **/*.json + **/*.txt + + + + src/test/resources + + elasticsearch.yml + **/*.properties + + + + src/test/resources + true + + elasticsearch.yml + **/*.properties + + + + + ${project.basedir}/rest-api-spec + true + rest-api-spec + + api/*.json + test/**/*.yaml + + + + + ${elasticsearch.tools.directory}/rest-api-spec + rest-api-spec + + + api/info.json + api/cluster.health.json + api/cluster.state.json + + api/index.json + api/get.json + api/update.json + api/search.json + api/indices.analyze.json + api/indices.create.json + api/indices.refresh.json + api/nodes.info.json + api/count.json + + + + + ${elasticsearch.tools.directory}/shared-test-resources + false + + + + + + + com.carrotsearch.randomizedtesting + junit4-maven-plugin + + + integ-tests + + + 1 + + + 127.0.0.1:${integ.transport.port} + + + + + + + + + + + smoke-test-plugins + + diff --git a/qa/smoke-test-plugins/integration-tests.xml b/qa/smoke-test-plugins/integration-tests.xml new file mode 100644 index 00000000000..d00d8c37bab --- /dev/null +++ b/qa/smoke-test-plugins/integration-tests.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/qa/smoke-test-plugins/pom.xml b/qa/smoke-test-plugins/pom.xml new file mode 100644 index 00000000000..417dcdc767a --- /dev/null +++ b/qa/smoke-test-plugins/pom.xml @@ -0,0 +1,238 @@ + + + + 4.0.0 + + + org.elasticsearch.qa + elasticsearch-qa + 2.0.0-beta1-SNAPSHOT + + + + + smoke-test-plugins + QA: Smoke Test Plugins + Loads up all of our plugins + + + true + ${project.basedir}/integration-tests.xml + smoke_test_plugins + false + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + integ-setup-dependencies + pre-integration-test + + copy + + + + true + ${integ.deps}/plugins + + + + + org.elasticsearch.distribution.zip + elasticsearch + ${elasticsearch.version} + zip + true + ${integ.deps} + + + + + org.elasticsearch.plugin + elasticsearch-analysis-kuromoji + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-analysis-smartcn + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-analysis-stempel + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-analysis-phonetic + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-analysis-icu + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-cloud-gce + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-cloud-azure + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-cloud-aws + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-site-example + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-lang-python + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-lang-javascript + ${elasticsearch.version} + zip + true + + + + org.elasticsearch.plugin + elasticsearch-delete-by-query + ${elasticsearch.version} + zip + true + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + count-expected-plugins + validate + + run + + + + + + + + Found ${expected.plugin.count} plugins in ${plugins.dir} + + true + + + + + integ-setup + pre-integration-test + + run + + + + + + + + + + + + + integ-teardown + post-integration-test + + run + + + + + + + + + + + ant-contrib + ant-contrib + 1.0b3 + + + ant + ant + + + + + org.apache.ant + ant-nodeps + 1.8.1 + + + + + + + diff --git a/qa/smoke-test-plugins/rest-api-spec/test/smoke_test_plugins/10_basic.yaml b/qa/smoke-test-plugins/rest-api-spec/test/smoke_test_plugins/10_basic.yaml new file mode 100644 index 00000000000..dbb09225fce --- /dev/null +++ b/qa/smoke-test-plugins/rest-api-spec/test/smoke_test_plugins/10_basic.yaml @@ -0,0 +1,13 @@ +# Integration tests for smoke testing plugins +# +"Correct Plugin Count": + - do: + cluster.state: {} + + # Get master node id + - set: { master_node: master } + + - do: + nodes.info: {} + + - length: { nodes.$master.plugins: ${expected.plugin.count} } diff --git a/qa/smoke-test-plugins/src/test/java/org/elasticsearch/smoketest/SmokeTestPluginsIT.java b/qa/smoke-test-plugins/src/test/java/org/elasticsearch/smoketest/SmokeTestPluginsIT.java new file mode 100644 index 00000000000..6e0243b2a04 --- /dev/null +++ b/qa/smoke-test-plugins/src/test/java/org/elasticsearch/smoketest/SmokeTestPluginsIT.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.smoketest; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.RestTestCandidate; +import org.elasticsearch.test.rest.parser.RestTestParseException; + +import java.io.IOException; + +public class SmokeTestPluginsIT extends ESRestTestCase { + + public SmokeTestPluginsIT(@Name("yaml") RestTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws IOException, RestTestParseException { + return ESRestTestCase.createParameters(0, 1); + } +} + From c62f0655d353a878074bcbdf530797650d5a3acd Mon Sep 17 00:00:00 2001 From: Robert Muir Date: Wed, 5 Aug 2015 20:34:24 -0400 Subject: [PATCH 13/16] fix/cleanup pidfile stuff --- dev-tools/src/main/resources/ant/integration-tests.xml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dev-tools/src/main/resources/ant/integration-tests.xml b/dev-tools/src/main/resources/ant/integration-tests.xml index 8378fff5012..9f6c58d7ab8 100644 --- a/dev-tools/src/main/resources/ant/integration-tests.xml +++ b/dev-tools/src/main/resources/ant/integration-tests.xml @@ -52,9 +52,10 @@ + - + @@ -169,8 +170,8 @@ - - External cluster started PID @{es.pidfile} + + External cluster started PID ${integ.pid} @@ -207,7 +208,7 @@ - + Shutting down external cluster PID ${integ.pid} From 5fe49e4420bfd6dda9f229b01faed0ba300909c0 Mon Sep 17 00:00:00 2001 From: Robert Muir Date: Wed, 5 Aug 2015 21:45:09 -0400 Subject: [PATCH 14/16] cleanup skipping everywhere --- .../main/resources/ant/integration-tests.xml | 25 ++++++++----------- distribution/rpm/pom.xml | 2 +- plugins/pom.xml | 2 ++ qa/smoke-test-plugins/pom.xml | 4 ++- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/dev-tools/src/main/resources/ant/integration-tests.xml b/dev-tools/src/main/resources/ant/integration-tests.xml index 9f6c58d7ab8..42e7cd943c3 100644 --- a/dev-tools/src/main/resources/ant/integration-tests.xml +++ b/dev-tools/src/main/resources/ant/integration-tests.xml @@ -1,11 +1,6 @@ - - - - - @@ -176,7 +171,7 @@ - + @@ -198,7 +193,7 @@ - + @@ -225,7 +220,7 @@ - + - + - + - + - + @@ -275,12 +270,12 @@ - + - + @@ -315,7 +310,7 @@ - + diff --git a/distribution/rpm/pom.xml b/distribution/rpm/pom.xml index 86da5bba0f2..e390c4a7121 100644 --- a/distribution/rpm/pom.xml +++ b/distribution/rpm/pom.xml @@ -24,7 +24,7 @@ - true + true ${project.build.directory}/releases/ diff --git a/plugins/pom.xml b/plugins/pom.xml index 2d8e9387d9c..2f0b9879151 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -309,6 +309,7 @@ run + ${skip.integ.tests} @@ -322,6 +323,7 @@ run + ${skip.integ.tests} diff --git a/qa/smoke-test-plugins/pom.xml b/qa/smoke-test-plugins/pom.xml index 417dcdc767a..b3cf1b6a12d 100644 --- a/qa/smoke-test-plugins/pom.xml +++ b/qa/smoke-test-plugins/pom.xml @@ -45,7 +45,7 @@ copy - + ${skip.integ.tests} true ${integ.deps}/plugins @@ -197,6 +197,7 @@ + ${skip.integ.tests} @@ -210,6 +211,7 @@ + ${skip.integ.tests} From 354a32b05d38550d1fc8f8e9a5c137806b6d0618 Mon Sep 17 00:00:00 2001 From: Robert Muir Date: Wed, 5 Aug 2015 22:08:12 -0400 Subject: [PATCH 15/16] remove annoying maven warning --- distribution/shaded/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distribution/shaded/pom.xml b/distribution/shaded/pom.xml index 3163a8053b3..63f12ec365b 100644 --- a/distribution/shaded/pom.xml +++ b/distribution/shaded/pom.xml @@ -74,7 +74,7 @@ false true true - ${build.directory}/dependency-reduced-pom.xml + ${project.build.directory}/dependency-reduced-pom.xml org.apache.lucene:* From d0abffc9acb9afddc83ae99ae17848b813fd918f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 6 Aug 2015 10:35:12 +0200 Subject: [PATCH 16/16] Add unittest for DiskThresholdDecider#getShardSize / #sizeOfRelocatingShards This commit adds a basic unittest for the shard size routines and simplifies some object creation. --- .../elasticsearch/cluster/ClusterInfo.java | 10 ++- .../cluster/ClusterInfoService.java | 2 - .../cluster/EmptyClusterInfoService.java | 13 +-- .../cluster/InternalClusterInfoService.java | 20 +++-- .../decider/DiskThresholdDecider.java | 10 +-- .../cluster/routing/ShardRoutingHelper.java | 38 +++++++++ .../allocation/BalanceConfigurationTests.java | 3 +- .../RandomAllocationDeciderTests.java | 3 +- .../decider/DiskThresholdDeciderTests.java | 20 ++--- .../DiskThresholdDeciderUnitTests.java | 79 +++++++++++++++---- .../indices/state/RareClusterStateIT.java | 5 +- .../test/ESAllocationTestCase.java | 3 +- 12 files changed, 140 insertions(+), 66 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index ae1228cc119..5e2d35a9818 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -19,9 +19,9 @@ package org.elasticsearch.cluster; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.cluster.routing.ShardRouting; +import java.util.Collections; import java.util.Map; /** @@ -30,10 +30,16 @@ import java.util.Map; * InternalClusterInfoService.shardIdentifierFromRouting(String) * for the key used in the shardSizes map */ -public class ClusterInfo { +public final class ClusterInfo { private final Map usages; final Map shardSizes; + public static final ClusterInfo EMPTY = new ClusterInfo(); + + private ClusterInfo() { + this.usages = Collections.emptyMap(); + this.shardSizes = Collections.emptyMap(); + } public ClusterInfo(Map usages, Map shardSizes) { this.usages = usages; diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java index e17b2326386..d4ceb844ec2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java @@ -25,8 +25,6 @@ package org.elasticsearch.cluster; */ public interface ClusterInfoService { - public static ClusterInfoService EMPTY = EmptyClusterInfoService.getInstance(); - /** The latest cluster information */ public ClusterInfo getClusterInfo(); diff --git a/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java index 3267a9b1185..89a0e9193b8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java @@ -27,24 +27,15 @@ import org.elasticsearch.common.settings.Settings; * ClusterInfoService that provides empty maps for disk usage and shard sizes */ public class EmptyClusterInfoService extends AbstractComponent implements ClusterInfoService { - - private final static class Holder { - private final static EmptyClusterInfoService instance = new EmptyClusterInfoService(); - } - private final ClusterInfo emptyClusterInfo; + public final static EmptyClusterInfoService INSTANCE = new EmptyClusterInfoService(); private EmptyClusterInfoService() { super(Settings.EMPTY); - emptyClusterInfo = new ClusterInfo(ImmutableMap.of(), ImmutableMap.of()); - } - - public static EmptyClusterInfoService getInstance() { - return Holder.instance; } @Override public ClusterInfo getClusterInfo() { - return emptyClusterInfo; + return ClusterInfo.EMPTY; } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index e77818e66a1..567812e7d58 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -32,7 +31,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -66,8 +64,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu private volatile TimeValue updateFrequency; - private volatile ImmutableMap usages; - private volatile ImmutableMap shardSizes; + private volatile Map usages; + private volatile Map shardSizes; private volatile boolean isMaster = false; private volatile boolean enabled; private volatile TimeValue fetchTimeout; @@ -83,8 +81,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService, ThreadPool threadPool) { super(settings); - this.usages = ImmutableMap.of(); - this.shardSizes = ImmutableMap.of(); + this.usages = Collections.emptyMap(); + this.shardSizes = Collections.emptyMap(); this.transportNodesStatsAction = transportNodesStatsAction; this.transportIndicesStatsAction = transportIndicesStatsAction; this.clusterService = clusterService; @@ -201,7 +199,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu } Map newUsages = new HashMap<>(usages); newUsages.remove(removedNode.getId()); - usages = ImmutableMap.copyOf(newUsages); + usages = Collections.unmodifiableMap(newUsages); } } } @@ -332,7 +330,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu newUsages.put(nodeId, new DiskUsage(nodeId, nodeName, total, available)); } } - usages = ImmutableMap.copyOf(newUsages); + usages = Collections.unmodifiableMap(newUsages); } @Override @@ -348,7 +346,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); } // we empty the usages list, to be safe - we don't know what's going on. - usages = ImmutableMap.of(); + usages = Collections.emptyMap(); } } }); @@ -366,7 +364,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu } newShardSizes.put(sid, size); } - shardSizes = ImmutableMap.copyOf(newShardSizes); + shardSizes = Collections.unmodifiableMap(newShardSizes); } @Override @@ -382,7 +380,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); } // we empty the usages list, to be safe - we don't know what's going on. - shardSizes = ImmutableMap.of(); + shardSizes = Collections.emptyMap(); } } }); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index b7189908fb7..2a438de800f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.DiskUsage; +import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -226,7 +227,7 @@ public class DiskThresholdDecider extends AllocationDecider { // It's okay the Client is null here, because the empty cluster info // service will never actually call the listener where the client is // needed. Also this constructor is only used for tests - this(settings, new NodeSettingsService(settings), ClusterInfoService.EMPTY, null); + this(settings, new NodeSettingsService(settings), EmptyClusterInfoService.INSTANCE, null); } @Inject @@ -312,7 +313,7 @@ public class DiskThresholdDecider extends AllocationDecider { * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size * of all shards */ - public long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway) { + public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway) { long totalSize = 0; for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { if (routing.initializing() && routing.relocatingNodeId() != null) { @@ -324,7 +325,7 @@ public class DiskThresholdDecider extends AllocationDecider { return totalSize; } - private long getShardSize(ShardRouting routing, ClusterInfo clusterInfo) { + static long getShardSize(ShardRouting routing, ClusterInfo clusterInfo) { Long shardSize = clusterInfo.getShardSize(routing); return shardSize == null ? 0 : shardSize; } @@ -419,8 +420,7 @@ public class DiskThresholdDecider extends AllocationDecider { } // Secondly, check that allocating the shard to this node doesn't put it above the high watermark - Long shardSize = allocation.clusterInfo().getShardSize(shardRouting); - shardSize = shardSize == null ? 0 : shardSize; + final long shardSize = getShardSize(shardRouting, allocation.clusterInfo()); double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java new file mode 100644 index 00000000000..f2eb15ae3b1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +/** + * A helper class that allows access to package private APIs for testing. + */ +public class ShardRoutingHelper { + + public static void relocate(ShardRouting routing, String nodeId) { + routing.relocate(nodeId); + } + + public static void moveToStarted(ShardRouting routing) { + routing.moveToStarted(); + } + + public static void initialize(ShardRouting routing, String nodeId) { + routing.initialize(nodeId); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index a8311ab606c..c5c96b0e8af 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -408,7 +409,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { unassigned.clear(); return changed; } - }), ClusterInfoService.EMPTY); + }), EmptyClusterInfoService.INSTANCE); MetaData.Builder metaDataBuilder = MetaData.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); IndexMetaData.Builder indexMeta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java index 8ebe7e7de36..5ddb87864d1 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData.Builder; @@ -61,7 +62,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase { RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom()); AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), - randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), ClusterInfoService.EMPTY); + randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); int indices = scaledRandomIntBetween(1, 20); Builder metaBuilder = MetaData.builder(); int maxNumReplicas = 1; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index c86eba95685..52280eb2a1c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing.allocation.decider; import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; @@ -44,10 +43,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.index.shard.ShardId; import org.junit.Test; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; +import java.util.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -77,7 +73,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes shardSizes.put("[test][0][r]", 10L); - final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -272,7 +268,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes shardSizes.put("[test][0][r]", 10L); - final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -334,7 +330,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // Make node without the primary now habitable to replicas usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", 100, 35)); // 65% used - final ClusterInfo clusterInfo2 = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); cis = new ClusterInfoService() { @Override public ClusterInfo getClusterInfo() { @@ -533,7 +529,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes - final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -600,7 +596,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes shardSizes.put("[test][0][r]", 10L); // 10 bytes - final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -704,7 +700,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { shardSizes.put("[test][0][r]", 14L); shardSizes.put("[test2][0][p]", 1L); // 1 bytes shardSizes.put("[test2][0][r]", 1L); - final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -807,7 +803,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 40L); shardSizes.put("[test][1][p]", 40L); - final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings); MetaData metaData = MetaData.builder() diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 7b0f8386474..0be13948e42 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -19,17 +19,24 @@ package org.elasticsearch.cluster.routing.allocation.decider; -import com.google.common.collect.ImmutableMap; - +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; -import org.elasticsearch.cluster.DiskUsage; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.ESTestCase; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -44,19 +51,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { public void testDynamicSettings() { NodeSettingsService nss = new NodeSettingsService(Settings.EMPTY); - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - Map usages = new HashMap<>(); - Map shardSizes = new HashMap<>(); - return new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); - } - - @Override - public void addListener(Listener listener) { - // noop - } - }; + ClusterInfoService cis = EmptyClusterInfoService.INSTANCE; DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null); assertThat(decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test"))); @@ -94,4 +89,56 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { assertFalse("relocations should now be disabled", decider.isIncludeRelocations()); } + public void testShardSizeAndRelocatingSize() { + Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][r]", 10L); + shardSizes.put("[test][1][r]", 100L); + shardSizes.put("[test][2][r]", 1000L); + shardSizes.put("[other][0][p]", 10000L); + ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, shardSizes); + ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + ShardRoutingHelper.initialize(test_0, "node1"); + ShardRoutingHelper.moveToStarted(test_0); + ShardRoutingHelper.relocate(test_0, "node2"); + + ShardRouting test_1 = ShardRouting.newUnassigned("test", 1, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + ShardRoutingHelper.initialize(test_1, "node2"); + ShardRoutingHelper.moveToStarted(test_1); + ShardRoutingHelper.relocate(test_1, "node1"); + + ShardRouting test_2 = ShardRouting.newUnassigned("test", 2, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + ShardRoutingHelper.initialize(test_2, "node1"); + ShardRoutingHelper.moveToStarted(test_2); + + assertEquals(1000l, DiskThresholdDecider.getShardSize(test_2, info)); + assertEquals(100l, DiskThresholdDecider.getShardSize(test_1, info)); + assertEquals(10l, DiskThresholdDecider.getShardSize(test_0, info)); + + RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO, Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2)); + assertEquals(100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false)); + assertEquals(90l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true)); + + ShardRouting test_3 = ShardRouting.newUnassigned("test", 3, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + ShardRoutingHelper.initialize(test_3, "node1"); + ShardRoutingHelper.moveToStarted(test_3); + assertEquals(0l, DiskThresholdDecider.getShardSize(test_3, info)); + + + ShardRouting other_0 = ShardRouting.newUnassigned("other", 0, null, randomBoolean(), new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + ShardRoutingHelper.initialize(other_0, "node2"); + ShardRoutingHelper.moveToStarted(other_0); + ShardRoutingHelper.relocate(other_0, "node1"); + + + node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO, Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2, other_0.buildTargetRelocatingShard())); + if (other_0.primary()) { + assertEquals(10100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false)); + assertEquals(10090l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true)); + } else { + assertEquals(100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false)); + assertEquals(90l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true)); + } + + } + } diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index 24a438bbe65..6ea7e07c887 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.state; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; @@ -92,9 +91,7 @@ public class RareClusterStateIT extends ESIntegTestCase { .nodes(DiscoveryNodes.EMPTY_NODES) .build() ); - ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.of(), ImmutableMap.of()); - - RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), clusterInfo); + RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY); allocator.allocateUnassigned(routingAllocation); } diff --git a/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java b/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java index 533dfd80eaa..69c2db63a29 100644 --- a/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -66,7 +67,7 @@ public abstract class ESAllocationTestCase extends ESTestCase { public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) { return new AllocationService(settings, randomAllocationDeciders(settings, nodeSettingsService, random), - new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), ClusterInfoService.EMPTY); + new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); }