From 46d10f1b6f200d1350b694fa35e42fb04e533635 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 5 Oct 2015 08:38:03 -0400 Subject: [PATCH] More progress --- .../cluster/RestoreInProgress.java | 34 ++++++----- .../common/collect/MapBuilder.java | 9 +-- .../org/elasticsearch/index/IndexService.java | 9 ++- .../flush/IndicesSyncedFlushResult.java | 8 ++- .../indices/query/IndicesQueriesRegistry.java | 2 +- .../snapshots/RestoreService.java | 56 +++++++++---------- .../cluster/ClusterStateDiffIT.java | 3 +- .../cluster/serialization/DiffableTests.java | 4 +- .../test/rest/section/ApiCallSection.java | 10 +++- 9 files changed, 71 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java b/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java index 7e07cb6084c..dd7eb9f0c6d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java +++ b/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java @@ -19,8 +19,11 @@ package org.elasticsearch.cluster; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; @@ -31,12 +34,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; /** * Meta data about restore processes that are currently executing @@ -115,7 +113,7 @@ public class RestoreInProgress extends AbstractDiffable implements Custo public static class Entry { private final State state; private final SnapshotId snapshotId; - private final Map shards; + private final ImmutableOpenMap shards; private final List indices; /** @@ -124,14 +122,14 @@ public class RestoreInProgress extends AbstractDiffable implements Custo * @param snapshotId snapshot id * @param state current state of the restore process * @param indices list of indices being restored - * @param shards list of shards being restored and thier current restore status + * @param shards map of shards being restored to their current restore status */ - public Entry(SnapshotId snapshotId, State state, List indices, Map shards) { + public Entry(SnapshotId snapshotId, State state, List indices, ImmutableOpenMap shards) { this.snapshotId = snapshotId; this.state = state; this.indices = indices; if (shards == null) { - this.shards = emptyMap(); + this.shards = ImmutableOpenMap.of(); } else { this.shards = shards; } @@ -151,7 +149,7 @@ public class RestoreInProgress extends AbstractDiffable implements Custo * * @return list of shards */ - public Map shards() { + public ImmutableOpenMap shards() { return this.shards; } @@ -419,14 +417,14 @@ public class RestoreInProgress extends AbstractDiffable implements Custo for (int j = 0; j < indices; j++) { indexBuilder.add(in.readString()); } - Map builder = new HashMap<>(); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); int shards = in.readVInt(); for (int j = 0; j < shards; j++) { ShardId shardId = ShardId.readShardId(in); ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in); builder.put(shardId, shardState); } - entries[i] = new Entry(snapshotId, state, Collections.unmodifiableList(indexBuilder), unmodifiableMap(builder)); + entries[i] = new Entry(snapshotId, state, Collections.unmodifiableList(indexBuilder), builder.build()); } return new RestoreInProgress(entries); } @@ -445,9 +443,9 @@ public class RestoreInProgress extends AbstractDiffable implements Custo out.writeString(index); } out.writeVInt(entry.shards().size()); - for (Map.Entry shardEntry : entry.shards().entrySet()) { - shardEntry.getKey().writeTo(out); - shardEntry.getValue().writeTo(out); + for (ObjectObjectCursor shardEntry : entry.shards()) { + shardEntry.key.writeTo(out); + shardEntry.value.writeTo(out); } } } @@ -486,9 +484,9 @@ public class RestoreInProgress extends AbstractDiffable implements Custo builder.endArray(); builder.startArray("shards"); { - for (Map.Entry shardEntry : entry.shards.entrySet()) { - ShardId shardId = shardEntry.getKey(); - ShardRestoreStatus status = shardEntry.getValue(); + for (ObjectObjectCursor shardEntry : entry.shards) { + ShardId shardId = shardEntry.key; + ShardRestoreStatus status = shardEntry.value; builder.startObject(); { builder.field("index", shardId.getIndex()); diff --git a/core/src/main/java/org/elasticsearch/common/collect/MapBuilder.java b/core/src/main/java/org/elasticsearch/common/collect/MapBuilder.java index 3c77c628c05..290efb2a1a6 100644 --- a/core/src/main/java/org/elasticsearch/common/collect/MapBuilder.java +++ b/core/src/main/java/org/elasticsearch/common/collect/MapBuilder.java @@ -19,11 +19,11 @@ package org.elasticsearch.common.collect; +import com.google.common.collect.ImmutableMap; + import java.util.HashMap; import java.util.Map; -import static java.util.Collections.unmodifiableMap; - /** * */ @@ -83,7 +83,8 @@ public class MapBuilder { return this.map; } - public Map immutableMap() { - return unmodifiableMap(new HashMap<>(map)); + public ImmutableMap immutableMap() { + // Note that this whole method is going to have to go next but we're changing it like this here just to keep the commit smaller. + return ImmutableMap.builder().putAll(map).build(); } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 0aac941f727..d9a046e230e 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -19,8 +19,6 @@ package org.elasticsearch.index; -import com.google.common.collect.ImmutableMap; - import org.apache.lucene.util.Accountable; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; @@ -75,6 +73,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; /** @@ -410,11 +409,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone return; } logger.debug("[{}] closing... (reason: [{}])", shardId, reason); - HashMap tmpShardsMap = new HashMap<>(shards); - IndexShardInjectorPair indexShardInjectorPair = tmpShardsMap.remove(shardId); + HashMap newShards = new HashMap<>(shards); + IndexShardInjectorPair indexShardInjectorPair = newShards.remove(shardId); indexShard = indexShardInjectorPair.getIndexShard(); shardInjector = indexShardInjectorPair.getInjector(); - shards = ImmutableMap.copyOf(tmpShardsMap); + shards = unmodifiableMap(newShards); closeShardInjector(reason, sId, shardInjector, indexShard); logger.debug("[{}] closed (reason: [{}])", shardId, reason); } diff --git a/core/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java b/core/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java index 54ec76e7a30..435c0d138cd 100644 --- a/core/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java +++ b/core/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.indices.flush; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.ToXContent; @@ -30,6 +29,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import static java.util.Collections.unmodifiableMap; + /** * The result of performing a sync flush operation on all shards of multiple indices */ @@ -40,7 +41,10 @@ public class IndicesSyncedFlushResult implements ToXContent { public IndicesSyncedFlushResult(Map> shardsResultPerIndex) { - this.shardsResultPerIndex = ImmutableMap.copyOf(shardsResultPerIndex); + // shardsResultPerIndex is never modified after it is passed to this + // constructor so this is safe even though shardsResultPerIndex is a + // ConcurrentHashMap + this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); this.shardCounts = calculateShardCounts(Iterables.flatten(shardsResultPerIndex.values())); } diff --git a/core/src/main/java/org/elasticsearch/indices/query/IndicesQueriesRegistry.java b/core/src/main/java/org/elasticsearch/indices/query/IndicesQueriesRegistry.java index 450718b8879..0cec415d63b 100644 --- a/core/src/main/java/org/elasticsearch/indices/query/IndicesQueriesRegistry.java +++ b/core/src/main/java/org/elasticsearch/indices/query/IndicesQueriesRegistry.java @@ -37,7 +37,7 @@ public class IndicesQueriesRegistry extends AbstractComponent { private Map> queryParsers; @Inject - public IndicesQueriesRegistry(Settings settings, Set> injectedQueryParsers, NamedWriteableRegistry namedWriteableRegistry) { + public IndicesQueriesRegistry(Settings settings, Set injectedQueryParsers, NamedWriteableRegistry namedWriteableRegistry) { super(settings); Map> queryParsers = new HashMap<>(); for (QueryParser queryParser : injectedQueryParsers) { diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 37cdfbed9a1..c709daad54e 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -54,6 +54,7 @@ import org.elasticsearch.cluster.settings.ClusterDynamicSettings; import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -89,8 +90,6 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; @@ -233,11 +232,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); - Map shards; + ImmutableOpenMap shards; Set aliases = new HashSet<>(); if (!renamedIndices.isEmpty()) { // We have some indices to restore - Map shardsBuilder = new HashMap<>(); + ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); for (Map.Entry indexEntry : renamedIndices.entrySet()) { String index = indexEntry.getValue(); boolean partial = checkPartial(index); @@ -308,11 +307,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } - shards = unmodifiableMap(shardsBuilder); + shards = shardsBuilder.build(); RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshotId, RestoreInProgress.State.INIT, Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards); builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry)); } else { - shards = emptyMap(); + shards = ImmutableOpenMap.of(); } checkAliasNameConflicts(renamedIndices, aliases); @@ -533,7 +532,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis clusterService.submitStateUpdateTask("update snapshot state", new ClusterStateUpdateTask() { private final List drainedRequests = new ArrayList<>(); - private Map>> batchedRestoreInfo = null; + private Map>> batchedRestoreInfo = null; @Override public ClusterState execute(ClusterState currentState) { @@ -556,7 +555,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis int changedCount = 0; final List entries = new ArrayList<>(); for (RestoreInProgress.Entry entry : restore.entries()) { - Map shards = null; + ImmutableOpenMap.Builder shardsBuilder = null; for (int i = 0; i < batchSize; i++) { final UpdateIndexShardRestoreStatusRequest updateSnapshotState = drainedRequests.get(i); @@ -564,17 +563,18 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis if (entry.snapshotId().equals(updateSnapshotState.snapshotId())) { logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshotId(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); - if (shards == null) { - shards = new HashMap<>(entry.shards()); + if (shardsBuilder == null) { + shardsBuilder = ImmutableOpenMap.builder(entry.shards()); } - shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); + shardsBuilder.put(updateSnapshotState.shardId(), updateSnapshotState.status()); changedCount++; } } - if (shards != null) { + if (shardsBuilder != null) { + ImmutableOpenMap shards = shardsBuilder.build(); if (!completed(shards)) { - entries.add(new RestoreInProgress.Entry(entry.snapshotId(), RestoreInProgress.State.STARTED, entry.indices(), unmodifiableMap(shards))); + entries.add(new RestoreInProgress.Entry(entry.snapshotId(), RestoreInProgress.State.STARTED, entry.indices(), shards)); } else { logger.info("restore [{}] is done", entry.snapshotId()); if (batchedRestoreInfo == null) { @@ -611,15 +611,15 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (batchedRestoreInfo != null) { - for (final Entry>> entry : batchedRestoreInfo.entrySet()) { + for (final Entry>> entry : batchedRestoreInfo.entrySet()) { final SnapshotId snapshotId = entry.getKey(); final RestoreInfo restoreInfo = entry.getValue().v1(); - final Map shards = entry.getValue().v2(); + final ImmutableOpenMap shards = entry.getValue().v2(); RoutingTable routingTable = newState.getRoutingTable(); final List waitForStarted = new ArrayList<>(); - for (Map.Entry shard : shards.entrySet()) { - if (shard.getValue().state() == RestoreInProgress.State.SUCCESS ) { - ShardId shardId = shard.getKey(); + for (ObjectObjectCursor shard : shards) { + if (shard.value.state() == RestoreInProgress.State.SUCCESS ) { + ShardId shardId = shard.key; ShardRouting shardRouting = findPrimaryShard(routingTable, shardId); if (shardRouting != null && !shardRouting.active()) { logger.trace("[{}][{}] waiting for the shard to start", snapshotId, shardId); @@ -679,19 +679,19 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis }); } - private boolean completed(Map shards) { - for (RestoreInProgress.ShardRestoreStatus status : shards.values()) { - if (!status.state().completed()) { + private boolean completed(ImmutableOpenMap shards) { + for (ObjectCursor status : shards.values()) { + if (!status.value.state().completed()) { return false; } } return true; } - private int failedShards(Map shards) { + private int failedShards(ImmutableOpenMap shards) { int failedShards = 0; - for (RestoreInProgress.ShardRestoreStatus status : shards.values()) { - if (status.state() == RestoreInProgress.State.FAILURE) { + for (ObjectCursor status : shards.values()) { + if (status.value.state() == RestoreInProgress.State.FAILURE) { failedShards++; } } @@ -746,13 +746,13 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis // Some indices were deleted, let's make sure all indices that we are restoring still exist for (RestoreInProgress.Entry entry : restore.entries()) { List shardsToFail = null; - for (Map.Entry shard : entry.shards().entrySet()) { - if (!shard.getValue().state().completed()) { - if (!event.state().metaData().hasIndex(shard.getKey().getIndex())) { + for (ObjectObjectCursor shard : entry.shards()) { + if (!shard.value.state().completed()) { + if (!event.state().metaData().hasIndex(shard.key.getIndex())) { if (shardsToFail == null) { shardsToFail = new ArrayList<>(); } - shardsToFail.add(shard.getKey()); + shardsToFail.add(shard.key); } } } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index 2c6884bd4ab..ac2182c14db 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -58,7 +58,6 @@ import java.util.Collections; import java.util.List; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static org.elasticsearch.cluster.metadata.AliasMetaData.newAliasMetaDataBuilder; import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomChange; import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomReason; @@ -680,7 +679,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase { new SnapshotId(randomName("repo"), randomName("snap")), RestoreInProgress.State.fromValue((byte) randomIntBetween(0, 3)), emptyList(), - emptyMap())); + ImmutableOpenMap.of())); default: throw new IllegalArgumentException("Shouldn't be here"); } diff --git a/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java b/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java index fe782f12a75..63cf44ca564 100644 --- a/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.serialization; import com.google.common.collect.ImmutableMap; + import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; @@ -36,6 +37,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static java.util.Collections.unmodifiableMap; import static org.hamcrest.CoreMatchers.equalTo; public class DiffableTests extends ESTestCase { @@ -52,7 +54,7 @@ public class DiffableTests extends ESTestCase { map.remove("bar"); map.put("baz", new TestDiffable("4")); map.put("new", new TestDiffable("5")); - ImmutableMap after = ImmutableMap.copyOf(map); + Map after = unmodifiableMap(new HashMap<>(map)); Diff diff = DiffableUtils.diff(before, after); BytesStreamOutput out = new BytesStreamOutput(); diff.writeTo(out); diff --git a/core/src/test/java/org/elasticsearch/test/rest/section/ApiCallSection.java b/core/src/test/java/org/elasticsearch/test/rest/section/ApiCallSection.java index 852d71cc43a..da6c0b3be2c 100644 --- a/core/src/test/java/org/elasticsearch/test/rest/section/ApiCallSection.java +++ b/core/src/test/java/org/elasticsearch/test/rest/section/ApiCallSection.java @@ -18,9 +18,13 @@ */ package org.elasticsearch.test.rest.section; -import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import java.util.*; +import static java.util.Collections.unmodifiableMap; /** * Represents a test fragment that contains the information needed to call an api @@ -41,7 +45,7 @@ public class ApiCallSection { public Map getParams() { //make sure we never modify the parameters once returned - return ImmutableMap.copyOf(params); + return unmodifiableMap(params); } public void addParam(String key, String value) {