diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index da208265d46..b1a87443b4d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -21,9 +21,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; -import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -39,8 +37,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.timer.Timeout; -import org.elasticsearch.common.timer.TimerTask; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; @@ -52,15 +48,12 @@ import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.river.RiverIndexName; -import org.elasticsearch.timer.TimerService; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; @@ -74,31 +67,25 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final Environment environment; - private final TimerService timerService; - private final ClusterService clusterService; private final IndicesService indicesService; private final ShardsAllocation shardsAllocation; - private final NodeIndexCreatedAction nodeIndexCreatedAction; - private final String riverIndexName; - @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, IndicesService indicesService, - ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, @RiverIndexName String riverIndexName) { + @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService, + ShardsAllocation shardsAllocation, @RiverIndexName String riverIndexName) { super(settings); this.environment = environment; - this.timerService = timerService; this.clusterService = clusterService; this.indicesService = indicesService; this.shardsAllocation = shardsAllocation; - this.nodeIndexCreatedAction = nodeIndexCreatedAction; this.riverIndexName = riverIndexName; } - public void createIndex(final Request request, final Listener userListener) { + public void createIndex(final Request request, final Listener listener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); for (Map.Entry entry : request.settings.getAsMap().entrySet()) { if (!entry.getKey().startsWith("index.")) { @@ -109,9 +96,8 @@ public class MetaDataCreateIndexService extends AbstractComponent { } request.settings(updatedSettingsBuilder.build()); - clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final CreateIndexListener listener = new CreateIndexListener(request, userListener); try { if (currentState.routingTable().hasIndex(request.index)) { listener.onFailure(new IndexAlreadyExistsException(new Index(request.index))); @@ -218,33 +204,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); - final AtomicInteger counter = new AtomicInteger(currentState.nodes().size() - 1); // -1 since we added it on the master already - if (counter.get() == 0) { - // no nodes to add to - listener.onResponse(new Response(true, indexMetaData)); - } else { - - final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() { - @Override public void onNodeIndexCreated(String index, String nodeId) { - if (index.equals(request.index)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(true, indexMetaData)); - nodeIndexCreatedAction.remove(this); - } - } - } - }; - nodeIndexCreatedAction.add(nodeIndexCreateListener); - - Timeout timeoutTask = timerService.newTimeout(new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { - listener.onResponse(new Response(false, indexMetaData)); - nodeIndexCreatedAction.remove(nodeIndexCreateListener); - } - }, request.timeout, TimerService.ExecutionType.THREADED); - listener.timeout = timeoutTask; - } - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); if (!request.blocks.isEmpty()) { for (ClusterBlock block : request.blocks) { @@ -258,6 +217,27 @@ public class MetaDataCreateIndexService extends AbstractComponent { return currentState; } } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); + for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { + routingTableBuilder.add(indexRoutingTable); + } + IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) + .initializeEmpty(currentState.metaData().index(request.index)); + routingTableBuilder.add(indexRoutingBuilder); + RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + logger.info("[{}] created and added to cluster_state", request.index); + listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); + } + }); + } }); } @@ -277,58 +257,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } - class CreateIndexListener implements Listener { - - private AtomicBoolean notified = new AtomicBoolean(); - - private final Request request; - - private final Listener listener; - - volatile Timeout timeout; - - private CreateIndexListener(Request request, Listener listener) { - this.request = request; - this.listener = listener; - } - - @Override public void onResponse(final Response response) { - if (notified.compareAndSet(false, true)) { - if (timeout != null) { - timeout.cancel(); - } - // do the reroute after indices have been created on all the other nodes so we can query them for some info (like shard allocation) - clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); - for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { - routingTableBuilder.add(indexRoutingTable); - } - IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) - .initializeEmpty(currentState.metaData().index(request.index)); - routingTableBuilder.add(indexRoutingBuilder); - RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); - return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - logger.info("[{}] created and added to cluster_state", request.index); - listener.onResponse(response); - } - }); - } - } - - @Override public void onFailure(Throwable t) { - if (notified.compareAndSet(false, true)) { - if (timeout != null) { - timeout.cancel(); - } - listener.onFailure(t); - } - } - } - public static interface Listener { void onResponse(Response response); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index d340845005a..c6487321c56 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -21,8 +21,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; +import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -31,15 +30,9 @@ import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.timer.Timeout; -import org.elasticsearch.common.timer.TimerTask; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.timer.TimerService; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.MetaData.*; @@ -49,27 +42,19 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; */ public class MetaDataDeleteIndexService extends AbstractComponent { - private final TimerService timerService; - private final ClusterService clusterService; private final ShardsAllocation shardsAllocation; - private final NodeIndexDeletedAction nodeIndexDeletedAction; - - @Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsAllocation shardsAllocation, - NodeIndexDeletedAction nodeIndexDeletedAction) { + @Inject public MetaDataDeleteIndexService(Settings settings, ClusterService clusterService, ShardsAllocation shardsAllocation) { super(settings); - this.timerService = timerService; this.clusterService = clusterService; this.shardsAllocation = shardsAllocation; - this.nodeIndexDeletedAction = nodeIndexDeletedAction; } - public void deleteIndex(final Request request, final Listener userListener) { - clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ClusterStateUpdateTask() { + public void deleteIndex(final Request request, final Listener listener) { + clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final DeleteIndexListener listener = new DeleteIndexListener(request, userListener); try { RoutingTable routingTable = currentState.routingTable(); if (!routingTable.hasIndex(request.index)) { @@ -95,73 +80,19 @@ public class MetaDataDeleteIndexService extends AbstractComponent { ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build(); - final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); - - final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() { - @Override public void onNodeIndexDeleted(String index, String nodeId) { - if (index.equals(request.index)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(true)); - nodeIndexDeletedAction.remove(this); - } - } - } - }; - nodeIndexDeletedAction.add(nodeIndexDeleteListener); - - Timeout timeoutTask = timerService.newTimeout(new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { - listener.onResponse(new Response(false)); - nodeIndexDeletedAction.remove(nodeIndexDeleteListener); - } - }, request.timeout, TimerService.ExecutionType.THREADED); - listener.timeout = timeoutTask; - - return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build(); } catch (Exception e) { listener.onFailure(e); return currentState; } } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + listener.onResponse(new Response(true)); + } }); } - class DeleteIndexListener implements Listener { - - private AtomicBoolean notified = new AtomicBoolean(); - - private final Request request; - - private final Listener listener; - - volatile Timeout timeout; - - private DeleteIndexListener(Request request, Listener listener) { - this.request = request; - this.listener = listener; - } - - @Override public void onResponse(final Response response) { - if (notified.compareAndSet(false, true)) { - if (timeout != null) { - timeout.cancel(); - } - listener.onResponse(response); - } - } - - @Override public void onFailure(Throwable t) { - if (notified.compareAndSet(false, true)) { - if (timeout != null) { - timeout.cancel(); - } - listener.onFailure(t); - } - } - } - - public static interface Listener { void onResponse(Response response); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 96ac707401e..6a0bcb689e7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -22,14 +22,12 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; +import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.timer.Timeout; -import org.elasticsearch.common.timer.TimerTask; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; @@ -39,19 +37,14 @@ import org.elasticsearch.index.mapper.MergeMappingException; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.timer.TimerService; import java.io.IOException; import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.common.collect.Maps.*; -import static org.elasticsearch.common.collect.Sets.*; import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.*; /** @@ -63,17 +56,10 @@ public class MetaDataMappingService extends AbstractComponent { private final IndicesService indicesService; - private final TimerService timerService; - - private final NodeMappingCreatedAction nodeMappingCreatedAction; - - @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, - TimerService timerService, NodeMappingCreatedAction nodeMappingCreatedAction) { + @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; - this.timerService = timerService; - this.nodeMappingCreatedAction = nodeMappingCreatedAction; } public void updateMapping(final String index, final String type, final CompressedString mappingSource) throws IOException { @@ -114,7 +100,7 @@ public class MetaDataMappingService extends AbstractComponent { } public void removeMapping(final RemoveRequest request) { - clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (request.indices.length == 0) { throw new IndexMissingException(new Index("_all")); @@ -130,13 +116,16 @@ public class MetaDataMappingService extends AbstractComponent { return ClusterState.builder().state(currentState).metaData(builder).build(); } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + // TODO add a listener here! + } }); } - public void putMapping(final PutRequest request, final Listener userListener) { - clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ClusterStateUpdateTask() { + public void putMapping(final PutRequest request, final Listener listener) { + clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final PutMappingListener listener = new PutMappingListener(request, userListener); try { if (request.indices.length == 0) { throw new IndexMissingException(new Index("_all")); @@ -181,7 +170,6 @@ public class MetaDataMappingService extends AbstractComponent { } final Map> mappings = newHashMap(); - int expectedReplies = 0; for (Map.Entry entry : newMappers.entrySet()) { String index = entry.getKey(); // do the actual merge here on the master, and update the mapping source @@ -197,7 +185,6 @@ public class MetaDataMappingService extends AbstractComponent { if (existingSource.equals(updatedSource)) { // same source, no changes, ignore it } else { - expectedReplies += (currentState.nodes().size() - 1); // for this index, on update, don't include the master, since we update it already // use the merged mapping source mappings.put(index, new Tuple(existingMapper.type(), updatedSource)); if (logger.isDebugEnabled()) { @@ -207,7 +194,6 @@ public class MetaDataMappingService extends AbstractComponent { } } } else { - expectedReplies += currentState.nodes().size(); CompressedString newSource = newMapper.mappingSource(); mappings.put(index, new Tuple(newMapper.type(), newSource)); if (logger.isDebugEnabled()) { @@ -236,76 +222,19 @@ public class MetaDataMappingService extends AbstractComponent { } } - if (expectedReplies == 0) { - listener.onResponse(new Response(true)); - } else { - final AtomicInteger counter = new AtomicInteger(expectedReplies); - final Set indicesSet = newHashSet(request.indices); - final NodeMappingCreatedAction.Listener nodeMappingListener = new NodeMappingCreatedAction.Listener() { - @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { - if (indicesSet.contains(response.index()) && response.type().equals(request.mappingType)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(true)); - nodeMappingCreatedAction.remove(this); - } - } - } - }; - nodeMappingCreatedAction.add(nodeMappingListener); - - Timeout timeoutTask = timerService.newTimeout(new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { - listener.onResponse(new Response(false)); - nodeMappingCreatedAction.remove(nodeMappingListener); - } - }, request.timeout, TimerService.ExecutionType.THREADED); - listener.timeout = timeoutTask; - } - - return newClusterStateBuilder().state(currentState).metaData(builder).build(); } catch (Exception e) { listener.onFailure(e); return currentState; } } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + listener.onResponse(new Response(true)); + } }); } - class PutMappingListener implements Listener { - - private AtomicBoolean notified = new AtomicBoolean(); - - private final PutRequest request; - - private final Listener listener; - - volatile Timeout timeout; - - private PutMappingListener(PutRequest request, Listener listener) { - this.request = request; - this.listener = listener; - } - - @Override public void onResponse(final Response response) { - if (notified.compareAndSet(false, true)) { - if (timeout != null) { - timeout.cancel(); - } - listener.onResponse(response); - } - } - - @Override public void onFailure(Throwable t) { - if (notified.compareAndSet(false, true)) { - if (timeout != null) { - timeout.cancel(); - } - listener.onFailure(t); - } - } - } - public static interface Listener { void onResponse(Response response); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index 4759dc76c7d..e88df55f2c8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -22,7 +22,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -55,7 +55,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent { } } final Settings settings = updatedSettingsBuilder.build(); - clusterService.submitStateUpdateTask("update-settings", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("update-settings", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { try { boolean changed = false; @@ -77,14 +77,16 @@ public class MetaDataUpdateSettingsService extends AbstractComponent { logger.info("Updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices); - listener.onSuccess(); - return ClusterState.builder().state(currentState).metaData(metaDataBuilder).routingTable(routingTableBuilder).build(); } catch (Exception e) { listener.onFailure(e); return currentState; } } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + listener.onSuccess(); + } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index d21622df316..4dd4ff70b74 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -37,10 +37,8 @@ import org.elasticsearch.common.trove.TObjectIntIterator; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.transport.ConnectTransportException; @@ -56,8 +54,6 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.*; */ public class LocalGatewayNodeAllocation extends NodeAllocation { - private final IndicesService indicesService; - private final TransportNodesListGatewayStartedShards listGatewayStartedShards; private final TransportNodesListShardStoreMetaData listShardStoreMetaData; @@ -68,10 +64,9 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { private final String initialShards; - @Inject public LocalGatewayNodeAllocation(Settings settings, IndicesService indicesService, + @Inject public LocalGatewayNodeAllocation(Settings settings, TransportNodesListGatewayStartedShards listGatewayStartedShards, TransportNodesListShardStoreMetaData listShardStoreMetaData) { super(settings); - this.indicesService = indicesService; this.listGatewayStartedShards = listGatewayStartedShards; this.listShardStoreMetaData = listShardStoreMetaData; @@ -250,14 +245,6 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { Iterator unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { MutableShardRouting shard = unassignedIterator.next(); - InternalIndexService indexService = (InternalIndexService) indicesService.indexService(shard.index()); - if (indexService == null) { - continue; - } - // if the store is not persistent, it makes no sense to test for special allocation - if (!indexService.store().persistent()) { - continue; - } // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing boolean canBeAllocatedToAtLeastOneNode = false; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java index b454b54c456..ca2410a6cd3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -26,7 +26,6 @@ import org.elasticsearch.index.shard.recovery.RecoveryTarget; import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.memory.IndexingMemoryBufferController; -import org.elasticsearch.indices.memory.IndicesMemoryCleaner; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; /** @@ -49,7 +48,6 @@ public class IndicesModule extends AbstractModule { bind(RecoverySource.class).asEagerSingleton(); bind(IndicesClusterStateService.class).asEagerSingleton(); - bind(IndicesMemoryCleaner.class).asEagerSingleton(); bind(IndexingMemoryBufferController.class).asEagerSingleton(); bind(IndicesAnalysisService.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndicesMemoryCleaner.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndicesMemoryCleaner.java deleted file mode 100644 index aca1e3d03a2..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndicesMemoryCleaner.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.memory; - -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.FlushNotAllowedEngineException; -import org.elasticsearch.index.service.IndexService; -import org.elasticsearch.index.shard.IllegalIndexShardStateException; -import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.service.IndexShard; -import org.elasticsearch.index.shard.service.InternalIndexShard; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.IndicesService; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Set; - -import static org.elasticsearch.common.collect.Sets.*; - -/** - * @author kimchy (shay.banon) - */ -public class IndicesMemoryCleaner extends AbstractComponent { - - private final IndicesService indicesService; - - @Inject public IndicesMemoryCleaner(Settings settings, IndicesService indicesService) { - super(settings); - this.indicesService = indicesService; - } - - public TranslogCleanResult cleanTranslog(int translogNumberOfOperationsThreshold) { - int totalShards = 0; - int cleanedShards = 0; - long cleaned = 0; - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - if (indexShard.state() != IndexShardState.STARTED) { - continue; - } - totalShards++; - Translog translog = ((InternalIndexShard) indexShard).translog(); - if (translog.size() > translogNumberOfOperationsThreshold) { - try { - indexShard.flush(new Engine.Flush()); - cleanedShards++; - cleaned = indexShard.estimateFlushableMemorySize().bytes(); - } catch (FlushNotAllowedEngineException e) { - // ignore this exception, we are not allowed to perform flush - } - } - } - } - return new TranslogCleanResult(totalShards, cleanedShards, new ByteSizeValue(cleaned, ByteSizeUnit.BYTES)); - } - - public void cacheClearUnreferenced() { - for (IndexService indexService : indicesService) { - indexService.cache().clearUnreferenced(); - } - } - - public void cacheClear() { - for (IndexService indexService : indicesService) { - indexService.cache().clear(); - } - } - - public void fullMemoryClean() { - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - try { - indexShard.flush(new Engine.Flush().full(true)); - } catch (FlushNotAllowedEngineException e) { - // ignore this one, its temporal - } catch (IllegalIndexShardStateException e) { - // ignore this one as well - } catch (Exception e) { - logger.warn(indexShard.shardId() + ": Failed to force flush in order to clean memory", e); - } - } - } - } - - public void forceCleanMemory(Set shardsToIgnore) { - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - if (!shardsToIgnore.contains(indexShard.shardId())) { - try { - indexShard.flush(new Engine.Flush().full(false)); - } catch (FlushNotAllowedEngineException e) { - // ignore this one, its temporal - } catch (IllegalIndexShardStateException e) { - // ignore this one as well - } catch (Exception e) { - logger.warn(indexShard.shardId() + ": Failed to force flush in order to clean memory", e); - } - } - } - } - } - - /** - * Checks if memory needs to be cleaned and cleans it. Returns the amount of memory cleaned. - */ - public MemoryCleanResult cleanMemory(long memoryToClean, ByteSizeValue minimumFlushableSizeToClean) { - int totalShards = 0; - long estimatedFlushableSize = 0; - ArrayList> shards = new ArrayList>(); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - if (indexShard.state() != IndexShardState.STARTED) { - continue; - } - totalShards++; - ByteSizeValue estimatedSize = indexShard.estimateFlushableMemorySize(); - estimatedFlushableSize += estimatedSize.bytes(); - if (estimatedSize != null) { - shards.add(new Tuple(estimatedSize, indexShard)); - } - } - } - Collections.sort(shards, new Comparator>() { - @Override public int compare(Tuple o1, Tuple o2) { - return (int) (o1.v1().bytes() - o2.v1().bytes()); - } - }); - int cleanedShards = 0; - long cleaned = 0; - Set shardsCleaned = newHashSet(); - for (Tuple tuple : shards) { - if (tuple.v1().bytes() < minimumFlushableSizeToClean.bytes()) { - // we passed the minimum threshold, don't flush - break; - } - try { - tuple.v2().flush(new Engine.Flush()); - } catch (FlushNotAllowedEngineException e) { - // ignore this one, its temporal - } catch (IllegalIndexShardStateException e) { - // ignore this one as well - } catch (Exception e) { - logger.warn(tuple.v2().shardId() + ": Failed to flush in order to clean memory", e); - } - shardsCleaned.add(tuple.v2().shardId()); - cleanedShards++; - cleaned += tuple.v1().bytes(); - if (cleaned > memoryToClean) { - break; - } - } - return new MemoryCleanResult(totalShards, cleanedShards, new ByteSizeValue(estimatedFlushableSize), new ByteSizeValue(cleaned), shardsCleaned); - } - - public static class TranslogCleanResult { - private final int totalShards; - private final int cleanedShards; - private final ByteSizeValue cleaned; - - public TranslogCleanResult(int totalShards, int cleanedShards, ByteSizeValue cleaned) { - this.totalShards = totalShards; - this.cleanedShards = cleanedShards; - this.cleaned = cleaned; - } - - public int totalShards() { - return totalShards; - } - - public int cleanedShards() { - return cleanedShards; - } - - public ByteSizeValue cleaned() { - return cleaned; - } - - @Override public String toString() { - return "cleaned [" + cleaned + "], cleaned_shards [" + cleanedShards + "], total_shards [" + totalShards + "]"; - } - } - - public static class MemoryCleanResult { - private final int totalShards; - private final int cleanedShards; - private final ByteSizeValue estimatedFlushableSize; - private final ByteSizeValue cleaned; - private final Set shardsCleaned; - - public MemoryCleanResult(int totalShards, int cleanedShards, ByteSizeValue estimatedFlushableSize, ByteSizeValue cleaned, Set shardsCleaned) { - this.totalShards = totalShards; - this.cleanedShards = cleanedShards; - this.estimatedFlushableSize = estimatedFlushableSize; - this.cleaned = cleaned; - this.shardsCleaned = shardsCleaned; - } - - public int totalShards() { - return totalShards; - } - - public int cleanedShards() { - return cleanedShards; - } - - public ByteSizeValue estimatedFlushableSize() { - return estimatedFlushableSize; - } - - public ByteSizeValue cleaned() { - return cleaned; - } - - public Set shardsCleaned() { - return this.shardsCleaned; - } - - @Override public String toString() { - return "cleaned [" + cleaned + "], estimated_flushable_size [" + estimatedFlushableSize + "], cleaned_shards [" + cleanedShards + "], total_shards [" + totalShards + "]"; - } - } -} diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java index dad67696a66..26b697aff3e 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java @@ -172,7 +172,8 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().prepareDelete("test").execute().actionGet(); assertThat(deleteIndexResponse.acknowledged(), equalTo(true)); - Thread.sleep(200); + Thread.sleep(500); // wait till the cluster state gets published + clusterState2 = clusterService2.state(); routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2, nullValue()); @@ -303,6 +304,8 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet(); assertThat(deleteIndexResponse.acknowledged(), equalTo(true)); + Thread.sleep(500); // wait till the cluster state gets published + clusterState2 = clusterService2.state(); routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2, nullValue());