Add Upgrade API

This commit does the following:
* Add the new API at the REST layer, backed by the optimize API with the
  upgrade flag, and by the segments API to report upgrade status (see the
  sketch after this list).
* Add `upgrade` flag to the optimize API, and deprecate the `force` flag
  (to be removed in master).
* Add tests for both synchronous and asynchronous upgrade.
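
A rough sketch of that mapping (assuming an already-built `Client`; it mirrors what `RestUpgradeAction.handlePost` in this commit does):

[source,java]
--------------------------------------------------
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.client.Client;

// Sketch only: an upgrade is an optimize with the new upgrade flag set, and
// maxNumSegments raised so old segments are rewritten, not collapsed into one.
class UpgradeSketch {
    static OptimizeResponse upgrade(Client client, String index, boolean wait) {
        OptimizeRequest req = new OptimizeRequest(index);
        req.waitForMerge(wait);                 // wait_for_completion (default false)
        req.flush(true);                        // flush once the upgrade merges finish
        req.upgrade(true);                      // flag added by this commit
        req.maxNumSegments(Integer.MAX_VALUE);  // upgrade, don't merge down to 1 segment
        return client.admin().indices().optimize(req).actionGet();
    }
}
--------------------------------------------------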

closes #7884
closes #7922
Ryan Ernst 2014-10-07 08:09:50 -07:00
parent 555bfcb02b
commit c021f22523
14 changed files with 541 additions and 79 deletions

View File

@@ -37,8 +37,7 @@ deletes. Defaults to `false`. Note that this won't override the
to `true`. Note, a merge can potentially be a very heavy operation, so
it might make sense to run it set to `false`.
`force`:: Force a merge operation, even if there is a single segment in the
shard with no deletions.
`force`:: deprecated[1.4.0, Use the upgrade API]
[float]
[[optimize-multi-index]]

View File

@@ -0,0 +1,46 @@
[[indices-upgrade]]
== Upgrade
The upgrade API allows upgrading one or more indices to the latest index
format. The upgrade process converts any segments written
with previous formats.
=== Start an upgrade
[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/twitter/_upgrade'
--------------------------------------------------
Note that upgrading is an I/O-intensive operation, and is limited to processing
a single shard per node at a time. It is also not allowed to run at the same
time as an optimize operation.
[float]
[[upgrade-parameters]]
==== Request Parameters
The upgrade API accepts the following request parameters:
[horizontal]
`wait_for_completion`:: Should the request wait for the upgrade to complete. Defaults
to `false`.
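For example, to run the upgrade synchronously on the `twitter` index from the
example above (a sketch using this parameter):

[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/twitter/_upgrade?wait_for_completion=true'
--------------------------------------------------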
=== Check upgrade status
Use a `GET` request to monitor how much of an index is upgraded. This
can also be used prior to starting an upgrade to identify which indices
you want to upgrade at the same time.
[source,js]
--------------------------------------------------
$ curl 'http://localhost:9200/twitter/_upgrade?human'
--------------------------------------------------
[source,js]
--------------------------------------------------
{
  "twitter": {
    "size": "21gb",
    "size_in_bytes": 21000000000,
    "size_to_upgrade": "10gb",
    "size_to_upgrade_in_bytes": 10000000000
  }
}
--------------------------------------------------

View File

@@ -0,0 +1,37 @@
{
  "indices.upgrade": {
    "documentation": "http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/indices-upgrade.html",
    "methods": ["POST", "GET"],
    "url": {
      "path": "/_upgrade",
      "paths": ["/_upgrade", "/{index}/_upgrade"],
      "parts": {
        "index": {
          "type" : "list",
          "description" : "A comma-separated list of index names; use `_all` or empty string to perform the operation on all indices"
        }
      },
      "params": {
        "ignore_unavailable": {
          "type" : "boolean",
          "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
        },
        "allow_no_indices": {
          "type" : "boolean",
          "description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
        },
        "expand_wildcards": {
          "type" : "enum",
          "options" : ["open","closed"],
          "default" : "open",
          "description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
        },
        "wait_for_completion": {
          "type" : "boolean",
          "description" : "Specify whether the request should block until all segments are upgraded (default: false)"
        }
      }
    },
    "body": null
  }
}
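
As an illustration only, a request combining several of these parameters (the
`logs*` index pattern is hypothetical):

[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/logs*/_upgrade?expand_wildcards=open&wait_for_completion=true'
--------------------------------------------------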

View File

@@ -47,14 +47,14 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean FORCE = false;
public static final boolean UPGRADE = false;
}
private boolean waitForMerge = Defaults.WAIT_FOR_MERGE;
private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean force = Defaults.FORCE;
private boolean upgrade = Defaults.UPGRADE;
/**
* Constructs an optimization request over one or more indices.
@@ -134,18 +134,35 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
}
/**
* Should the merge be forced even if there is a single segment with no deletions in the shard.
* Defaults to <tt>false</tt>.
* @deprecated See {@link #upgrade()}
*/
@Deprecated
public boolean force() {
return force;
return upgrade;
}
/**
* See #force().
* @deprecated Use {@link #upgrade(boolean)}.
*/
@Deprecated
public OptimizeRequest force(boolean force) {
this.force = force;
this.upgrade = force;
return this;
}
/**
* Should the merge upgrade all old segments to the current index format.
* Defaults to <tt>false</tt>.
*/
public boolean upgrade() {
return upgrade;
}
/**
* See {@link #upgrade()}
*/
public OptimizeRequest upgrade(boolean upgrade) {
this.upgrade = upgrade;
return this;
}
@@ -156,7 +173,7 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
force = in.readBoolean();
upgrade = in.readBoolean();
}
}
@@ -167,7 +184,7 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBoolean(force);
out.writeBoolean(upgrade);
}
}
}

View File

@@ -37,7 +37,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
private int maxNumSegments = OptimizeRequest.Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = OptimizeRequest.Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = OptimizeRequest.Defaults.FLUSH;
private boolean force = OptimizeRequest.Defaults.FORCE;
private boolean upgrade = OptimizeRequest.Defaults.UPGRADE;
ShardOptimizeRequest() {
}
@@ -48,7 +48,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
maxNumSegments = request.maxNumSegments();
onlyExpungeDeletes = request.onlyExpungeDeletes();
flush = request.flush();
force = request.force();
upgrade = request.force() || request.upgrade();
}
boolean waitForMerge() {
@@ -67,8 +67,8 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
return flush;
}
public boolean force() {
return force;
public boolean upgrade() {
return upgrade;
}
@Override
@@ -79,7 +79,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
force = in.readBoolean();
upgrade = in.readBoolean();
}
}
@@ -91,7 +91,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBoolean(force);
out.writeBoolean(upgrade);
}
}
}

View File

@@ -113,7 +113,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
.maxNumSegments(request.maxNumSegments())
.onlyExpungeDeletes(request.onlyExpungeDeletes())
.flush(request.flush())
.force(request.force())
.upgrade(request.upgrade())
);
return new ShardOptimizeResponse(request.shardId());
}

View File

@@ -303,7 +303,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private int maxNumSegments = -1;
private boolean onlyExpungeDeletes = false;
private boolean flush = false;
private boolean force = false;
private boolean upgrade = false;
public Optimize() {
}
@@ -344,18 +344,18 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this;
}
public boolean force() {
return force;
public boolean upgrade() {
return upgrade;
}
public Optimize force(boolean force) {
this.force = force;
public Optimize upgrade(boolean upgrade) {
this.upgrade = upgrade;
return this;
}
@Override
public String toString() {
return "waitForMerge[" + waitForMerge + "], maxNumSegments[" + maxNumSegments + "], onlyExpungeDeletes[" + onlyExpungeDeletes + "], flush[" + flush + "], force[" + force + "]";
return "waitForMerge[" + waitForMerge + "], maxNumSegments[" + maxNumSegments + "], onlyExpungeDeletes[" + onlyExpungeDeletes + "], flush[" + flush + "], upgrade[" + upgrade + "]";
}
}

View File

@@ -1025,27 +1025,22 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
@Override
public void optimize(Optimize optimize) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
ElasticsearchMergePolicy elasticsearchMergePolicy = null;
try (InternalLock _ = readLock.acquire()) {
final IndexWriter writer = currentIndexWriter();
if (writer.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) {
elasticsearchMergePolicy = (ElasticsearchMergePolicy) writer.getConfig().getMergePolicy();
}
if (optimize.force() && elasticsearchMergePolicy == null) {
throw new ElasticsearchIllegalStateException("The `force` flag can only be used if the merge policy is an instance of "
+ ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + writer.getConfig().getMergePolicy().getClass().getName() + "]");
}
/*
* The way we implement "forced forced merges" is a bit hackish in the sense that we set an instance variable and that this
* setting will thus apply to all forced merges that will be run until `force` is set back to false. However, since
* InternalEngine.optimize is the only place in code where we call forceMerge and since calls are protected with
* `optimizeMutex`, this has the expected behavior.
* The way we implement upgrades is a bit hackish in the sense that we set an instance
* variable and that this setting will thus apply to the next forced merge that will be run.
* This is ok because (1) this is the only place we call forceMerge, (2) we have a single
* thread for optimize, and the 'optimizeMutex' guarding this code, and (3) ConcurrentMergeScheduler
* syncs calls to findForcedMerges.
*/
if (optimize.force()) {
elasticsearchMergePolicy.setForce(true);
MergePolicy mp = writer.getConfig().getMergePolicy();
assert mp instanceof ElasticsearchMergePolicy : "MergePolicy is " + mp.getClass().getName();
if (optimize.upgrade()) {
((ElasticsearchMergePolicy)mp).setUpgradeInProgress(true);
}
if (optimize.onlyExpungeDeletes()) {
writer.forceMergeDeletes(false);
} else if (optimize.maxNumSegments() <= 0) {
@@ -1058,9 +1053,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
maybeFailEngine(t, "optimize");
throw new OptimizeFailedEngineException(shardId, t);
} finally {
if (elasticsearchMergePolicy != null) {
elasticsearchMergePolicy.setForce(false);
}
optimizeMutex.set(false);
}
}

View File

@@ -29,6 +29,7 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.packed.GrowableWriter;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.Version;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
@@ -40,7 +41,7 @@ import java.util.List;
import java.util.Map;
/**
* A {@link MergePolicy} that upgrades segments and can force merges.
* A {@link MergePolicy} that upgrades segments and can run upgrade-only merges.
* <p>
* It can be useful to use the background merging process to upgrade segments,
* for example when we perform internal changes that imply different index
@@ -54,7 +55,8 @@ import java.util.Map;
public final class ElasticsearchMergePolicy extends MergePolicy {
private final MergePolicy delegate;
private volatile boolean force;
private volatile boolean upgradeInProgress;
private static final int MAX_CONCURRENT_UPGRADE_MERGES = 5;
/** @param delegate the merge policy to wrap */
public ElasticsearchMergePolicy(MergePolicy delegate) {
@@ -196,22 +198,30 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
throws IOException {
MergeSpecification spec = delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer);
if (spec == null && force) {
List<SegmentCommitInfo> segments = Lists.newArrayList();
for (SegmentCommitInfo info : segmentInfos) {
if (segmentsToMerge.containsKey(info)) {
segments.add(info);
}
}
if (!segments.isEmpty()) {
spec = new IndexUpgraderMergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
}
return upgradedMergeSpecification(spec);
if (upgradeInProgress) {
MergeSpecification spec = new IndexUpgraderMergeSpecification();
for (SegmentCommitInfo info : segmentInfos) {
if (Version.CURRENT.luceneVersion.minor > info.info.getVersion().minor) {
// TODO: Use IndexUpgradeMergePolicy instead. We should be comparing codecs,
// for now we just assume every minor upgrade has a new format.
spec.add(new OneMerge(Lists.newArrayList(info)));
}
if (spec.merges.size() == MAX_CONCURRENT_UPGRADE_MERGES) {
// hit our max upgrades, so return the spec. we will get a cascaded call to continue.
return spec;
}
}
// We must have less than our max upgrade merges, so the next return will be our last in upgrading mode.
upgradeInProgress = false;
if (spec.merges.isEmpty() == false) {
return spec;
}
// fall through, so when we don't have any segments to upgrade, the delegate policy
// has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount)
}
return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
}
@Override
@@ -226,12 +236,13 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
}
/**
* When <code>force</code> is true, running a force merge will cause a merge even if there
* is a single segment in the directory. This will apply to all calls to
* {@link IndexWriter#forceMerge} that are handled by this {@link MergePolicy}.
* When <code>upgrade</code> is true, running a force merge will upgrade any segments written
* with older versions. This will apply to the next call to
* {@link IndexWriter#forceMerge} that is handled by this {@link MergePolicy}, as well as
* cascading calls made by {@link IndexWriter}.
*/
public void setForce(boolean force) {
this.force = force;
public void setUpgradeInProgress(boolean upgrade) {
this.upgradeInProgress = upgrade;
}
@Override

View File

@@ -23,8 +23,8 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.admin.indices.upgrade.RestUpgradeAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.verify.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.exists.RestExistsAction;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
@@ -191,6 +191,7 @@ public class RestActionModule extends AbstractModule {
bind(RestRefreshAction.class).asEagerSingleton();
bind(RestFlushAction.class).asEagerSingleton();
bind(RestOptimizeAction.class).asEagerSingleton();
bind(RestUpgradeAction.class).asEagerSingleton();
bind(RestClearIndicesCacheAction.class).asEagerSingleton();
bind(RestIndexAction.class).asEagerSingleton();

View File

@@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.upgrade;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.admin.indices.segments.*;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.action.support.RestActions.buildBroadcastShardsHeader;
public class RestUpgradeAction extends BaseRestHandler {
@Inject
public RestUpgradeAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(POST, "/_upgrade", this);
controller.registerHandler(POST, "/{index}/_upgrade", this);
controller.registerHandler(GET, "/_upgrade", this);
controller.registerHandler(GET, "/{index}/_upgrade", this);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
if (request.method().equals(RestRequest.Method.GET)) {
handleGet(request, channel, client);
} else if (request.method().equals(RestRequest.Method.POST)) {
handlePost(request, channel, client);
}
}
void handleGet(RestRequest request, RestChannel channel, Client client) {
IndicesSegmentsRequest segsReq = new IndicesSegmentsRequest(Strings.splitStringByCommaToArray(request.param("index")));
client.admin().indices().segments(segsReq, new RestBuilderListener<IndicesSegmentResponse>(channel) {
@Override
public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
// TODO: getIndices().values() is what IndicesSegmentsResponse uses, but this will produce different orders with jdk8?
for (IndexSegments indexSegments : response.getIndices().values()) {
Tuple<Long, Long> summary = calculateUpgradeStatus(indexSegments);
builder.startObject(indexSegments.getIndex());
builder.byteSizeField(SIZE_IN_BYTES, SIZE, summary.v1());
builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, summary.v2());
builder.endObject();
}
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
}
void handlePost(RestRequest request, RestChannel channel, Client client) {
OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
optimizeReq.waitForMerge(request.paramAsBoolean("wait_for_completion", false));
optimizeReq.flush(true);
optimizeReq.upgrade(true);
optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment
client.admin().indices().optimize(optimizeReq, new RestBuilderListener<OptimizeResponse>(channel) {
@Override
public RestResponse buildResponse(OptimizeResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
buildBroadcastShardsHeader(builder, response);
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
}
Tuple<Long, Long> calculateUpgradeStatus(IndexSegments indexSegments) {
long total_bytes = 0;
long to_upgrade_bytes = 0;
for (IndexShardSegments shard : indexSegments) {
for (ShardSegments segs : shard.getShards()) {
for (Segment seg : segs.getSegments()) {
total_bytes += seg.sizeInBytes;
if (seg.version.minor != Version.CURRENT.luceneVersion.minor) {
// TODO: this comparison is bogus! it would cause us to upgrade even with the same format
// instead, we should check if the codec has changed
to_upgrade_bytes += seg.sizeInBytes;
}
}
}
}
return new Tuple<>(total_bytes, to_upgrade_bytes);
}
// this is a silly class which should just be a standalone function, but java doesn't even have a standard Pair that could
// be used to return 2 values from a function...
static class UpgradeSummary {
public long total_bytes;
public long to_upgrade_bytes;
UpgradeSummary(IndexSegments indexSegments) {
}
}
static final XContentBuilderString SIZE = new XContentBuilderString("size");
static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
static final XContentBuilderString SIZE_TO_UPGRADE = new XContentBuilderString("size_to_upgrade");
static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes");
}

View File

@@ -422,18 +422,12 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// we could have multiple underlying merges, so the generation may increase more than once
assertTrue(store.readLastCommittedSegmentsInfo().getGeneration() > gen1);
// forcing an optimize will merge this single segment shard
final boolean force = randomBoolean();
if (force) {
waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
}
final boolean flush = randomBoolean();
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).force(force).waitForMerge(false));
engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).waitForMerge(false));
waitTillMerge.get().await();
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), force ? notNullValue() : nullValue());
assertThat(segment.getMergeId(), nullValue());
}
waitForMerge.get().countDown();

View File

@@ -0,0 +1,197 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.upgrade;
import com.google.common.base.Predicate;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.impl.client.HttpClients;
import org.apache.lucene.util.Version;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
import org.elasticsearch.test.rest.client.RestResponse;
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.elasticsearch.test.rest.client.http.HttpResponse;
import org.elasticsearch.test.rest.json.JsonPath;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
@Override
protected int minExternalNodes() {
return 2;
}
public void testUpgrade() throws Exception {
if (backwardsCluster().numNewDataNodes() == 0) {
backwardsCluster().startNewNode();
}
int numIndexes = randomIntBetween(2, 4);
String[] indexNames = new String[numIndexes];
for (int i = 0; i < numIndexes; ++i) {
String indexName = "test" + i;
indexNames[i] = indexName;
Settings settings = ImmutableSettings.builder()
.put("index.routing.allocation.exclude._name", backwardsCluster().newNodePattern())
// don't allow any merges so that we can check segments are upgraded
// by the upgrader, and not just regular merging
.put("index.merge.policy.segments_per_tier", 1000000f)
.put(indexSettings())
.build();
assertAcked(prepareCreate(indexName).setSettings(settings));
ensureGreen(indexName);
assertAllShardsOnNodes(indexName, backwardsCluster().backwardsNodePattern());
int numDocs = scaledRandomIntBetween(100, 1000);
List<IndexRequestBuilder> builder = new ArrayList<>();
for (int j = 0; j < numDocs; ++j) {
String id = Integer.toString(j);
builder.add(client().prepareIndex(indexName, "type1", id).setSource("text", "sometext"));
}
indexRandom(true, builder);
ensureGreen(indexName);
flushAndRefresh();
}
backwardsCluster().allowOnAllNodes(indexNames);
backwardsCluster().upgradeAllNodes();
ensureGreen();
checkNotUpgraded("/_upgrade");
final String indexToUpgrade = "test" + randomInt(numIndexes - 1);
runUpgrade("/" + indexToUpgrade + "/_upgrade");
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
try {
return isUpgraded("/" + indexToUpgrade + "/_upgrade");
} catch (Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
});
runUpgrade("/_upgrade", "wait_for_completion", "true");
checkUpgraded("/_upgrade");
}
void checkNotUpgraded(String path) throws Exception {
for (UpgradeStatus status : getUpgradeStatus(path)) {
assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0);
assertTrue("total bytes must be >= upgrade bytes", status.totalBytes >= status.toUpgradeBytes);
assertEquals("index " + status.indexName + " should need upgrading",
status.totalBytes, status.toUpgradeBytes);
}
}
void checkUpgraded(String path) throws Exception {
for (UpgradeStatus status : getUpgradeStatus(path)) {
assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0);
assertTrue("total bytes must be >= upgrade bytes", status.totalBytes >= status.toUpgradeBytes);
assertEquals("index " + status.indexName + " should need upgrading",
0, status.toUpgradeBytes);
}
}
boolean isUpgraded(String path) throws Exception {
int toUpgrade = 0;
for (UpgradeStatus status : getUpgradeStatus(path)) {
logger.info("Index: " + status.indexName + ", total: " + status.totalBytes + ", toUpgrade: " + status.toUpgradeBytes);
toUpgrade += status.toUpgradeBytes;
}
return toUpgrade == 0;
}
class UpgradeStatus {
public final String indexName;
public final int totalBytes;
public final int toUpgradeBytes;
public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes) {
this.indexName = indexName;
this.totalBytes = totalBytes;
this.toUpgradeBytes = toUpgradeBytes;
}
}
void runUpgrade(String path, String... params) throws Exception {
assert params.length % 2 == 0;
HttpRequestBuilder builder = httpClient().method("POST").path(path);
for (int i = 0; i < params.length; i += 2) {
builder.addParam(params[i], params[i + 1]);
}
HttpResponse rsp = builder.execute();
assertNotNull(rsp);
assertEquals(200, rsp.getStatusCode());
}
List<UpgradeStatus> getUpgradeStatus(String path) throws Exception {
HttpResponse rsp = httpClient().method("GET").path(path).execute();
Map<String,Object> data = validateAndParse(rsp);
List<UpgradeStatus> ret = new ArrayList<>();
for (String index : data.keySet()) {
Map<String, Object> status = (Map<String,Object>)data.get(index);
assertTrue("missing key size_in_bytes for index " + index, status.containsKey("size_in_bytes"));
Object totalBytes = status.get("size_in_bytes");
assertTrue("size_in_bytes for index " + index + " is not an integer", totalBytes instanceof Integer);
assertTrue("missing key size_to_upgrade_in_bytes for index " + index, status.containsKey("size_to_upgrade_in_bytes"));
Object toUpgradeBytes = status.get("size_to_upgrade_in_bytes");
assertTrue("size_to_upgrade_in_bytes for index " + index + " is not an integer", toUpgradeBytes instanceof Integer);
ret.add(new UpgradeStatus(index, ((Integer)totalBytes).intValue(), ((Integer)toUpgradeBytes).intValue()));
}
return ret;
}
Map<String, Object> validateAndParse(HttpResponse rsp) throws Exception {
assertNotNull(rsp);
assertEquals(200, rsp.getStatusCode());
assertTrue(rsp.hasBody());
return (Map<String,Object>)new JsonPath(rsp.getBody()).evaluate("");
}
HttpRequestBuilder httpClient() {
InetSocketAddress[] addresses = cluster().httpAddresses();
InetSocketAddress address = addresses[randomInt(addresses.length - 1)];
return new HttpRequestBuilder(HttpClients.createDefault()).host(address.getHostName()).port(address.getPort());
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder().put(super.nodeSettings(nodeOrdinal))
.put(InternalNode.HTTP_ENABLED, true).build();
}
}

View File

@ -34,6 +34,7 @@ import org.junit.Before;
import org.junit.Ignore;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import static org.hamcrest.Matchers.is;
@@ -92,14 +93,40 @@ public abstract class ElasticsearchBackwardsCompatIntegrationTest extends Elasti
throw new IllegalArgumentException("Backcompat elasticsearch version must be same major version as current. " +
"backcompat: " + version + ", current: " + Version.CURRENT.toString());
}
File file = new File(path, "elasticsearch-" + version);
if (!file.exists()) {
throw new IllegalArgumentException("Backwards tests location is missing: " + file.getAbsolutePath());
File dir;
if (version == null || version.isEmpty()) {
// choose a random version
// TODO: how can we put the version selected in the repeat test output?
File[] subdirs = new File(path).listFiles(new FileFilter() {
@Override
public boolean accept(File file) {
return file.getName().startsWith("elasticsearch-") && file.isDirectory();
}
});
if (subdirs == null || subdirs.length == 0) {
throw new IllegalArgumentException("Backwards dir " + path + " must be a directory, and contain elasticsearch releases");
}
dir = subdirs[randomInt(subdirs.length - 1)];
version = dir.getName().substring("elasticsearch-".length());
} else {
dir = new File(path, "elasticsearch-" + version);
if (!dir.exists()) {
throw new IllegalArgumentException("Backwards tests location is missing: " + dir.getAbsolutePath());
}
if (!dir.isDirectory()) {
throw new IllegalArgumentException("Backwards tests location is not a directory: " + dir.getAbsolutePath());
}
}
if (!file.isDirectory()) {
throw new IllegalArgumentException("Backwards tests location is not a directory: " + file.getAbsolutePath());
Version v = Version.fromString(version);
if (v == null) {
throw new IllegalArgumentException("Backcompat elasticsearch version could not be parsed: " + version);
}
return file;
if (v.major != Version.CURRENT.major) {
throw new IllegalArgumentException("Backcompat elasticsearch version must be same major version as current. " +
"backcompat: " + version + ", current: " + Version.CURRENT.toString());
}
return dir;
}
public CompositeTestCluster backwardsCluster() {