From c021f22523a69583b94c56921792b727ab4030fb Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Tue, 7 Oct 2014 08:09:50 -0700 Subject: [PATCH] Add Upgrade API This commit does the following: * Add the new API at the rest layer, being backed by the optimize API with upgrade flag, and segments api to find upgrade status. * Add `upgrade` flag to optimize API, and deprecate `force` flag (will remove in master) * Add test for both synchronous and async upgrade closes #7884 closes #7922 --- docs/reference/indices/optimize.asciidoc | 3 +- docs/reference/indices/upgrade.asciidoc | 46 ++++ rest-api-spec/api/indices.upgrade.json | 37 ++++ .../indices/optimize/OptimizeRequest.java | 35 +++- .../optimize/ShardOptimizeRequest.java | 12 +- .../optimize/TransportOptimizeAction.java | 2 +- .../elasticsearch/index/engine/Engine.java | 12 +- .../index/engine/internal/InternalEngine.java | 28 +-- .../policy/ElasticsearchMergePolicy.java | 55 +++-- .../rest/action/RestActionModule.java | 3 +- .../indices/upgrade/RestUpgradeAction.java | 141 +++++++++++++ .../engine/internal/InternalEngineTests.java | 10 +- .../admin/indices/upgrade/UpgradeTest.java | 197 ++++++++++++++++++ ...csearchBackwardsCompatIntegrationTest.java | 39 +++- 14 files changed, 541 insertions(+), 79 deletions(-) create mode 100644 docs/reference/indices/upgrade.asciidoc create mode 100644 rest-api-spec/api/indices.upgrade.json create mode 100644 src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java create mode 100644 src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java diff --git a/docs/reference/indices/optimize.asciidoc b/docs/reference/indices/optimize.asciidoc index abf3db308c9..b3ea5d01069 100644 --- a/docs/reference/indices/optimize.asciidoc +++ b/docs/reference/indices/optimize.asciidoc @@ -37,8 +37,7 @@ deletes. Defaults to `false`. Note that this won't override the to `true`. Note, a merge can potentially be a very heavy operation, so it might make sense to run it set to `false`. -`force`:: Force a merge operation, even if there is a single segment in the -shard with no deletions. +`force`:: deprecated[1.4.0, Use the upgrade API] [float] [[optimize-multi-index]] diff --git a/docs/reference/indices/upgrade.asciidoc b/docs/reference/indices/upgrade.asciidoc new file mode 100644 index 00000000000..67d9fd8ddc2 --- /dev/null +++ b/docs/reference/indices/upgrade.asciidoc @@ -0,0 +1,46 @@ +[[indices-upgrade]] +== Upgrade + +The upgrade API allows to upgrade one or more indices to the latest format +through an API. The upgrade process converts any segments written +with previous formats. + +[source,js] +=== Start an upgrade +-------------------------------------------------- +$ curl -XPOST 'http://localhost:9200/twitter/_upgrade' +-------------------------------------------------- + +Note that upgrading is an I/O intensive operation, and is limited to processing +a single shard per node at a time. It also is not allowed to run at the same time +as optimize. + +[float] +[[upgrade-parameters]] +==== Request Parameters + +The upgrade API accepts the following request parameters: + +[horizontal] +`wait_for_completion`:: Should the request wait the upgrade to complete. Defaults +to `false`. + +=== Check upgrade status +Use a `GET` request to monitor how much of an index is upgraded. This +can also be used prior to starting an upgrade to identify which indices +you want to upgrade at the same time. +-------------------------------------------------- +$ curl 'http://localhost:9200/twitter/_upgrade?human' +-------------------------------------------------- + +[source,js] +-------------------------------------------------- +{ + "twitter": { + "size": "21gb", + "size_in_bytes": "21000000000", + "size_to_upgrade": "10gb", + "size_to_upgrade_in_bytes": "10000000000" + } +} +-------------------------------------------------- diff --git a/rest-api-spec/api/indices.upgrade.json b/rest-api-spec/api/indices.upgrade.json new file mode 100644 index 00000000000..a75bb01e35d --- /dev/null +++ b/rest-api-spec/api/indices.upgrade.json @@ -0,0 +1,37 @@ +{ + "indices.upgrade": { + "documentation": "http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/indices-upgrade.html", + "methods": ["POST", "GET"], + "url": { + "path": "/_upgrade", + "paths": ["/_upgrade", "/{index}/_upgrade"], + "parts": { + "index": { + "type" : "list", + "description" : "A comma-separated list of index names; use `_all` or empty string to perform the operation on all indices" + } + }, + "params": { + "ignore_unavailable": { + "type" : "boolean", + "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)" + }, + "allow_no_indices": { + "type" : "boolean", + "description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)" + }, + "expand_wildcards": { + "type" : "enum", + "options" : ["open","closed"], + "default" : "open", + "description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both." + }, + "wait_for_completion": { + "type" : "boolean", + "description" : "Specify whether the request should block until the all segments are upgraded (default: true)" + } + } + }, + "body": null + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java index 8403c1b0f97..b55ed580d3b 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java @@ -47,14 +47,14 @@ public class OptimizeRequest extends BroadcastOperationRequest public static final int MAX_NUM_SEGMENTS = -1; public static final boolean ONLY_EXPUNGE_DELETES = false; public static final boolean FLUSH = true; - public static final boolean FORCE = false; + public static final boolean UPGRADE = false; } private boolean waitForMerge = Defaults.WAIT_FOR_MERGE; private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS; private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES; private boolean flush = Defaults.FLUSH; - private boolean force = Defaults.FORCE; + private boolean upgrade = Defaults.UPGRADE; /** * Constructs an optimization request over one or more indices. @@ -134,18 +134,35 @@ public class OptimizeRequest extends BroadcastOperationRequest } /** - * Should the merge be forced even if there is a single segment with no deletions in the shard. - * Defaults to false. + * @deprecated See {@link #upgrade()} */ + @Deprecated public boolean force() { - return force; + return upgrade; } /** - * See #force(). + * @deprecated Use {@link #upgrade(boolean)}. */ + @Deprecated public OptimizeRequest force(boolean force) { - this.force = force; + this.upgrade = force; + return this; + } + + /** + * Should the merge upgrade all old segments to the current index format. + * Defaults to false. + */ + public boolean upgrade() { + return upgrade; + } + + /** + * See {@link #upgrade()} + */ + public OptimizeRequest upgrade(boolean upgrade) { + this.upgrade = upgrade; return this; } @@ -156,7 +173,7 @@ public class OptimizeRequest extends BroadcastOperationRequest onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_1_1_0)) { - force = in.readBoolean(); + upgrade = in.readBoolean(); } } @@ -167,7 +184,7 @@ public class OptimizeRequest extends BroadcastOperationRequest out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); if (out.getVersion().onOrAfter(Version.V_1_1_0)) { - out.writeBoolean(force); + out.writeBoolean(upgrade); } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java index 7ecd654436d..2cf0e7dde38 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java @@ -37,7 +37,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { private int maxNumSegments = OptimizeRequest.Defaults.MAX_NUM_SEGMENTS; private boolean onlyExpungeDeletes = OptimizeRequest.Defaults.ONLY_EXPUNGE_DELETES; private boolean flush = OptimizeRequest.Defaults.FLUSH; - private boolean force = OptimizeRequest.Defaults.FORCE; + private boolean upgrade = OptimizeRequest.Defaults.UPGRADE; ShardOptimizeRequest() { } @@ -48,7 +48,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { maxNumSegments = request.maxNumSegments(); onlyExpungeDeletes = request.onlyExpungeDeletes(); flush = request.flush(); - force = request.force(); + upgrade = request.force() || request.upgrade(); } boolean waitForMerge() { @@ -67,8 +67,8 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { return flush; } - public boolean force() { - return force; + public boolean upgrade() { + return upgrade; } @Override @@ -79,7 +79,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_1_1_0)) { - force = in.readBoolean(); + upgrade = in.readBoolean(); } } @@ -91,7 +91,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); if (out.getVersion().onOrAfter(Version.V_1_1_0)) { - out.writeBoolean(force); + out.writeBoolean(upgrade); } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java index f9f61884839..ce04ab6d4fb 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java @@ -113,7 +113,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction * It can be useful to use the background merging process to upgrade segments, * for example when we perform internal changes that imply different index @@ -54,7 +55,8 @@ import java.util.Map; public final class ElasticsearchMergePolicy extends MergePolicy { private final MergePolicy delegate; - private volatile boolean force; + private volatile boolean upgradeInProgress; + private static final int MAX_CONCURRENT_UPGRADE_MERGES = 5; /** @param delegate the merge policy to wrap */ public ElasticsearchMergePolicy(MergePolicy delegate) { @@ -196,22 +198,30 @@ public final class ElasticsearchMergePolicy extends MergePolicy { public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map segmentsToMerge, IndexWriter writer) throws IOException { - MergeSpecification spec = delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer); - if (spec == null && force) { - List segments = Lists.newArrayList(); - for (SegmentCommitInfo info : segmentInfos) { - if (segmentsToMerge.containsKey(info)) { - segments.add(info); - } - } - if (!segments.isEmpty()) { - spec = new IndexUpgraderMergeSpecification(); - spec.add(new OneMerge(segments)); - return spec; - } - } - return upgradedMergeSpecification(spec); + if (upgradeInProgress) { + MergeSpecification spec = new IndexUpgraderMergeSpecification(); + for (SegmentCommitInfo info : segmentInfos) { + if (Version.CURRENT.luceneVersion.minor > info.info.getVersion().minor) { + // TODO: Use IndexUpgradeMergePolicy instead. We should be comparing codecs, + // for now we just assume every minor upgrade has a new format. + spec.add(new OneMerge(Lists.newArrayList(info))); + } + if (spec.merges.size() == MAX_CONCURRENT_UPGRADE_MERGES) { + // hit our max upgrades, so return the spec. we will get a cascaded call to continue. + return spec; + } + } + // We must have less than our max upgrade merges, so the next return will be our last in upgrading mode. + upgradeInProgress = false; + if (spec.merges.isEmpty() == false) { + return spec; + } + // fall through, so when we don't have any segments to upgrade, the delegate policy + // has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount) + } + + return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer)); } @Override @@ -226,12 +236,13 @@ public final class ElasticsearchMergePolicy extends MergePolicy { } /** - * When force is true, running a force merge will cause a merge even if there - * is a single segment in the directory. This will apply to all calls to - * {@link IndexWriter#forceMerge} that are handled by this {@link MergePolicy}. + * When upgrade is true, running a force merge will upgrade any segments written + * with older versions. This will apply to the next call to + * {@link IndexWriter#forceMerge} that is handled by this {@link MergePolicy}, as well as + * cascading calls made by {@link IndexWriter}. */ - public void setForce(boolean force) { - this.force = force; + public void setUpgradeInProgress(boolean upgrade) { + this.upgradeInProgress = upgrade; } @Override diff --git a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index 4fc822a32b5..708d4a67c57 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -23,8 +23,8 @@ import com.google.common.collect.Lists; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.action.admin.indices.upgrade.RestUpgradeAction; import org.elasticsearch.rest.action.admin.cluster.repositories.verify.RestVerifyRepositoryAction; -import org.elasticsearch.rest.action.exists.RestExistsAction; import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction; import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction; import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction; @@ -191,6 +191,7 @@ public class RestActionModule extends AbstractModule { bind(RestRefreshAction.class).asEagerSingleton(); bind(RestFlushAction.class).asEagerSingleton(); bind(RestOptimizeAction.class).asEagerSingleton(); + bind(RestUpgradeAction.class).asEagerSingleton(); bind(RestClearIndicesCacheAction.class).asEagerSingleton(); bind(RestIndexAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java new file mode 100644 index 00000000000..9faeafb9287 --- /dev/null +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices.upgrade; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; +import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; +import org.elasticsearch.action.admin.indices.segments.*; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.engine.Segment; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.action.support.RestBuilderListener; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.rest.action.support.RestActions.buildBroadcastShardsHeader; + + +public class RestUpgradeAction extends BaseRestHandler { + + @Inject + public RestUpgradeAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(POST, "/_upgrade", this); + controller.registerHandler(POST, "/{index}/_upgrade", this); + + controller.registerHandler(GET, "/_upgrade", this); + controller.registerHandler(GET, "/{index}/_upgrade", this); + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + if (request.method().equals(RestRequest.Method.GET)) { + handleGet(request, channel, client); + } else if (request.method().equals(RestRequest.Method.POST)) { + handlePost(request, channel, client); + } + } + + void handleGet(RestRequest request, RestChannel channel, Client client) { + IndicesSegmentsRequest segsReq = new IndicesSegmentsRequest(Strings.splitStringByCommaToArray(request.param("index"))); + client.admin().indices().segments(segsReq, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + + // TODO: getIndices().values() is what IndecesSegmentsResponse uses, but this will produce different orders with jdk8? + for (IndexSegments indexSegments : response.getIndices().values()) { + Tuple summary = calculateUpgradeStatus(indexSegments); + builder.startObject(indexSegments.getIndex()); + builder.byteSizeField(SIZE_IN_BYTES, SIZE, summary.v1()); + builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, summary.v2()); + builder.endObject(); + } + + builder.endObject(); + return new BytesRestResponse(OK, builder); + } + }); + } + + void handlePost(RestRequest request, RestChannel channel, Client client) { + OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index"))); + optimizeReq.waitForMerge(request.paramAsBoolean("wait_for_completion", false)); + optimizeReq.flush(true); + optimizeReq.upgrade(true); + optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment + client.admin().indices().optimize(optimizeReq, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(OptimizeResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + buildBroadcastShardsHeader(builder, response); + builder.endObject(); + return new BytesRestResponse(OK, builder); + } + }); + } + + Tuple calculateUpgradeStatus(IndexSegments indexSegments) { + long total_bytes = 0; + long to_upgrade_bytes = 0; + for (IndexShardSegments shard : indexSegments) { + for (ShardSegments segs : shard.getShards()) { + for (Segment seg : segs.getSegments()) { + total_bytes += seg.sizeInBytes; + if (seg.version.minor != Version.CURRENT.luceneVersion.minor) { + // TODO: this comparison is bogus! it would cause us to upgrade even with the same format + // instead, we should check if the codec has changed + to_upgrade_bytes += seg.sizeInBytes; + } + } + } + } + return new Tuple<>(total_bytes, to_upgrade_bytes); + } + + // this is a silly class which should just be a standalone function, but java doesn't even have a standard Pair that could + // be used to return 2 values from a function... + static class UpgradeSummary { + public long total_bytes; + public long to_upgrade_bytes; + + UpgradeSummary(IndexSegments indexSegments) { + + } + } + + static final XContentBuilderString SIZE = new XContentBuilderString("size"); + static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); + static final XContentBuilderString SIZE_TO_UPGRADE = new XContentBuilderString("size_to_upgrade"); + static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes"); +} diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index bbb86a3995b..fcdcca603c2 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -422,18 +422,12 @@ public class InternalEngineTests extends ElasticsearchTestCase { // we could have multiple underlying merges, so the generation may increase more than once assertTrue(store.readLastCommittedSegmentsInfo().getGeneration() > gen1); - // forcing an optimize will merge this single segment shard - final boolean force = randomBoolean(); - if (force) { - waitTillMerge.set(new CountDownLatch(1)); - waitForMerge.set(new CountDownLatch(1)); - } final boolean flush = randomBoolean(); final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration(); - engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).force(force).waitForMerge(false)); + engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).waitForMerge(false)); waitTillMerge.get().await(); for (Segment segment : engine.segments()) { - assertThat(segment.getMergeId(), force ? notNullValue() : nullValue()); + assertThat(segment.getMergeId(), nullValue()); } waitForMerge.get().countDown(); diff --git a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java new file mode 100644 index 00000000000..2132ff90a47 --- /dev/null +++ b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices.upgrade; + +import com.google.common.base.Predicate; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.http.impl.client.HttpClients; +import org.apache.lucene.util.Version; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest; +import org.elasticsearch.test.rest.client.RestResponse; +import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; +import org.elasticsearch.test.rest.client.http.HttpResponse; +import org.elasticsearch.test.rest.json.JsonPath; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { + + @Override + protected int minExternalNodes() { + return 2; + } + + public void testUpgrade() throws Exception { + if (backwardsCluster().numNewDataNodes() == 0) { + backwardsCluster().startNewNode(); + } + + int numIndexes = randomIntBetween(2, 4); + String[] indexNames = new String[numIndexes]; + for (int i = 0; i < numIndexes; ++i) { + String indexName = "test" + i; + indexNames[i] = indexName; + + Settings settings = ImmutableSettings.builder() + .put("index.routing.allocation.exclude._name", backwardsCluster().newNodePattern()) + // don't allow any merges so that we can check segments are upgraded + // by the upgrader, and not just regular merging + .put("index.merge.policy.segments_per_tier", 1000000f) + .put(indexSettings()) + .build(); + + assertAcked(prepareCreate(indexName).setSettings(settings)); + ensureGreen(indexName); + assertAllShardsOnNodes(indexName, backwardsCluster().backwardsNodePattern()); + + int numDocs = scaledRandomIntBetween(100, 1000); + List builder = new ArrayList<>(); + for (int j = 0; j < numDocs; ++j) { + String id = Integer.toString(j); + builder.add(client().prepareIndex(indexName, "type1", id).setSource("text", "sometext")); + } + indexRandom(true, builder); + ensureGreen(indexName); + flushAndRefresh(); + } + backwardsCluster().allowOnAllNodes(indexNames); + backwardsCluster().upgradeAllNodes(); + ensureGreen(); + + checkNotUpgraded("/_upgrade"); + final String indexToUpgrade = "test" + randomInt(numIndexes - 1); + + runUpgrade("/" + indexToUpgrade + "/_upgrade"); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + try { + return isUpgraded("/" + indexToUpgrade + "/_upgrade"); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } + }); + + runUpgrade("/_upgrade", "wait_for_completion", "true"); + checkUpgraded("/_upgrade"); + } + + void checkNotUpgraded(String path) throws Exception { + for (UpgradeStatus status : getUpgradeStatus(path)) { + assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0); + assertTrue("total bytes must be >= upgrade bytes", status.totalBytes >= status.toUpgradeBytes); + assertEquals("index " + status.indexName + " should need upgrading", + status.totalBytes, status.toUpgradeBytes); + } + } + + void checkUpgraded(String path) throws Exception { + for (UpgradeStatus status : getUpgradeStatus(path)) { + assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0); + assertTrue("total bytes must be >= upgrade bytes", status.totalBytes >= status.toUpgradeBytes); + assertEquals("index " + status.indexName + " should need upgrading", + 0, status.toUpgradeBytes); + } + } + + boolean isUpgraded(String path) throws Exception { + int toUpgrade = 0; + for (UpgradeStatus status : getUpgradeStatus(path)) { + logger.info("Index: " + status.indexName + ", total: " + status.totalBytes + ", toUpgrade: " + status.toUpgradeBytes); + toUpgrade += status.toUpgradeBytes; + } + return toUpgrade == 0; + } + + class UpgradeStatus { + public final String indexName; + public final int totalBytes; + public final int toUpgradeBytes; + + public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes) { + this.indexName = indexName; + this.totalBytes = totalBytes; + this.toUpgradeBytes = toUpgradeBytes; + } + } + + void runUpgrade(String path, String... params) throws Exception { + assert params.length % 2 == 0; + HttpRequestBuilder builder = httpClient().method("POST").path(path); + for (int i = 0; i < params.length; i += 2) { + builder.addParam(params[i], params[i + 1]); + } + HttpResponse rsp = builder.execute(); + assertNotNull(rsp); + assertEquals(200, rsp.getStatusCode()); + } + + List getUpgradeStatus(String path) throws Exception { + HttpResponse rsp = httpClient().method("GET").path(path).execute(); + Map data = validateAndParse(rsp); + List ret = new ArrayList<>(); + for (String index : data.keySet()) { + Map status = (Map)data.get(index); + assertTrue("missing key size_in_bytes for index " + index, status.containsKey("size_in_bytes")); + Object totalBytes = status.get("size_in_bytes"); + assertTrue("size_in_bytes for index " + index + " is not an integer", totalBytes instanceof Integer); + assertTrue("missing key size_to_upgrade_in_bytes for index " + index, status.containsKey("size_to_upgrade_in_bytes")); + Object toUpgradeBytes = status.get("size_to_upgrade_in_bytes"); + assertTrue("size_to_upgrade_in_bytes for index " + index + " is not an integer", toUpgradeBytes instanceof Integer); + ret.add(new UpgradeStatus(index, ((Integer)totalBytes).intValue(), ((Integer)toUpgradeBytes).intValue())); + } + return ret; + } + + Map validateAndParse(HttpResponse rsp) throws Exception { + assertNotNull(rsp); + assertEquals(200, rsp.getStatusCode()); + assertTrue(rsp.hasBody()); + return (Map)new JsonPath(rsp.getBody()).evaluate(""); + } + + HttpRequestBuilder httpClient() { + InetSocketAddress[] addresses = cluster().httpAddresses(); + InetSocketAddress address = addresses[randomInt(addresses.length - 1)]; + return new HttpRequestBuilder(HttpClients.createDefault()).host(address.getHostName()).port(address.getPort()); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(InternalNode.HTTP_ENABLED, true).build(); + } +} diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java index 26486e29d41..3ef743ff140 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Ignore; import java.io.File; +import java.io.FileFilter; import java.io.IOException; import static org.hamcrest.Matchers.is; @@ -92,14 +93,40 @@ public abstract class ElasticsearchBackwardsCompatIntegrationTest extends Elasti throw new IllegalArgumentException("Backcompat elasticsearch version must be same major version as current. " + "backcompat: " + version + ", current: " + Version.CURRENT.toString()); } - File file = new File(path, "elasticsearch-" + version); - if (!file.exists()) { - throw new IllegalArgumentException("Backwards tests location is missing: " + file.getAbsolutePath()); + File dir; + if (version == null || version.isEmpty()) { + // choose a random version + // TODO: how can we put the version selected in the repeat test output? + File[] subdirs = new File(path).listFiles(new FileFilter() { + @Override + public boolean accept(File file) { + return file.getName().startsWith("elasticsearch-") && file.isDirectory(); + } + }); + if (subdirs == null || subdirs.length == 0) { + throw new IllegalArgumentException("Backwards dir " + path + " must be a directory, and contain elasticsearch releases"); + } + dir = subdirs[randomInt(subdirs.length - 1)]; + version = dir.getName().substring("elasticsearch-".length()); + } else { + dir = new File(path, "elasticsearch-" + version); + if (!dir.exists()) { + throw new IllegalArgumentException("Backwards tests location is missing: " + dir.getAbsolutePath()); + } + if (!dir.isDirectory()) { + throw new IllegalArgumentException("Backwards tests location is not a directory: " + dir.getAbsolutePath()); + } } - if (!file.isDirectory()) { - throw new IllegalArgumentException("Backwards tests location is not a directory: " + file.getAbsolutePath()); + + Version v = Version.fromString(version); + if (v == null) { + throw new IllegalArgumentException("Backcompat elasticsearch version could not be parsed: " + version); } - return file; + if (v.major != Version.CURRENT.major) { + throw new IllegalArgumentException("Backcompat elasticsearch version must be same major version as current. " + + "backcompat: " + version + ", current: " + Version.CURRENT.toString()); + } + return dir; } public CompositeTestCluster backwardsCluster() {