make index not recovered a bock used in both gateways (shared/blob and local)

This commit is contained in:
kimchy 2010-10-23 17:03:38 +02:00
parent 3f8c03db95
commit a76824e395
5 changed files with 45 additions and 43 deletions

View File

@ -27,6 +27,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -35,6 +37,7 @@ import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -48,6 +51,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
public class GatewayService extends AbstractLifecycleComponent<GatewayService> implements ClusterStateListener {
public static final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", ClusterBlockLevel.ALL);
public static final ClusterBlock INDEX_NOT_RECOVERED_BLOCK = new ClusterBlock(3, "index not recovered", ClusterBlockLevel.READ_WRITE);
private final Gateway gateway;
@ -165,6 +169,23 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
}
});
}
} else {
for (Map.Entry<String, ImmutableSet<ClusterBlock>> entry : event.state().blocks().indices().entrySet()) {
final String index = entry.getKey();
ImmutableSet<ClusterBlock> indexBlocks = entry.getValue();
if (indexBlocks.contains(GatewayService.INDEX_NOT_RECOVERED_BLOCK)) {
IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index);
if (indexRoutingTable != null && indexRoutingTable.allPrimaryShardsActive()) {
clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
blocks.removeIndexBlock(index, GatewayService.INDEX_NOT_RECOVERED_BLOCK);
return ClusterState.builder().state(currentState).blocks(blocks).build();
}
});
}
}
}
}
}
}

View File

@ -22,9 +22,6 @@ package org.elasticsearch.gateway.local;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
@ -44,10 +41,10 @@ import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
import java.io.*;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -64,8 +61,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
*/
public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {
public static final ClusterBlock INDEX_NOT_RECOVERED_BLOCK = new ClusterBlock(3, "index not recovered (not enough nodes with shards allocated found)", ClusterBlockLevel.READ_WRITE);
private File location;
private final ClusterService clusterService;
@ -185,7 +180,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index())
.settings(indexMetaData.settings())
.mappingsCompressed(indexMetaData.mappings())
.blocks(ImmutableSet.of(INDEX_NOT_RECOVERED_BLOCK))
.blocks(ImmutableSet.of(GatewayService.INDEX_NOT_RECOVERED_BLOCK))
.timeout(timeValueSeconds(30)),
new MetaDataCreateIndexService.Listener() {
@ -216,7 +211,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override public void clusterChanged(final ClusterChangedEvent event) {
// nothing to do until we actually recover from hte gateway
// nothing to do until we actually recover from the gateway
if (!event.state().metaData().recoveredFromGateway()) {
return;
}
@ -226,26 +221,6 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
return;
}
// go over the indices, if they are blocked, and all are allocated, update the cluster state that it is no longer blocked
if (event.state().nodes().localNodeMaster()) {
for (Map.Entry<String, ImmutableSet<ClusterBlock>> entry : event.state().blocks().indices().entrySet()) {
final String index = entry.getKey();
ImmutableSet<ClusterBlock> indexBlocks = entry.getValue();
if (indexBlocks.contains(INDEX_NOT_RECOVERED_BLOCK)) {
IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index);
if (indexRoutingTable != null && indexRoutingTable.allPrimaryShardsActive()) {
clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
blocks.removeIndexBlock(index, INDEX_NOT_RECOVERED_BLOCK);
return ClusterState.builder().state(currentState).blocks(blocks).build();
}
});
}
}
}
}
if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) {
executor.execute(new Runnable() {
@Override public void run() {

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.trove.TObjectIntIterator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
@ -86,7 +87,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
}
for (ShardRouting failedShard : allocation.failedShards()) {
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index());
if (!allocation.routingNodes().blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
if (!allocation.routingNodes().blocks().hasIndexBlock(indexRoutingTable.index(), GatewayService.INDEX_NOT_RECOVERED_BLOCK)) {
continue;
}
@ -151,7 +152,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
// only do the allocation if there is a local "INDEX NOT RECOVERED" block
// we check this here since it helps distinguish between index creation though an API, where the below logic
// should not apply, and when recovering from the gateway, where we should apply this logic
if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), GatewayService.INDEX_NOT_RECOVERED_BLOCK)) {
continue;
}

View File

@ -25,10 +25,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.gateway.GatewayService;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
@ -145,17 +147,23 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : fMetaData) {
try {
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappingsCompressed(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
if (indicesCounter.decrementAndGet() == 0) {
listener.onSuccess();
}
}
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index())
.settings(indexMetaData.settings())
.mappingsCompressed(indexMetaData.mappings())
.blocks(ImmutableSet.of(GatewayService.INDEX_NOT_RECOVERED_BLOCK))
.timeout(timeValueSeconds(30)),
@Override public void onFailure(Throwable t) {
logger.error("failed to create index [{}]", indexMetaData.index(), t);
}
});
new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
if (indicesCounter.decrementAndGet() == 0) {
listener.onSuccess();
}
}
@Override public void onFailure(Throwable t) {
logger.error("failed to create index [{}]", indexMetaData.index(), t);
}
});
} catch (IOException e) {
logger.error("failed to create index [{}]", indexMetaData.index(), e);
}

View File

@ -24,7 +24,4 @@ package org.elasticsearch.test.integration.gateway.fs;
*/
public class SimpleFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
@Override public void testSnapshotOperations() throws Exception {
super.testSnapshotOperations(); //To change body of overridden methods use File | Settings | File Templates.
}
}