From c2ee6dd120ab7cf5b81df103ebc2522675d2a4bd Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 17 Apr 2011 18:39:00 +0300 Subject: [PATCH] improve gateway recovery and applying indices, also improve speed of index creation --- .../create/TransportCreateIndexAction.java | 2 +- .../cluster/metadata/IndexMetaData.java | 23 ++-- .../cluster/metadata/MappingMetaData.java | 34 ++++- .../metadata/MetaDataCreateIndexService.java | 96 +++++--------- .../elasticsearch/gateway/GatewayService.java | 125 ++++++------------ .../XContentDocumentMapperParser.java | 1 + .../local/LocalGatewayIndexStateTests.java | 4 +- 7 files changed, 116 insertions(+), 169 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 6ccc8deda01..8626b42636b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -79,7 +79,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi final AtomicReference responseRef = new AtomicReference(); final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.API, cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() { + createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() { @Override public void onResponse(MetaDataCreateIndexService.Response response) { responseRef.set(new CreateIndexResponse(response.acknowledged())); latch.countDown(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index b3329f6a6f9..7fec8bf353b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ImmutableSettings; @@ -243,13 +242,19 @@ public class IndexMetaData { return this; } - public Builder putMapping(MappingMetaData mappingMd) { - mappings.put(mappingMd.type(), mappingMd); + public Builder putMapping(String type, String source) throws IOException { + XContentParser parser = XContentFactory.xContent(source).createParser(source); + try { + putMapping(new MappingMetaData(type, parser.map())); + } finally { + parser.close(); + } return this; } - public Builder putMapping(String mappingType, String mappingSource) throws IOException { - return putMapping(new MappingMetaData(mappingType, new CompressedString(mappingSource))); + public Builder putMapping(MappingMetaData mappingMd) { + mappings.put(mappingMd.type(), mappingMd); + return this; } public Builder state(State state) { @@ -308,13 +313,7 @@ public class IndexMetaData { Map mapping = parser.map(); if (mapping.size() == 1) { String mappingType = mapping.keySet().iterator().next(); - String mappingSource = XContentFactory.jsonBuilder().map(mapping).string(); - - if (mappingSource == null) { - // crap, no mapping source, warn? - } else { - builder.putMapping(new MappingMetaData(mappingType, new CompressedString(mappingSource))); - } + builder.putMapping(new MappingMetaData(mappingType, mapping)); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index 7dc73013fe2..e3fd4b15d38 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -23,16 +23,24 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DocumentMapper; import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.support.XContentMapValues.*; /** * @author kimchy (shay.banon) */ public class MappingMetaData { + private static ESLogger logger = ESLoggerFactory.getLogger(MappingMetaData.class.getName()); + public static class Routing { public static final Routing EMPTY = new Routing(false, null); @@ -82,10 +90,30 @@ public class MappingMetaData { this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path()); } - public MappingMetaData(String type, CompressedString source) { + public MappingMetaData(String type, Map mapping) throws IOException { this.type = type; - this.source = source; - this.routing = Routing.EMPTY; + this.source = new CompressedString(XContentFactory.jsonBuilder().map(mapping).string()); + Map withoutType = mapping; + if (mapping.size() == 1 && mapping.containsKey(type)) { + withoutType = (Map) mapping.get(type); + } + if (withoutType.containsKey("_routing")) { + boolean required = false; + String path = null; + Map routingNode = (Map) withoutType.get("_routing"); + for (Map.Entry entry : routingNode.entrySet()) { + String fieldName = Strings.toUnderscoreCase(entry.getKey()); + Object fieldNode = entry.getValue(); + if (fieldName.equals("required")) { + required = nodeBooleanValue(fieldNode); + } else if (fieldName.equals("path")) { + path = fieldNode.toString(); + } + } + this.routing = new Routing(required, path); + } else { + this.routing = Routing.EMPTY; + } } MappingMetaData(String type, CompressedString source, Routing routing) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index e49a56bb012..7bec49abfb5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -105,21 +105,17 @@ public class MetaDataCreateIndexService extends AbstractComponent { clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { try { - if (request.origin == Request.Origin.API) { - try { - validate(request, currentState); - } catch (Exception e) { - listener.onFailure(e); - return currentState; - } + try { + validate(request, currentState); + } catch (Exception e) { + listener.onFailure(e); + return currentState; } List templates = ImmutableList.of(); // we only find a template when its an API call (a new index) - if (request.origin == Request.Origin.API) { - // find templates, highest order are better matching - templates = findTemplates(request, currentState); - } + // find templates, highest order are better matching + templates = findTemplates(request, currentState); // add the request mapping Map> mappings = Maps.newHashMap(); @@ -139,20 +135,18 @@ public class MetaDataCreateIndexService extends AbstractComponent { } // now add config level mappings - if (request.origin == Request.Origin.API) { - File mappingsDir = new File(environment.configFile(), "mappings"); - if (mappingsDir.exists() && mappingsDir.isDirectory()) { - // first index level - File indexMappingsDir = new File(mappingsDir, request.index); - if (indexMappingsDir.exists() && indexMappingsDir.isDirectory()) { - addMappings(mappings, indexMappingsDir); - } + File mappingsDir = new File(environment.configFile(), "mappings"); + if (mappingsDir.exists() && mappingsDir.isDirectory()) { + // first index level + File indexMappingsDir = new File(mappingsDir, request.index); + if (indexMappingsDir.exists() && indexMappingsDir.isDirectory()) { + addMappings(mappings, indexMappingsDir); + } - // second is the _default mapping - File defaultMappingsDir = new File(mappingsDir, "_default"); - if (defaultMappingsDir.exists() && defaultMappingsDir.isDirectory()) { - addMappings(mappings, defaultMappingsDir); - } + // second is the _default mapping + File defaultMappingsDir = new File(mappingsDir, "_default"); + if (defaultMappingsDir.exists() && defaultMappingsDir.isDirectory()) { + addMappings(mappings, defaultMappingsDir); } } @@ -249,7 +243,18 @@ public class MetaDataCreateIndexService extends AbstractComponent { blocks.addIndexBlock(request.index, MetaDataStateIndexService.INDEX_CLOSED_BLOCK); } - return newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build(); + ClusterState updatedState = newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build(); + + if (request.state == State.OPEN) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable()); + IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) + .initializeEmpty(updatedState.metaData().index(request.index), true); + routingTableBuilder.add(indexRoutingBuilder); + RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); + updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); + } + + return updatedState; } catch (Exception e) { logger.warn("[{}] failed to create", e, request.index); listener.onFailure(e); @@ -258,29 +263,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { } @Override public void clusterStateProcessed(ClusterState clusterState) { - if (request.state == State.CLOSE) { // no need to do shard allocated when closed... - listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); - return; - } - if (!request.rerouteAfterCreation) { - listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); - return; - } - clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable()); - IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) - .initializeEmpty(currentState.metaData().index(request.index), request.origin == Request.Origin.API); - routingTableBuilder.add(indexRoutingBuilder); - RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); - return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - logger.info("[{}] created and added to cluster_state", request.index); - listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); - } - }); + listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); } }); } @@ -360,13 +343,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { public static class Request { - public static enum Origin { - API, - GATEWAY - } - - final Origin origin; - final String cause; final String index; @@ -381,10 +357,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { Set blocks = Sets.newHashSet(); - boolean rerouteAfterCreation = true; - - public Request(Origin origin, String cause, String index) { - this.origin = origin; + public Request(String cause, String index) { this.cause = cause; this.index = index; } @@ -427,11 +400,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { this.timeout = timeout; return this; } - - public Request rerouteAfterCreation(boolean rerouteAfterCreation) { - this.rerouteAfterCreation = rerouteAfterCreation; - return this; - } } public static class Response { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index b5d87f88553..af64da0f14c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -24,10 +24,7 @@ import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.metadata.*; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -40,15 +37,12 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.MetaData.*; -import static org.elasticsearch.common.unit.TimeValue.*; /** * @author kimchy (shay.banon) @@ -223,9 +217,16 @@ public class GatewayService extends AbstractLifecycleComponent i } @Override public void onSuccess(final ClusterState recoveredState) { - final AtomicInteger indicesCounter = new AtomicInteger(recoveredState.metaData().indices().size()); clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + assert currentState.metaData().indices().isEmpty(); + + // remove the block, since we recovered from gateway + ClusterBlocks.Builder blocks = ClusterBlocks.builder() + .blocks(currentState.blocks()) + .blocks(recoveredState.blocks()) + .removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK); + MetaData.Builder metaDataBuilder = newMetaDataBuilder() .metaData(currentState.metaData()); @@ -234,59 +235,39 @@ public class GatewayService extends AbstractLifecycleComponent i metaDataBuilder.put(entry.getValue()); } - return newClusterStateBuilder().state(currentState) + for (IndexMetaData indexMetaData : recoveredState.metaData()) { + metaDataBuilder.put(indexMetaData); + if (indexMetaData.state() == IndexMetaData.State.CLOSE) { + blocks.addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK); + } + } + + // update the state to reflect the new metadata and routing + ClusterState updatedState = newClusterStateBuilder().state(currentState) .version(recoveredState.version()) - .metaData(metaDataBuilder).build(); + .blocks(blocks) + .metaData(metaDataBuilder) + .build(); + + // initialize all index routing tables as empty + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable()); + for (IndexMetaData indexMetaData : updatedState.metaData().indices().values()) { + if (indexMetaData.state() == IndexMetaData.State.OPEN) { + IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index()) + .initializeEmpty(updatedState.metaData().index(indexMetaData.index()), false /*not from API*/); + routingTableBuilder.add(indexRoutingBuilder); + } + } + + // now, reroute + RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); + + return newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); } @Override public void clusterStateProcessed(ClusterState clusterState) { - if (recoveredState.metaData().indices().isEmpty()) { - markMetaDataAsReadFromGateway("success"); - latch.countDown(); - return; - } - // go over the meta data and create indices, we don't really need to copy over - // the meta data per index, since we create the index and it will be added automatically - - // also, don't reroute (or even initialize the routing table) for the indices created, we will do it - // in one batch once creating those indices is done - for (final IndexMetaData indexMetaData : recoveredState.metaData()) { - try { - createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index()) - .settings(indexMetaData.settings()) - .mappingsMetaData(indexMetaData.mappings()) - .state(indexMetaData.state()) - .rerouteAfterCreation(false) - .timeout(timeValueSeconds(30)), - - new MetaDataCreateIndexService.Listener() { - @Override public void onResponse(MetaDataCreateIndexService.Response response) { - if (indicesCounter.decrementAndGet() == 0) { - markMetaDataAsReadFromGateway("success"); - latch.countDown(); - } - } - - @Override public void onFailure(Throwable t) { - logger.error("failed to create index [{}]", t, indexMetaData.index()); - // we report success on index creation failure and do nothing - // should we disable writing the updated metadata? - if (indicesCounter.decrementAndGet() == 0) { - markMetaDataAsReadFromGateway("success"); - latch.countDown(); - } - } - }); - } catch (IOException e) { - logger.error("failed to create index [{}]", e, indexMetaData.index()); - // we report success on index creation failure and do nothing - // should we disable writing the updated metadata? - if (indicesCounter.decrementAndGet() == 0) { - markMetaDataAsReadFromGateway("success"); - latch.countDown(); - } - } - } + logger.info("recovered [{}] indices into cluster_state, allocating", clusterState.metaData().indices().size()); + latch.countDown(); } }); } @@ -296,34 +277,4 @@ public class GatewayService extends AbstractLifecycleComponent i logger.error("failed recover state, blocking...", t); } } - - private void markMetaDataAsReadFromGateway(String reason) { - clusterService.submitStateUpdateTask("gateway (marked as read, reroute, reason=" + reason + ")", new ProcessedClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - if (!currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { - return currentState; - } - // remove the block, since we recovered from gateway - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK); - - // initialize all index routing tables as empty - RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable()); - for (IndexMetaData indexMetaData : currentState.metaData().indices().values()) { - if (indexMetaData.state() == IndexMetaData.State.OPEN) { - IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index()) - .initializeEmpty(currentState.metaData().index(indexMetaData.index()), false); - routingTableBuilder.add(indexRoutingBuilder); - } - } - - RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); - - return newClusterStateBuilder().state(currentState).blocks(blocks).routingResult(routingResult).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - logger.info("all indices created and rerouting has begun"); - } - }); - } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java index d4dea6df121..e69af849e0d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java @@ -221,6 +221,7 @@ public class XContentDocumentMapperParser extends AbstractIndexComponent impleme return builder; } + // NOTE, we also parse this in MappingMetaData private RoutingFieldMapper.Builder parseRoutingField(Map routingNode, XContentMapper.TypeParser.ParserContext parserContext) { RoutingFieldMapper.Builder builder = routing(); parseField(builder, builder.name, routingNode, parserContext); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java index 2695aee5e0a..2778a5bb8d3 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java @@ -146,7 +146,7 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { logger.info("--> trying to index into a closed index ..."); try { - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet(); + client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet(); assert false; } catch (ClusterBlockException e) { // all is well @@ -200,7 +200,7 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { logger.info("--> trying to index into a closed index ..."); try { - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet(); + client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet(); assert false; } catch (ClusterBlockException e) { // all is well