From a76824e39555eb8f52c62f07ed53c1d71eb3187e Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 23 Oct 2010 17:03:38 +0200 Subject: [PATCH] make index not recovered a bock used in both gateways (shared/blob and local) --- .../elasticsearch/gateway/GatewayService.java | 21 +++++++++++++ .../gateway/local/LocalGateway.java | 31 ++----------------- .../local/LocalGatewayNodeAllocation.java | 5 +-- .../gateway/shared/SharedStorageGateway.java | 28 +++++++++++------ .../gateway/fs/SimpleFsIndexGatewayTests.java | 3 -- 5 files changed, 45 insertions(+), 43 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 4622a49436c..03738709f76 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -27,6 +27,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -35,6 +37,7 @@ import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; import javax.annotation.Nullable; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,6 +51,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener { public static final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", ClusterBlockLevel.ALL); + public static final ClusterBlock INDEX_NOT_RECOVERED_BLOCK = new ClusterBlock(3, "index not recovered", ClusterBlockLevel.READ_WRITE); private final Gateway gateway; @@ -165,6 +169,23 @@ public class GatewayService extends AbstractLifecycleComponent i } }); } + } else { + for (Map.Entry> entry : event.state().blocks().indices().entrySet()) { + final String index = entry.getKey(); + ImmutableSet indexBlocks = entry.getValue(); + if (indexBlocks.contains(GatewayService.INDEX_NOT_RECOVERED_BLOCK)) { + IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index); + if (indexRoutingTable != null && indexRoutingTable.allPrimaryShardsActive()) { + clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + blocks.removeIndexBlock(index, GatewayService.INDEX_NOT_RECOVERED_BLOCK); + return ClusterState.builder().state(currentState).blocks(blocks).build(); + } + }); + } + } + } } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 7b9d5b6bffc..ca8d0f56a3b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -22,9 +22,6 @@ package org.elasticsearch.gateway.local; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.cluster.*; -import org.elasticsearch.cluster.block.ClusterBlock; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; @@ -44,10 +41,10 @@ import org.elasticsearch.common.xcontent.*; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; import java.io.*; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -64,8 +61,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*; */ public class LocalGateway extends AbstractLifecycleComponent implements Gateway, ClusterStateListener { - public static final ClusterBlock INDEX_NOT_RECOVERED_BLOCK = new ClusterBlock(3, "index not recovered (not enough nodes with shards allocated found)", ClusterBlockLevel.READ_WRITE); - private File location; private final ClusterService clusterService; @@ -185,7 +180,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()) .settings(indexMetaData.settings()) .mappingsCompressed(indexMetaData.mappings()) - .blocks(ImmutableSet.of(INDEX_NOT_RECOVERED_BLOCK)) + .blocks(ImmutableSet.of(GatewayService.INDEX_NOT_RECOVERED_BLOCK)) .timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() { @@ -216,7 +211,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements } @Override public void clusterChanged(final ClusterChangedEvent event) { - // nothing to do until we actually recover from hte gateway + // nothing to do until we actually recover from the gateway if (!event.state().metaData().recoveredFromGateway()) { return; } @@ -226,26 +221,6 @@ public class LocalGateway extends AbstractLifecycleComponent implements return; } - // go over the indices, if they are blocked, and all are allocated, update the cluster state that it is no longer blocked - if (event.state().nodes().localNodeMaster()) { - for (Map.Entry> entry : event.state().blocks().indices().entrySet()) { - final String index = entry.getKey(); - ImmutableSet indexBlocks = entry.getValue(); - if (indexBlocks.contains(INDEX_NOT_RECOVERED_BLOCK)) { - IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index); - if (indexRoutingTable != null && indexRoutingTable.allPrimaryShardsActive()) { - clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - blocks.removeIndexBlock(index, INDEX_NOT_RECOVERED_BLOCK); - return ClusterState.builder().state(currentState).blocks(blocks).build(); - } - }); - } - } - } - } - if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) { executor.execute(new Runnable() { @Override public void run() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index 6b743dd4bae..8ccae2550c4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.trove.TObjectIntIterator; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; @@ -86,7 +87,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } for (ShardRouting failedShard : allocation.failedShards()) { IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index()); - if (!allocation.routingNodes().blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) { + if (!allocation.routingNodes().blocks().hasIndexBlock(indexRoutingTable.index(), GatewayService.INDEX_NOT_RECOVERED_BLOCK)) { continue; } @@ -151,7 +152,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { // only do the allocation if there is a local "INDEX NOT RECOVERED" block // we check this here since it helps distinguish between index creation though an API, where the below logic // should not apply, and when recovering from the gateway, where we should apply this logic - if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) { + if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), GatewayService.INDEX_NOT_RECOVERED_BLOCK)) { continue; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java index fe2f94a9b2b..0ee0b7bfe10 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java @@ -25,10 +25,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.gateway.GatewayService; import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -145,17 +147,23 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent