diff --git a/docs/reference/indices/upgrade.asciidoc b/docs/reference/indices/upgrade.asciidoc index 295a407f979..046dde2fe8a 100644 --- a/docs/reference/indices/upgrade.asciidoc +++ b/docs/reference/indices/upgrade.asciidoc @@ -54,13 +54,26 @@ curl 'http://localhost:9200/twitter/_upgrade?pretty&human' [source,js] -------------------------------------------------- { - "twitter": { + "size": "21gb", + "size_in_bytes": "21000000000", + "size_to_upgrade": "10gb", + "size_to_upgrade_in_bytes": "10000000000" + "size_to_upgrade_ancient": "1gb", + "size_to_upgrade_ancient_in_bytes": "1000000000" + "indices": { + "twitter": { "size": "21gb", "size_in_bytes": "21000000000", "size_to_upgrade": "10gb", "size_to_upgrade_in_bytes": "10000000000" "size_to_upgrade_ancient": "1gb", "size_to_upgrade_ancient_in_bytes": "1000000000" - } + } + } } -------------------------------------------------- + +The level of details in the upgrade status command can be controlled by +setting `level` parameter to `cluster`, `index` (default) or `shard` levels. +For example, you can run the upgrade status command with `level=shard` to +get detailed upgrade information of each individual shard. \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index 0decb393405..7bb66260a58 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -111,6 +111,12 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesActi import org.elasticsearch.action.admin.indices.template.get.TransportGetIndexTemplatesAction; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction; +import org.elasticsearch.action.admin.indices.upgrade.get.TransportUpgradeStatusAction; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusAction; +import org.elasticsearch.action.admin.indices.upgrade.post.TransportUpgradeAction; +import org.elasticsearch.action.admin.indices.upgrade.post.TransportUpgradeSettingsAction; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsAction; import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerAction; @@ -256,6 +262,9 @@ public class ActionModule extends AbstractModule { registerAction(FlushAction.INSTANCE, TransportFlushAction.class); registerAction(SealIndicesAction.INSTANCE, TransportSealIndicesAction.class); registerAction(OptimizeAction.INSTANCE, TransportOptimizeAction.class); + registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class); + registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class); + registerAction(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class); registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class); registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class); registerAction(DeleteWarmerAction.INSTANCE, TransportDeleteWarmerAction.class); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java index 3510a3b7f96..08f322a1154 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java @@ -42,15 +42,11 @@ public class OptimizeRequest extends BroadcastRequest { public static final int MAX_NUM_SEGMENTS = -1; public static final boolean ONLY_EXPUNGE_DELETES = false; public static final boolean FLUSH = true; - public static final boolean UPGRADE = false; - public static final boolean UPGRADE_ONLY_ANCIENT_SEGMENTS = false; } private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS; private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES; private boolean flush = Defaults.FLUSH; - private boolean upgrade = Defaults.UPGRADE; - private boolean upgradeOnlyAncientSegments = Defaults.UPGRADE_ONLY_ANCIENT_SEGMENTS; /** * Constructs an optimization request over one or more indices. @@ -114,30 +110,12 @@ public class OptimizeRequest extends BroadcastRequest { return this; } - /** - * Should the merge upgrade all old segments to the current index format. - * Defaults to false. - */ - public boolean upgrade() { - return upgrade; - } - - /** - * See {@link #upgrade()} - */ - public OptimizeRequest upgrade(boolean upgrade) { - this.upgrade = upgrade; - return this; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); - upgrade = in.readBoolean(); - upgradeOnlyAncientSegments = in.readBoolean(); } @Override @@ -146,24 +124,6 @@ public class OptimizeRequest extends BroadcastRequest { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); - out.writeBoolean(upgrade); - out.writeBoolean(upgradeOnlyAncientSegments); - } - - /** - * Should the merge upgrade only the ancient (older major version of Lucene) segments? - * Defaults to false. - */ - public boolean upgradeOnlyAncientSegments() { - return upgradeOnlyAncientSegments; - } - - /** - * See {@link #upgradeOnlyAncientSegments()} - */ - public OptimizeRequest upgradeOnlyAncientSegments(boolean upgradeOnlyAncientSegments) { - this.upgradeOnlyAncientSegments = upgradeOnlyAncientSegments; - return this; } @Override @@ -172,8 +132,6 @@ public class OptimizeRequest extends BroadcastRequest { "maxNumSegments=" + maxNumSegments + ", onlyExpungeDeletes=" + onlyExpungeDeletes + ", flush=" + flush + - ", upgrade=" + upgrade + - ", upgradeOnlyAncientSegments=" + upgradeOnlyAncientSegments + '}'; } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/IndexShardUpgradeStatus.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/IndexShardUpgradeStatus.java new file mode 100644 index 00000000000..e1cd16370c3 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/IndexShardUpgradeStatus.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import com.google.common.collect.Iterators; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Iterator; + +public class IndexShardUpgradeStatus implements Iterable { + + private final ShardId shardId; + + private final ShardUpgradeStatus[] shards; + + IndexShardUpgradeStatus(ShardId shardId, ShardUpgradeStatus[] shards) { + this.shardId = shardId; + this.shards = shards; + } + + public ShardId getShardId() { + return this.shardId; + } + + public ShardUpgradeStatus getAt(int i) { + return shards[i]; + } + + public ShardUpgradeStatus[] getShards() { + return this.shards; + } + + @Override + public Iterator iterator() { + return Iterators.forArray(shards); + } + + public long getTotalBytes() { + long totalBytes = 0; + for (ShardUpgradeStatus indexShardUpgradeStatus : shards) { + totalBytes += indexShardUpgradeStatus.getTotalBytes(); + } + return totalBytes; + } + + public long getToUpgradeBytes() { + long upgradeBytes = 0; + for (ShardUpgradeStatus indexShardUpgradeStatus : shards) { + upgradeBytes += indexShardUpgradeStatus.getToUpgradeBytes(); + } + return upgradeBytes; + } + + public long getToUpgradeBytesAncient() { + long upgradeBytesAncient = 0; + for (ShardUpgradeStatus indexShardUpgradeStatus : shards) { + upgradeBytesAncient += indexShardUpgradeStatus.getToUpgradeBytesAncient(); + } + return upgradeBytesAncient; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/IndexUpgradeStatus.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/IndexUpgradeStatus.java new file mode 100644 index 00000000000..33a60328951 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/IndexUpgradeStatus.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class IndexUpgradeStatus implements Iterable { + + private final String index; + + private final Map indexShards; + + IndexUpgradeStatus(String index, ShardUpgradeStatus[] shards) { + this.index = index; + + Map> tmpIndexShards = Maps.newHashMap(); + for (ShardUpgradeStatus shard : shards) { + List lst = tmpIndexShards.get(shard.getShardRouting().id()); + if (lst == null) { + lst = Lists.newArrayList(); + tmpIndexShards.put(shard.getShardRouting().id(), lst); + } + lst.add(shard); + } + indexShards = Maps.newHashMap(); + for (Map.Entry> entry : tmpIndexShards.entrySet()) { + indexShards.put(entry.getKey(), new IndexShardUpgradeStatus(entry.getValue().get(0).getShardRouting().shardId(), entry.getValue().toArray(new ShardUpgradeStatus[entry.getValue().size()]))); + } + } + + public String getIndex() { + return this.index; + } + + /** + * A shard id to index shard upgrade status map (note, index shard upgrade status is the replication shard group that maps + * to the shard id). + */ + public Map getShards() { + return this.indexShards; + } + + @Override + public Iterator iterator() { + return indexShards.values().iterator(); + } + + public long getTotalBytes() { + long totalBytes = 0; + for (IndexShardUpgradeStatus indexShardUpgradeStatus : indexShards.values()) { + totalBytes += indexShardUpgradeStatus.getTotalBytes(); + } + return totalBytes; + } + + public long getToUpgradeBytes() { + long upgradeBytes = 0; + for (IndexShardUpgradeStatus indexShardUpgradeStatus : indexShards.values()) { + upgradeBytes += indexShardUpgradeStatus.getToUpgradeBytes(); + } + return upgradeBytes; + } + + public long getToUpgradeBytesAncient() { + long upgradeBytesAncient = 0; + for (IndexShardUpgradeStatus indexShardUpgradeStatus : indexShards.values()) { + upgradeBytesAncient += indexShardUpgradeStatus.getToUpgradeBytesAncient(); + } + return upgradeBytesAncient; + } + + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/ShardUpgradeStatus.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/ShardUpgradeStatus.java new file mode 100644 index 00000000000..e5f0261932c --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/ShardUpgradeStatus.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.cluster.routing.ImmutableShardRouting.readShardRoutingEntry; + +public class ShardUpgradeStatus extends BroadcastShardResponse { + + private ShardRouting shardRouting; + + private long totalBytes; + + private long toUpgradeBytes; + + private long toUpgradeBytesAncient; + + ShardUpgradeStatus() { + } + + ShardUpgradeStatus(ShardRouting shardRouting, long totalBytes, long toUpgradeBytes, long upgradeBytesAncient) { + super(shardRouting.shardId()); + this.shardRouting = shardRouting; + this.totalBytes = totalBytes; + this.toUpgradeBytes = toUpgradeBytes; + this.toUpgradeBytesAncient = upgradeBytesAncient; + + } + + public ShardRouting getShardRouting() { + return this.shardRouting; + } + + public long getTotalBytes() { + return totalBytes; + } + + public long getToUpgradeBytes() { + return toUpgradeBytes; + } + + public long getToUpgradeBytesAncient() { + return toUpgradeBytesAncient; + } + + public static ShardUpgradeStatus readShardUpgradeStatus(StreamInput in) throws IOException { + ShardUpgradeStatus shard = new ShardUpgradeStatus(); + shard.readFrom(in); + return shard; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardRouting = readShardRoutingEntry(in); + totalBytes = in.readLong(); + toUpgradeBytes = in.readLong(); + toUpgradeBytesAncient = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardRouting.writeTo(out); + out.writeLong(totalBytes); + out.writeLong(toUpgradeBytes); + out.writeLong(toUpgradeBytesAncient); + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/TransportUpgradeStatusAction.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/TransportUpgradeStatusAction.java new file mode 100644 index 00000000000..370dce6e41f --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/TransportUpgradeStatusAction.java @@ -0,0 +1,152 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; +import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Segment; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static com.google.common.collect.Lists.newArrayList; + +/** + * + */ +public class TransportUpgradeStatusAction extends TransportBroadcastAction { + + private final IndicesService indicesService; + + @Inject + public TransportUpgradeStatusAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, + IndicesService indicesService, ActionFilters actionFilters) { + super(settings, UpgradeStatusAction.NAME, threadPool, clusterService, transportService, actionFilters, + UpgradeStatusRequest.class, IndexShardUpgradeStatusRequest.class, ThreadPool.Names.MANAGEMENT); + this.indicesService = indicesService; + } + + /** + * Getting upgrade stats from *all* active shards. + */ + @Override + protected GroupShardsIterator shards(ClusterState clusterState, UpgradeStatusRequest request, String[] concreteIndices) { + return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, UpgradeStatusRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, UpgradeStatusRequest countRequest, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); + } + + @Override + protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { + int successfulShards = 0; + int failedShards = 0; + List shardFailures = null; + final List shards = newArrayList(); + for (int i = 0; i < shardsResponses.length(); i++) { + Object shardResponse = shardsResponses.get(i); + if (shardResponse == null) { + // simply ignore non active shards + } else if (shardResponse instanceof BroadcastShardOperationFailedException) { + failedShards++; + if (shardFailures == null) { + shardFailures = newArrayList(); + } + shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); + } else { + shards.add((ShardUpgradeStatus) shardResponse); + successfulShards++; + } + } + return new UpgradeStatusResponse(shards.toArray(new ShardUpgradeStatus[shards.size()]), shardsResponses.length(), successfulShards, failedShards, shardFailures); + } + + @Override + protected IndexShardUpgradeStatusRequest newShardRequest(int numShards, ShardRouting shard, UpgradeStatusRequest request) { + return new IndexShardUpgradeStatusRequest(shard.shardId(), request); + } + + @Override + protected ShardUpgradeStatus newShardResponse() { + return new ShardUpgradeStatus(); + } + + @Override + protected ShardUpgradeStatus shardOperation(IndexShardUpgradeStatusRequest request) { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(request.shardId().id()); + List segments = indexShard.engine().segments(false); + long total_bytes = 0; + long to_upgrade_bytes = 0; + long to_upgrade_bytes_ancient = 0; + for (Segment seg : segments) { + total_bytes += seg.sizeInBytes; + if (seg.version.major != Version.CURRENT.luceneVersion.major) { + to_upgrade_bytes_ancient += seg.sizeInBytes; + to_upgrade_bytes += seg.sizeInBytes; + } else if (seg.version.minor != Version.CURRENT.luceneVersion.minor) { + // TODO: this comparison is bogus! it would cause us to upgrade even with the same format + // instead, we should check if the codec has changed + to_upgrade_bytes += seg.sizeInBytes; + } + } + + return new ShardUpgradeStatus(indexShard.routingEntry(), total_bytes, to_upgrade_bytes, to_upgrade_bytes_ancient); + } + + static class IndexShardUpgradeStatusRequest extends BroadcastShardRequest { + + IndexShardUpgradeStatusRequest() { + + } + + IndexShardUpgradeStatusRequest(ShardId shardId, UpgradeStatusRequest request) { + super(shardId, request); + } + + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusAction.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusAction.java new file mode 100644 index 00000000000..e0318b13b97 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusAction.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + */ +public class UpgradeStatusAction extends Action { + + public static final UpgradeStatusAction INSTANCE = new UpgradeStatusAction(); + public static final String NAME = "indices:monitor/upgrade"; + + private UpgradeStatusAction() { + super(NAME); + } + + @Override + public UpgradeStatusResponse newResponse() { + return new UpgradeStatusResponse(); + } + + @Override + public UpgradeStatusRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new UpgradeStatusRequestBuilder(client, this); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusRequest.java new file mode 100644 index 00000000000..a951924720d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusRequest.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class UpgradeStatusRequest extends BroadcastRequest { + + public UpgradeStatusRequest() { + this(Strings.EMPTY_ARRAY); + } + + public UpgradeStatusRequest(String... indices) { + super(indices); + } + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusRequestBuilder.java new file mode 100644 index 00000000000..98dd1c1828d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusRequestBuilder.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * + */ +public class UpgradeStatusRequestBuilder extends BroadcastOperationRequestBuilder { + + public UpgradeStatusRequestBuilder(ElasticsearchClient client, UpgradeStatusAction action) { + super(client, action, new UpgradeStatusRequest()); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java new file mode 100644 index 00000000000..89520704049 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java @@ -0,0 +1,191 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.get; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class UpgradeStatusResponse extends BroadcastResponse implements ToXContent { + + + private ShardUpgradeStatus[] shards; + + private Map indicesUpgradeStatus; + + UpgradeStatusResponse() { + + } + + UpgradeStatusResponse(ShardUpgradeStatus[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); + this.shards = shards; + } + + public Map getIndices() { + if (indicesUpgradeStatus != null) { + return indicesUpgradeStatus; + } + Map indicesUpgradeStats = Maps.newHashMap(); + + Set indices = Sets.newHashSet(); + for (ShardUpgradeStatus shard : shards) { + indices.add(shard.getIndex()); + } + + for (String index : indices) { + List shards = Lists.newArrayList(); + for (ShardUpgradeStatus shard : this.shards) { + if (shard.getShardRouting().index().equals(index)) { + shards.add(shard); + } + } + indicesUpgradeStats.put(index, new IndexUpgradeStatus(index, shards.toArray(new ShardUpgradeStatus[shards.size()]))); + } + this.indicesUpgradeStatus = indicesUpgradeStats; + return indicesUpgradeStats; + } + + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shards = new ShardUpgradeStatus[in.readVInt()]; + for (int i = 0; i < shards.length; i++) { + shards[i] = ShardUpgradeStatus.readShardUpgradeStatus(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shards.length); + for (ShardUpgradeStatus shard : shards) { + shard.writeTo(out); + } + } + + public long getTotalBytes() { + long totalBytes = 0; + for (IndexUpgradeStatus indexShardUpgradeStatus : getIndices().values()) { + totalBytes += indexShardUpgradeStatus.getTotalBytes(); + } + return totalBytes; + } + + public long getToUpgradeBytes() { + long upgradeBytes = 0; + for (IndexUpgradeStatus indexShardUpgradeStatus : getIndices().values()) { + upgradeBytes += indexShardUpgradeStatus.getToUpgradeBytes(); + } + return upgradeBytes; + } + + public long getToUpgradeBytesAncient() { + long upgradeBytesAncient = 0; + for (IndexUpgradeStatus indexShardUpgradeStatus : getIndices().values()) { + upgradeBytesAncient += indexShardUpgradeStatus.getToUpgradeBytesAncient(); + } + return upgradeBytesAncient; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + + + builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, getTotalBytes()); + builder.byteSizeField(Fields.SIZE_TO_UPGRADE_IN_BYTES, Fields.SIZE_TO_UPGRADE, getToUpgradeBytes()); + builder.byteSizeField(Fields.SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, Fields.SIZE_TO_UPGRADE_ANCIENT, getToUpgradeBytesAncient()); + + String level = params.param("level", "indices"); + boolean outputShards = "shards".equals(level); + boolean outputIndices = "indices".equals(level) || outputShards; + if (outputIndices) { + builder.startObject(Fields.INDICES); + for (IndexUpgradeStatus indexUpgradeStatus : getIndices().values()) { + builder.startObject(indexUpgradeStatus.getIndex(), XContentBuilder.FieldCaseConversion.NONE); + + builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, indexUpgradeStatus.getTotalBytes()); + builder.byteSizeField(Fields.SIZE_TO_UPGRADE_IN_BYTES, Fields.SIZE_TO_UPGRADE, indexUpgradeStatus.getToUpgradeBytes()); + builder.byteSizeField(Fields.SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, Fields.SIZE_TO_UPGRADE_ANCIENT, indexUpgradeStatus.getToUpgradeBytesAncient()); + if (outputShards) { + builder.startObject(Fields.SHARDS); + for (IndexShardUpgradeStatus indexShardUpgradeStatus : indexUpgradeStatus) { + builder.startArray(Integer.toString(indexShardUpgradeStatus.getShardId().id())); + for (ShardUpgradeStatus shardUpgradeStatus : indexShardUpgradeStatus) { + builder.startObject(); + + builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, getTotalBytes()); + builder.byteSizeField(Fields.SIZE_TO_UPGRADE_IN_BYTES, Fields.SIZE_TO_UPGRADE, getToUpgradeBytes()); + builder.byteSizeField(Fields.SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, Fields.SIZE_TO_UPGRADE_ANCIENT, getToUpgradeBytesAncient()); + + builder.startObject(Fields.ROUTING); + builder.field(Fields.STATE, shardUpgradeStatus.getShardRouting().state()); + builder.field(Fields.PRIMARY, shardUpgradeStatus.getShardRouting().primary()); + builder.field(Fields.NODE, shardUpgradeStatus.getShardRouting().currentNodeId()); + if (shardUpgradeStatus.getShardRouting().relocatingNodeId() != null) { + builder.field(Fields.RELOCATING_NODE, shardUpgradeStatus.getShardRouting().relocatingNodeId()); + } + builder.endObject(); + + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + } + + builder.endObject(); + } + + builder.endObject(); + } + return builder; + } + + static final class Fields { + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + static final XContentBuilderString SHARDS = new XContentBuilderString("shards"); + static final XContentBuilderString ROUTING = new XContentBuilderString("routing"); + static final XContentBuilderString STATE = new XContentBuilderString("state"); + static final XContentBuilderString PRIMARY = new XContentBuilderString("primary"); + static final XContentBuilderString NODE = new XContentBuilderString("node"); + static final XContentBuilderString RELOCATING_NODE = new XContentBuilderString("relocating_node"); + static final XContentBuilderString SIZE = new XContentBuilderString("size"); + static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); + static final XContentBuilderString SIZE_TO_UPGRADE = new XContentBuilderString("size_to_upgrade"); + static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient"); + static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes"); + static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes"); + + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeRequest.java new file mode 100644 index 00000000000..9731a983c38 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeRequest.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + + +import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * + */ +final class ShardUpgradeRequest extends BroadcastShardRequest { + + private UpgradeRequest request = new UpgradeRequest(); + + ShardUpgradeRequest() { + } + + ShardUpgradeRequest(ShardId shardId, UpgradeRequest request) { + super(shardId, request); + this.request = request; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + request.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + + public UpgradeRequest upgradeRequest() { + return this.request; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java new file mode 100644 index 00000000000..efbb19142c3 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.text.ParseException; + +/** + * + */ +class ShardUpgradeResponse extends BroadcastShardResponse { + + private org.apache.lucene.util.Version version; + + private boolean primary; + + + ShardUpgradeResponse() { + } + + ShardUpgradeResponse(ShardId shardId, boolean primary, org.apache.lucene.util.Version version) { + super(shardId); + this.primary = primary; + this.version = version; + } + + public org.apache.lucene.util.Version version() { + return this.version; + } + + public boolean primary() { + return primary; + } + + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + primary = in.readBoolean(); + try { + version = org.apache.lucene.util.Version.parse(in.readString()); + } catch (ParseException ex) { + throw new IOException("failed to parse lucene version [" + version + "]", ex); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(primary); + out.writeString(version.toString()); + } + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java new file mode 100644 index 00000000000..c5dc59ee634 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.apache.lucene.util.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.PrimaryMissingActionException; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Sets.newHashSet; + +/** + * Upgrade index/indices action. + */ +public class TransportUpgradeAction extends TransportBroadcastAction { + + private final IndicesService indicesService; + + private final TransportUpgradeSettingsAction upgradeSettingsAction; + + @Inject + public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, + TransportUpgradeSettingsAction upgradeSettingsAction) { + super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, + UpgradeRequest.class, ShardUpgradeRequest.class, ThreadPool.Names.OPTIMIZE); + this.indicesService = indicesService; + this.upgradeSettingsAction = upgradeSettingsAction; + } + + @Override + protected UpgradeResponse newResponse(UpgradeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { + int successfulShards = 0; + int failedShards = 0; + List shardFailures = null; + Map successfulPrimaryShards = newHashMap(); + Map versions = newHashMap(); + for (int i = 0; i < shardsResponses.length(); i++) { + Object shardResponse = shardsResponses.get(i); + if (shardResponse == null) { + // a non active shard, ignore... + } else if (shardResponse instanceof BroadcastShardOperationFailedException) { + failedShards++; + if (shardFailures == null) { + shardFailures = newArrayList(); + } + shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); + } else { + successfulShards++; + ShardUpgradeResponse shardUpgradeResponse = (ShardUpgradeResponse) shardResponse; + String index = shardUpgradeResponse.getIndex(); + if (shardUpgradeResponse.primary()) { + Integer count = successfulPrimaryShards.get(index); + successfulPrimaryShards.put(index, count == null ? 1 : count + 1); + } + Version version = versions.get(index); + if (version == null || shardUpgradeResponse.version().onOrAfter(version) == false) { + versions.put(index, shardUpgradeResponse.version()); + } + } + } + Map updatedVersions = newHashMap(); + MetaData metaData = clusterState.metaData(); + for (Map.Entry versionEntry : versions.entrySet()) { + String index = versionEntry.getKey(); + Integer primaryCount = successfulPrimaryShards.get(index); + int expectedPrimaryCount = metaData.index(index).getNumberOfShards(); + if (primaryCount == metaData.index(index).getNumberOfShards()) { + updatedVersions.put(index, versionEntry.getValue().toString()); + } else { + logger.warn("Not updating settings for the index [{}] because upgraded of some primary shards failed - expected[{}], received[{}]", index, + expectedPrimaryCount, primaryCount == null ? 0 : primaryCount); + } + } + + return new UpgradeResponse(updatedVersions, shardsResponses.length(), successfulShards, failedShards, shardFailures); + } + + @Override + protected ShardUpgradeRequest newShardRequest(int numShards, ShardRouting shard, UpgradeRequest request) { + return new ShardUpgradeRequest(shard.shardId(), request); + } + + @Override + protected ShardUpgradeResponse newShardResponse() { + return new ShardUpgradeResponse(); + } + + @Override + protected ShardUpgradeResponse shardOperation(ShardUpgradeRequest request) { + IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); + org.apache.lucene.util.Version version = indexShard.upgrade(request.upgradeRequest()); + return new ShardUpgradeResponse(request.shardId(), indexShard.routingEntry().primary(), version); + } + + /** + * The upgrade request works against *all* shards. + */ + @Override + protected GroupShardsIterator shards(ClusterState clusterState, UpgradeRequest request, String[] concreteIndices) { + GroupShardsIterator iterator = clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true); + Set indicesWithMissingPrimaries = indicesWithMissingPrimaries(clusterState, concreteIndices); + if (indicesWithMissingPrimaries.isEmpty()) { + return iterator; + } + // If some primary shards are not available the request should fail. + throw new PrimaryMissingActionException("Cannot upgrade indices because the following indices are missing primary shards " + indicesWithMissingPrimaries); + } + + /** + * Finds all indices that have not all primaries available + */ + private Set indicesWithMissingPrimaries(ClusterState clusterState, String[] concreteIndices) { + Set indices = newHashSet(); + RoutingTable routingTable = clusterState.routingTable(); + for (String index : concreteIndices) { + IndexRoutingTable indexRoutingTable = routingTable.index(index); + if (indexRoutingTable.allPrimaryShardsActive() == false) { + indices.add(index); + } + } + return indices; + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, UpgradeRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, UpgradeRequest request, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices); + } + + @Override + protected void doExecute(UpgradeRequest request, final ActionListener listener) { + ActionListener settingsUpdateListener = new ActionListener() { + @Override + public void onResponse(UpgradeResponse upgradeResponse) { + try { + if (upgradeResponse.versions().isEmpty()) { + listener.onResponse(upgradeResponse); + } else { + updateSettings(upgradeResponse, listener); + } + } catch (Throwable t) { + listener.onFailure(t); + } + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }; + super.doExecute(request, settingsUpdateListener); + } + + private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener listener) { + UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions()); + upgradeSettingsAction.execute(upgradeSettingsRequest, new ActionListener() { + @Override + public void onResponse(UpgradeSettingsResponse updateSettingsResponse) { + listener.onResponse(upgradeResponse); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } + +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeSettingsAction.java new file mode 100644 index 00000000000..26c3731697d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeSettingsAction.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * + */ +public class TransportUpgradeSettingsAction extends TransportMasterNodeAction { + + private final MetaDataUpdateSettingsService updateSettingsService; + + @Inject + public TransportUpgradeSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + MetaDataUpdateSettingsService updateSettingsService, ActionFilters actionFilters) { + super(settings, UpgradeSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters, UpgradeSettingsRequest.class); + this.updateSettingsService = updateSettingsService; + } + + @Override + protected String executor() { + // we go async right away.... + return ThreadPool.Names.SAME; + } + + @Override + protected ClusterBlockException checkBlock(UpgradeSettingsRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected UpgradeSettingsResponse newResponse() { + return new UpgradeSettingsResponse(); + } + + @Override + protected void masterOperation(final UpgradeSettingsRequest request, final ClusterState state, final ActionListener listener) { + UpgradeSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpgradeSettingsClusterStateUpdateRequest() + .ackTimeout(request.timeout()) + .versions(request.versions()) + .masterNodeTimeout(request.masterNodeTimeout()); + + updateSettingsService.upgradeIndexSettings(clusterStateUpdateRequest, new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new UpgradeSettingsResponse(response.isAcknowledged())); + } + + @Override + public void onFailure(Throwable t) { + logger.debug("failed to upgrade minimum compatibility version settings on indices [{}]", t, request.versions().keySet()); + listener.onFailure(t); + } + }); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeAction.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeAction.java new file mode 100644 index 00000000000..908a8a0d283 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeAction.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * Upgrade index/indices action. + */ +public class UpgradeAction extends Action { + + public static final UpgradeAction INSTANCE = new UpgradeAction(); + public static final String NAME = "indices:admin/upgrade"; + + private UpgradeAction() { + super(NAME); + } + + @Override + public UpgradeResponse newResponse() { + return new UpgradeResponse(); + } + + @Override + public UpgradeRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new UpgradeRequestBuilder(client, this); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeRequest.java new file mode 100644 index 00000000000..af328ce21ad --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeRequest.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A request to upgrade one or more indices. In order to optimize on all the indices, pass an empty array or + * null for the indices. + *

+ * @see org.elasticsearch.client.Requests#upgradeRequest(String...) + * @see org.elasticsearch.client.IndicesAdminClient#upgrade(UpgradeRequest) + * @see UpgradeResponse + */ +public class UpgradeRequest extends BroadcastRequest { + + public static final class Defaults { + public static final boolean UPGRADE_ONLY_ANCIENT_SEGMENTS = false; + } + + private boolean upgradeOnlyAncientSegments = Defaults.UPGRADE_ONLY_ANCIENT_SEGMENTS; + + /** + * Constructs an optimization request over one or more indices. + * + * @param indices The indices to optimize, no indices passed means all indices will be optimized. + */ + public UpgradeRequest(String... indices) { + super(indices); + } + + public UpgradeRequest() { + + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + upgradeOnlyAncientSegments = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(upgradeOnlyAncientSegments); + } + + /** + * Should the upgrade only the ancient (older major version of Lucene) segments? + * Defaults to false. + */ + public boolean upgradeOnlyAncientSegments() { + return upgradeOnlyAncientSegments; + } + + /** + * See {@link #upgradeOnlyAncientSegments()} + */ + public UpgradeRequest upgradeOnlyAncientSegments(boolean upgradeOnlyAncientSegments) { + this.upgradeOnlyAncientSegments = upgradeOnlyAncientSegments; + return this; + } + + @Override + public String toString() { + return "UpgradeRequest{" + + "upgradeOnlyAncientSegments=" + upgradeOnlyAncientSegments + + '}'; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeRequestBuilder.java new file mode 100644 index 00000000000..adc8ea5510a --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeRequestBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * A request to upgrade one or more indices. In order to optimize on all the indices, pass an empty array or + * null for the indices. + */ +public class UpgradeRequestBuilder extends BroadcastOperationRequestBuilder { + + public UpgradeRequestBuilder(ElasticsearchClient client, UpgradeAction action) { + super(client, action, new UpgradeRequest()); + } + + /** + * Should the upgrade only the ancient (older major version of Lucene) segments? + */ + public UpgradeRequestBuilder setUpgradeOnlyAncientSegments(boolean upgradeOnlyAncientSegments) { + request.upgradeOnlyAncientSegments(upgradeOnlyAncientSegments); + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeResponse.java new file mode 100644 index 00000000000..04e377dd75d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeResponse.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMap; + +/** + * A response for optimize action. + * + * + */ +public class UpgradeResponse extends BroadcastResponse { + + private Map versions; + + UpgradeResponse() { + + } + + UpgradeResponse(Map versions, int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); + this.versions = versions; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + versions = newHashMap(); + for (int i=0; i entry : versions.entrySet()) { + out.writeString(entry.getKey()); + out.writeString(entry.getValue()); + } + } + + public Map versions() { + return versions; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsAction.java new file mode 100644 index 00000000000..5257b50132d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsAction.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + */ +public class UpgradeSettingsAction extends Action { + + public static final UpgradeSettingsAction INSTANCE = new UpgradeSettingsAction(); + public static final String NAME = "internal:indices/admin/upgrade"; + + private UpgradeSettingsAction() { + super(NAME); + } + + @Override + public UpgradeSettingsResponse newResponse() { + return new UpgradeSettingsResponse(); + } + + @Override + public UpgradeSettingsRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new UpgradeSettingsRequestBuilder(client, this); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsClusterStateUpdateRequest.java new file mode 100644 index 00000000000..7067f2f61ec --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsClusterStateUpdateRequest.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; + +import java.util.Map; + +/** + * Cluster state update request that allows to change minimum compatibility settings for some indices + */ +public class UpgradeSettingsClusterStateUpdateRequest extends ClusterStateUpdateRequest { + + private Map versions; + + public UpgradeSettingsClusterStateUpdateRequest() { + + } + + /** + * Returns the index to version map for indices that should be updated + */ + public Map versions() { + return versions; + } + + /** + * Sets the index to version map for indices that should be updated + */ + public UpgradeSettingsClusterStateUpdateRequest versions(Map versions) { + this.versions = versions; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequest.java new file mode 100644 index 00000000000..b191fa53539 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequest.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMap; +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Request for an update index settings action + */ +public class UpgradeSettingsRequest extends AcknowledgedRequest { + + + private Map versions; + + UpgradeSettingsRequest() { + } + + /** + * Constructs a new request to update minimum compatible version settings for one or more indices + */ + public UpgradeSettingsRequest(Map versions) { + this.versions = versions; + } + + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (versions.isEmpty()) { + validationException = addValidationError("no indices to update", validationException); + } + return validationException; + } + + + Map versions() { + return versions; + } + + /** + * Sets the index versions to be updated + */ + public UpgradeSettingsRequest versions(Map versions) { + this.versions = versions; + return this; + } + + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + versions = newHashMap(); + for (int i=0; i entry : versions.entrySet()) { + out.writeString(entry.getKey()); + out.writeString(entry.getValue()); + } + writeTimeout(out); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequestBuilder.java new file mode 100644 index 00000000000..74c42a5fe80 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequestBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +import java.util.Map; + +/** + * Builder for an update index settings request + */ +public class UpgradeSettingsRequestBuilder extends AcknowledgedRequestBuilder { + + public UpgradeSettingsRequestBuilder(ElasticsearchClient client, UpgradeSettingsAction action) { + super(client, action, new UpgradeSettingsRequest()); + } + + /** + * Sets the index versions to be updated + */ + public UpgradeSettingsRequestBuilder setVersions(Map versions) { + request.versions(versions); + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsResponse.java new file mode 100644 index 00000000000..0918af6f418 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsResponse.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.upgrade.post; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A response for an update index settings action + */ +public class UpgradeSettingsResponse extends AcknowledgedResponse { + + UpgradeSettingsResponse() { + } + + UpgradeSettingsResponse(boolean acknowledged) { + super(acknowledged); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out); + } +} diff --git a/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index c54aaece7f4..ae16d7b36d2 100644 --- a/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -96,6 +96,12 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResp import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequest; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequestBuilder; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequestBuilder; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeResponse; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequest; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse; @@ -406,6 +412,53 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ OptimizeRequestBuilder prepareOptimize(String... indices); + + /** + * Explicitly upgrade one or more indices + * + * @param request The upgrade request + * @return A result future + * @see org.elasticsearch.client.Requests#upgradeRequest(String...) + */ + ActionFuture upgrade(UpgradeRequest request); + + /** + * Explicitly upgrade one or more indices + * + * @param request The upgrade request + * @param listener A listener to be notified with a result + * @see org.elasticsearch.client.Requests#upgradeRequest(String...) + */ + void upgrade(UpgradeRequest request, ActionListener listener); + + /** + * Explicitly upgrade one or more indices + */ + UpgradeStatusRequestBuilder prepareUpgradeStatus(String... indices); + + /** + * Check upgrade status of one or more indices + * + * @param request The upgrade request + * @return A result future + * @see org.elasticsearch.client.Requests#upgradeRequest(String...) + */ + ActionFuture upgradeStatus(UpgradeStatusRequest request); + + /** + * Check upgrade status of one or more indices + * + * @param request The upgrade request + * @param listener A listener to be notified with a result + * @see org.elasticsearch.client.Requests#upgradeRequest(String...) + */ + void upgradeStatus(UpgradeStatusRequest request, ActionListener listener); + + /** + * Check upgrade status of one or more indices + */ + UpgradeRequestBuilder prepareUpgrade(String... indices); + /** * Get the complete mappings of one or more types */ diff --git a/src/main/java/org/elasticsearch/client/Requests.java b/src/main/java/org/elasticsearch/client/Requests.java index bc2a778f570..8a70c18b374 100644 --- a/src/main/java/org/elasticsearch/client/Requests.java +++ b/src/main/java/org/elasticsearch/client/Requests.java @@ -49,6 +49,7 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.count.CountRequest; import org.elasticsearch.action.delete.DeleteRequest; @@ -291,6 +292,17 @@ public class Requests { return new OptimizeRequest(indices); } + /** + * Creates an upgrade request. + * + * @param indices The indices to upgrade. Use null or _all to execute against all indices + * @return The upgrade request + * @see org.elasticsearch.client.IndicesAdminClient#upgrade(UpgradeRequest) + */ + public static UpgradeRequest upgradeRequest(String... indices) { + return new UpgradeRequest(indices); + } + /** * Creates a clean indices cache request. * diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 75e8ada560b..625a469470d 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -196,6 +196,14 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateActio import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusAction; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequest; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequestBuilder; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequestBuilder; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeResponse; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequest; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder; @@ -1415,6 +1423,36 @@ public abstract class AbstractClient extends AbstractComponent implements Client return new OptimizeRequestBuilder(this, OptimizeAction.INSTANCE).setIndices(indices); } + @Override + public ActionFuture upgrade(final UpgradeRequest request) { + return execute(UpgradeAction.INSTANCE, request); + } + + @Override + public void upgrade(final UpgradeRequest request, final ActionListener listener) { + execute(UpgradeAction.INSTANCE, request, listener); + } + + @Override + public UpgradeRequestBuilder prepareUpgrade(String... indices) { + return new UpgradeRequestBuilder(this, UpgradeAction.INSTANCE).setIndices(indices); + } + + + @Override + public ActionFuture upgradeStatus(final UpgradeStatusRequest request) { + return execute(UpgradeStatusAction.INSTANCE, request); + } + + @Override + public void upgradeStatus(final UpgradeStatusRequest request, final ActionListener listener) { + execute(UpgradeStatusAction.INSTANCE, request, listener); + } + + @Override + public UpgradeStatusRequestBuilder prepareUpgradeStatus(String... indices) { + return new UpgradeStatusRequestBuilder(this, UpgradeStatusAction.INSTANCE).setIndices(indices); + } @Override public ActionFuture refresh(final RefreshRequest request) { return execute(RefreshAction.INSTANCE, request); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index c9c7bbabb00..07703bca591 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -49,6 +49,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.warmer.IndexWarmersMetaData; import java.io.IOException; +import java.text.ParseException; import java.util.EnumSet; import java.util.HashMap; import java.util.Locale; @@ -159,6 +160,7 @@ public class IndexMetaData implements Diffable { public static final String SETTING_BLOCKS_WRITE = "index.blocks.write"; public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata"; public static final String SETTING_VERSION_CREATED = "index.version.created"; + public static final String SETTING_VERSION_UPGRADED = "index.version.upgraded"; public static final String SETTING_VERSION_MINIMUM_COMPATIBLE = "index.version.minimum_compatible"; public static final String SETTING_CREATION_DATE = "index.creation_date"; public static final String SETTING_UUID = "index.uuid"; @@ -192,7 +194,8 @@ public class IndexMetaData implements Diffable { private final DiscoveryNodeFilters excludeFilters; private final Version indexCreatedVersion; - private final Version indexMinimumCompatibleVersion; + private final Version indexUpgradedVersion; + private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion; private final HashFunction routingHashFunction; private final boolean useTypeForRouting; @@ -227,7 +230,17 @@ public class IndexMetaData implements Diffable { excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap); } indexCreatedVersion = Version.indexCreated(settings); - indexMinimumCompatibleVersion = settings.getAsVersion(SETTING_VERSION_MINIMUM_COMPATIBLE, indexCreatedVersion); + indexUpgradedVersion = settings.getAsVersion(IndexMetaData.SETTING_VERSION_UPGRADED, indexCreatedVersion); + String stringLuceneVersion = settings.get(SETTING_VERSION_MINIMUM_COMPATIBLE); + if (stringLuceneVersion != null) { + try { + this.minimumCompatibleLuceneVersion = org.apache.lucene.util.Version.parse(stringLuceneVersion); + } catch (ParseException ex) { + throw new IllegalStateException("Cannot parse lucene version [" + stringLuceneVersion + "] in the [" + SETTING_VERSION_MINIMUM_COMPATIBLE +"] setting", ex); + } + } else { + this.minimumCompatibleLuceneVersion = null; + } final Class hashFunctionClass = settings.getAsClass(SETTING_LEGACY_ROUTING_HASH_FUNCTION, null); if (hashFunctionClass == null) { routingHashFunction = MURMUR3_HASH_FUNCTION; @@ -280,8 +293,6 @@ public class IndexMetaData implements Diffable { /** * Return the {@link Version} on which this index has been created. This * information is typically useful for backward compatibility. - * - * Returns null if the index was created before 0.19.0.RC1. */ public Version creationVersion() { return indexCreatedVersion; @@ -292,17 +303,22 @@ public class IndexMetaData implements Diffable { } /** - * Return the {@link Version} of that created the oldest segment in the index. - * - * If the index was created before v1.6 and didn't go through upgrade API the creation verion is returned. - * Returns null if the index was created before 0.19.0.RC1. + * Return the {@link Version} on which this index has been upgraded. This + * information is typically useful for backward compatibility. */ - public Version minimumCompatibleVersion() { - return indexMinimumCompatibleVersion; + public Version upgradeVersion() { + return indexUpgradedVersion; } - public Version getMinimumCompatibleVersion() { - return minimumCompatibleVersion(); + public Version getUpgradeVersion() { + return upgradeVersion(); + } + + /** + * Return the {@link org.apache.lucene.util.Version} of the oldest lucene segment in the index + */ + public org.apache.lucene.util.Version getMinimumCompatibleVersion() { + return minimumCompatibleLuceneVersion; } /** diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index 5265cfabd5d..ea9f7f7a611 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -28,7 +28,7 @@ import org.elasticsearch.common.settings.Settings; /** * This service is responsible for upgrading legacy index metadata to the current version - * + *

* Every time an existing index is introduced into cluster this service should be used * to upgrade the existing index metadata to the latest version of the cluster. It typically * occurs during cluster upgrade, when dangling indices are imported into the cluster or indices @@ -64,7 +64,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent { pre20HashFunction = DjbHashFunction.class; } pre20UseType = settings.getAsBoolean(DEPRECATED_SETTING_ROUTING_USE_TYPE, null); - if (hasCustomPre20HashFunction|| pre20UseType != null) { + if (hasCustomPre20HashFunction || pre20UseType != null) { logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they " + "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE); } @@ -72,7 +72,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent { /** * Checks that the index can be upgraded to the current version of the master node. - * + *

* If the index does need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index cannot be * updated the method throws an exception. */ @@ -101,8 +101,16 @@ public class MetaDataIndexUpgradeService extends AbstractComponent { * Returns true if this index can be supported by the current version of elasticsearch */ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { - return indexMetaData.minimumCompatibleVersion() != null && - indexMetaData.minimumCompatibleVersion().luceneVersion.onOrAfter(Version.V_0_90_0_Beta1.luceneVersion); + if (indexMetaData.creationVersion().onOrAfter(Version.V_0_90_0_Beta1)) { + // The index was created with elasticsearch that was using Lucene 4.0 + return true; + } + if (indexMetaData.getMinimumCompatibleVersion() != null && + indexMetaData.getMinimumCompatibleVersion().onOrAfter(org.apache.lucene.util.Version.LUCENE_4_0_0)) { + //The index was upgraded we can work with it + return true; + } + return false; } /** diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index 4b5d793356d..2f40335116e 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -20,8 +20,10 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Sets; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; @@ -40,6 +42,8 @@ import org.elasticsearch.index.settings.IndexDynamicSettings; import java.util.*; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; + /** * Service responsible for submitting update index settings requests */ @@ -307,4 +311,37 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } }); } + + public void upgradeIndexSettings(final UpgradeSettingsClusterStateUpdateRequest request, final ActionListener listener) { + + + clusterService.submitStateUpdateTask("update-index-compatibility-versions", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData()); + for (Map.Entry entry : request.versions().entrySet()) { + String index = entry.getKey(); + IndexMetaData indexMetaData = metaDataBuilder.get(index); + if (indexMetaData != null) { + if (Version.CURRENT.equals(indexMetaData.creationVersion()) == false) { + // No reason to pollute the settings, we didn't really upgrade anything + metaDataBuilder.put(IndexMetaData.builder(indexMetaData) + .settings(settingsBuilder().put(indexMetaData.settings()) + .put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE, entry.getValue()) + .put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.CURRENT) + ) + ); + } + } + } + return ClusterState.builder(currentState).metaData(metaDataBuilder).build(); + } + }); + } } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index acac1c07794..e6222003651 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -33,6 +33,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -717,8 +718,38 @@ public class IndexShard extends AbstractIndexShardComponent { if (logger.isTraceEnabled()) { logger.trace("optimize with {}", optimize); } - engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), - optimize.upgrade(), optimize.upgradeOnlyAncientSegments()); + engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), false, false); + } + + /** + * Upgrades the shard to the current version of Lucene and returns the minimum segment version + */ + public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) { + verifyStarted(); + if (logger.isTraceEnabled()) { + logger.trace("upgrade with {}", upgrade); + } + org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion(); + // we just want to upgrade the segments, not actually optimize to a single segment + engine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable + Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment + false, true, upgrade.upgradeOnlyAncientSegments()); + org.apache.lucene.util.Version version = minimumCompatibleVersion(); + if (logger.isTraceEnabled()) { + logger.trace("upgraded segment {} from version {} to version {}", previousVersion, version); + } + + return version; + } + + public org.apache.lucene.util.Version minimumCompatibleVersion() { + org.apache.lucene.util.Version luceneVersion = Version.LUCENE_3_EMULATION_VERSION; + for(Segment segment : engine().segments(false)) { + if (luceneVersion.onOrAfter(segment.getVersion())) { + luceneVersion = segment.getVersion(); + } + } + return luceneVersion; } public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java index 6ea428bc31a..8c1b1c0458a 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java @@ -19,17 +19,14 @@ package org.elasticsearch.rest.action.admin.indices.upgrade; -import org.elasticsearch.Version; -import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; -import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; -import org.elasticsearch.action.admin.indices.segments.*; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.index.engine.Segment; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; @@ -37,7 +34,8 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.support.RestBuilderListener; -import java.io.IOException; + +import java.util.Map; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -66,72 +64,36 @@ public class RestUpgradeAction extends BaseRestHandler { } } - void handleGet(RestRequest request, RestChannel channel, Client client) { - IndicesSegmentsRequest segsReq = new IndicesSegmentsRequest(Strings.splitStringByCommaToArray(request.param("index"))); - client.admin().indices().segments(segsReq, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - - // TODO: getIndices().values() is what IndicesSegmentsResponse uses, but this will produce different orders with jdk8? - for (IndexSegments indexSegments : response.getIndices().values()) { - builder.startObject(indexSegments.getIndex()); - buildUpgradeStatus(indexSegments, builder); - builder.endObject(); - } - - builder.endObject(); - return new BytesRestResponse(OK, builder); - } - }); + void handleGet(final RestRequest request, RestChannel channel, Client client) { + client.admin().indices().prepareUpgradeStatus(Strings.splitStringByCommaToArray(request.param("index"))) + .execute(new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(UpgradeStatusResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); + return new BytesRestResponse(OK, builder); + } + }); } - + void handlePost(final RestRequest request, RestChannel channel, Client client) { - OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index"))); - optimizeReq.flush(true); - optimizeReq.upgrade(true); - optimizeReq.upgradeOnlyAncientSegments(request.paramAsBoolean("only_ancient_segments", false)); - optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment - client.admin().indices().optimize(optimizeReq, new RestBuilderListener(channel) { + UpgradeRequest upgradeReq = new UpgradeRequest(Strings.splitStringByCommaToArray(request.param("index"))); + upgradeReq.upgradeOnlyAncientSegments(request.paramAsBoolean("only_ancient_segments", false)); + client.admin().indices().upgrade(upgradeReq, new RestBuilderListener(channel) { @Override - public RestResponse buildResponse(OptimizeResponse response, XContentBuilder builder) throws Exception { + public RestResponse buildResponse(UpgradeResponse response, XContentBuilder builder) throws Exception { builder.startObject(); buildBroadcastShardsHeader(builder, request, response); + builder.startArray("upgraded_indices"); + for (Map.Entry entry : response.versions().entrySet()) { + builder.field(entry.getKey(), entry.getValue(), XContentBuilder.FieldCaseConversion.NONE); + } + builder.endObject(); builder.endObject(); return new BytesRestResponse(OK, builder); } }); } - - void buildUpgradeStatus(IndexSegments indexSegments, XContentBuilder builder) throws IOException { - long total_bytes = 0; - long to_upgrade_bytes = 0; - long to_upgrade_bytes_ancient = 0; - for (IndexShardSegments shard : indexSegments) { - for (ShardSegments segs : shard.getShards()) { - for (Segment seg : segs.getSegments()) { - total_bytes += seg.sizeInBytes; - if (seg.version.major != Version.CURRENT.luceneVersion.major) { - to_upgrade_bytes_ancient += seg.sizeInBytes; - to_upgrade_bytes += seg.sizeInBytes; - } else if (seg.version.minor != Version.CURRENT.luceneVersion.minor) { - // TODO: this comparison is bogus! it would cause us to upgrade even with the same format - // instead, we should check if the codec has changed - to_upgrade_bytes += seg.sizeInBytes; - } - } - } - } - builder.byteSizeField(SIZE_IN_BYTES, SIZE, total_bytes); - builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, to_upgrade_bytes); - builder.byteSizeField(SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, SIZE_TO_UPGRADE_ANCIENT, to_upgrade_bytes_ancient); - } - - static final XContentBuilderString SIZE = new XContentBuilderString("size"); - static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); - static final XContentBuilderString SIZE_TO_UPGRADE = new XContentBuilderString("size_to_upgrade"); - static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient"); - static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes"); - static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes"); } diff --git a/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 9aabab8580b..5f10e5217a6 100644 --- a/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -106,6 +106,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis .addAll(UNMODIFIABLE_SETTINGS) .add(SETTING_NUMBER_OF_REPLICAS) .add(SETTING_AUTO_EXPAND_REPLICAS) + .add(SETTING_VERSION_UPGRADED) + .add(SETTING_VERSION_MINIMUM_COMPATIBLE) .build(); private final ClusterService clusterService; diff --git a/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityTests.java b/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityTests.java index 32fef09363e..ac59615f902 100644 --- a/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityTests.java +++ b/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityTests.java @@ -54,7 +54,6 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.test.index.merge.NoMergePolicyProvider; -import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.Before; @@ -67,8 +66,9 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.matchers.JUnitMatchers.containsString; // needs at least 2 nodes since it bumps replicas to 1 @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) @@ -110,7 +110,6 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio @Override public Settings nodeSettings(int ord) { return Settings.builder() - .put(Node.HTTP_ENABLED, true) // for _upgrade .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class) // disable merging so no segments will be upgraded .put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, 30) // increase recovery speed for small files .build(); @@ -438,13 +437,11 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio } void assertUpgradeWorks(String indexName, boolean alreadyLatest) throws Exception { - HttpRequestBuilder httpClient = httpClient(); - if (alreadyLatest == false) { - UpgradeTest.assertNotUpgraded(httpClient, indexName); + UpgradeTest.assertNotUpgraded(client(), indexName); } - UpgradeTest.runUpgrade(httpClient, indexName); - UpgradeTest.assertUpgraded(httpClient, indexName); + assertNoFailures(client().admin().indices().prepareUpgrade(indexName).get()); + UpgradeTest.assertUpgraded(client(), indexName); } } diff --git a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeReallyOldIndexTest.java b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeReallyOldIndexTest.java index 10416d44868..ce2d54bb30f 100644 --- a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeReallyOldIndexTest.java +++ b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeReallyOldIndexTest.java @@ -20,20 +20,22 @@ package org.elasticsearch.rest.action.admin.indices.upgrade; import org.elasticsearch.bwcompat.StaticIndexBackwardCompatibilityTest; -import org.elasticsearch.node.Node; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; public class UpgradeReallyOldIndexTest extends StaticIndexBackwardCompatibilityTest { public void testUpgrade_0_90_6() throws Exception { String indexName = "index-0.90.6"; - loadIndex(indexName, Node.HTTP_ENABLED, true); - - UpgradeTest.assertNotUpgraded(httpClient(), indexName); - assertTrue(UpgradeTest.hasAncientSegments(httpClient(), indexName)); - UpgradeTest.runUpgrade(httpClient(), indexName, "wait_for_completion", "true", "only_ancient_segments", "true"); - assertFalse(UpgradeTest.hasAncientSegments(httpClient(), "index-0.90.6")); + + loadIndex(indexName); + UpgradeTest.assertNotUpgraded(client(), indexName); + assertTrue(UpgradeTest.hasAncientSegments(client(), indexName)); + assertNoFailures(client().admin().indices().prepareUpgrade(indexName).setUpgradeOnlyAncientSegments(true).get()); + + assertFalse(UpgradeTest.hasAncientSegments(client(), "index-0.90.6")); // This index has only ancient segments, so it should now be fully upgraded: - UpgradeTest.assertUpgraded(httpClient(), indexName); + UpgradeTest.assertUpgraded(client(), indexName); } } diff --git a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java index 3ea5747899c..ddf4bbe2057 100644 --- a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java +++ b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java @@ -26,26 +26,26 @@ import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.upgrade.get.IndexUpgradeStatus; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Segment; -import org.elasticsearch.node.Node; import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; -import org.elasticsearch.test.rest.client.http.HttpResponse; -import org.elasticsearch.test.rest.json.JsonPath; import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.Map; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST) // test scope since we set cluster wide settings public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { @@ -134,20 +134,20 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { logger.info("--> Nodes upgrade complete"); logSegmentsState(); - assertNotUpgraded(httpClient(), null); + assertNotUpgraded(client(), null); final String indexToUpgrade = "test" + randomInt(numIndexes - 1); // This test fires up another node running an older version of ES, but because wire protocol changes across major ES versions, it // means we can never generate ancient segments in this test (unless Lucene major version bumps but ES major version does not): - assertFalse(hasAncientSegments(httpClient(), indexToUpgrade)); + assertFalse(hasAncientSegments(client(), indexToUpgrade)); logger.info("--> Running upgrade on index " + indexToUpgrade); - runUpgrade(httpClient(), indexToUpgrade); + assertNoFailures(client().admin().indices().prepareUpgrade(indexToUpgrade).get()); awaitBusy(new Predicate() { @Override public boolean apply(Object o) { try { - return isUpgraded(httpClient(), indexToUpgrade); + return isUpgraded(client(), indexToUpgrade); } catch (Exception e) { throw ExceptionsHelper.convertToRuntime(e); } @@ -156,48 +156,40 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { logger.info("--> Single index upgrade complete"); logger.info("--> Running upgrade on the rest of the indexes"); - runUpgrade(httpClient(), null); + assertNoFailures(client().admin().indices().prepareUpgrade().get()); logSegmentsState(); logger.info("--> Full upgrade complete"); - assertUpgraded(httpClient(), null); + assertUpgraded(client(), null); } - static String upgradePath(String index) { - String path = "/_upgrade"; - if (index != null) { - path = "/" + index + path; - } - return path; - } - - public static void assertNotUpgraded(HttpRequestBuilder httpClient, String index) throws Exception { - for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) { - assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0); + public static void assertNotUpgraded(Client client, String index) throws Exception { + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); // TODO: it would be better for this to be strictly greater, but sometimes an extra flush // mysteriously happens after the second round of docs are indexed - assertTrue("index " + status.indexName + " should have recovered some segments from transaction log", - status.totalBytes >= status.toUpgradeBytes); - assertTrue("index " + status.indexName + " should need upgrading", status.toUpgradeBytes != 0); + assertTrue("index " + status.getIndex() + " should have recovered some segments from transaction log", + status.getTotalBytes() >= status.getToUpgradeBytes()); + assertTrue("index " + status.getIndex() + " should need upgrading", status.getToUpgradeBytes() != 0); } } - public static void assertNoAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception { - for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) { - assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0); + public static void assertNoAncientSegments(Client client, String index) throws Exception { + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); // TODO: it would be better for this to be strictly greater, but sometimes an extra flush // mysteriously happens after the second round of docs are indexed - assertTrue("index " + status.indexName + " should not have any ancient segments", - status.toUpgradeBytesAncient == 0); - assertTrue("index " + status.indexName + " should have recovered some segments from transaction log", - status.totalBytes >= status.toUpgradeBytes); - assertTrue("index " + status.indexName + " should need upgrading", status.toUpgradeBytes != 0); + assertTrue("index " + status.getIndex() + " should not have any ancient segments", + status.getToUpgradeBytesAncient() == 0); + assertTrue("index " + status.getIndex() + " should have recovered some segments from transaction log", + status.getTotalBytes() >= status.getToUpgradeBytes()); + assertTrue("index " + status.getIndex() + " should need upgrading", status.getToUpgradeBytes() != 0); } } /** Returns true if there are any ancient segments. */ - public static boolean hasAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception { - for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) { - if (status.toUpgradeBytesAncient != 0) { + public static boolean hasAncientSegments(Client client, String index) throws Exception { + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + if (status.getToUpgradeBytesAncient() != 0) { return true; } } @@ -205,20 +197,20 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { } /** Returns true if there are any old but not ancient segments. */ - public static boolean hasOldButNotAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception { - for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) { - if (status.toUpgradeBytes > status.toUpgradeBytesAncient) { + public static boolean hasOldButNotAncientSegments(Client client, String index) throws Exception { + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + if (status.getToUpgradeBytes() > status.getToUpgradeBytesAncient()) { return true; } } return false; } - public static void assertUpgraded(HttpRequestBuilder httpClient, String index) throws Exception { - for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) { - assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0); - assertEquals("index " + status.indexName + " should be upgraded", - 0, status.toUpgradeBytes); + public static void assertUpgraded(Client client, String index) throws Exception { + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); + assertEquals("index " + status.getIndex() + " should be upgraded", + 0, status.getToUpgradeBytes()); } // double check using the segments api that all segments are actually upgraded @@ -242,12 +234,12 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { } } - static boolean isUpgraded(HttpRequestBuilder httpClient, String index) throws Exception { + static boolean isUpgraded(Client client, String index) throws Exception { ESLogger logger = Loggers.getLogger(UpgradeTest.class); int toUpgrade = 0; - for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) { - logger.info("Index: " + status.indexName + ", total: " + status.totalBytes + ", toUpgrade: " + status.toUpgradeBytes); - toUpgrade += status.toUpgradeBytes; + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + logger.info("Index: " + status.getIndex() + ", total: " + status.getTotalBytes() + ", toUpgrade: " + status.getToUpgradeBytes()); + toUpgrade += status.getToUpgradeBytes(); } return toUpgrade == 0; } @@ -257,7 +249,7 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { public final int totalBytes; public final int toUpgradeBytes; public final int toUpgradeBytesAncient; - + public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes, int toUpgradeBytesAncient) { this.indexName = indexName; this.totalBytes = totalBytes; @@ -266,49 +258,11 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest { assert toUpgradeBytesAncient <= toUpgradeBytes; } } - - public static void runUpgrade(HttpRequestBuilder httpClient, String index, String... params) throws Exception { - assert params.length % 2 == 0; - HttpRequestBuilder builder = httpClient.method("POST").path(upgradePath(index)); - for (int i = 0; i < params.length; i += 2) { - builder.addParam(params[i], params[i + 1]); - } - HttpResponse rsp = builder.execute(); - assertNotNull(rsp); - assertEquals(200, rsp.getStatusCode()); - } @SuppressWarnings("unchecked") - static List getUpgradeStatus(HttpRequestBuilder httpClient, String path) throws Exception { - HttpResponse rsp = httpClient.method("GET").path(path).execute(); - Map data = validateAndParse(rsp); - List ret = new ArrayList<>(); - for (String index : data.keySet()) { - Map status = (Map)data.get(index); - assertTrue("missing key size_in_bytes for index " + index, status.containsKey("size_in_bytes")); - Object totalBytes = status.get("size_in_bytes"); - assertTrue("size_in_bytes for index " + index + " is not an integer", totalBytes instanceof Integer); - assertTrue("missing key size_to_upgrade_in_bytes for index " + index, status.containsKey("size_to_upgrade_in_bytes")); - Object toUpgradeBytes = status.get("size_to_upgrade_in_bytes"); - assertTrue("size_to_upgrade_in_bytes for index " + index + " is not an integer", toUpgradeBytes instanceof Integer); - Object toUpgradeBytesAncient = status.get("size_to_upgrade_ancient_in_bytes"); - assertTrue("size_to_upgrade_ancient_in_bytes for index " + index + " is not an integer", toUpgradeBytesAncient instanceof Integer); - ret.add(new UpgradeStatus(index, (Integer) totalBytes, (Integer) toUpgradeBytes, (Integer) toUpgradeBytesAncient)); - } - return ret; - } - - @SuppressWarnings("unchecked") - static Map validateAndParse(HttpResponse rsp) throws Exception { - assertNotNull(rsp); - assertEquals(200, rsp.getStatusCode()); - assertTrue(rsp.hasBody()); - return (Map)new JsonPath(rsp.getBody()).evaluate(""); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(Node.HTTP_ENABLED, true).build(); + static Collection getUpgradeStatus(Client client, String... indices) throws Exception { + UpgradeStatusResponse upgradeStatusResponse = client.admin().indices().prepareUpgradeStatus(indices).get(); + assertNoFailures(upgradeStatusResponse); + return upgradeStatusResponse.getIndices().values(); } }