From f2e39e4ee21b77b0c021b35cfbc2ff47952c5d77 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 29 Jun 2012 01:01:26 +0200 Subject: [PATCH] Auto import dangling indices, closes #2067. --- .../cluster/block/ClusterBlocks.java | 21 ++ .../cluster/metadata/IndexMetaData.java | 5 + .../metadata/MetaDataCreateIndexService.java | 7 +- .../metadata/MetaDataStateIndexService.java | 7 +- .../cluster/routing/RoutingTable.java | 10 + .../elasticsearch/gateway/GatewayService.java | 24 +- .../gateway/local/LocalGatewayModule.java | 2 + .../meta/LocalAllocateDangledIndices.java | 233 ++++++++++++++++++ .../state/meta/LocalGatewayMetaState.java | 166 +++++++++---- .../state/shards/LocalGatewayShardsState.java | 102 ++++---- ...ransportNodesListGatewayStartedShards.java | 15 +- .../indices/store/IndicesStore.java | 3 +- .../local/LocalGatewayIndexStateTests.java | 160 +++++++++++- 13 files changed, 609 insertions(+), 146 deletions(-) create mode 100644 src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java diff --git a/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index b2aa10b8709..9966c84b8d8 100644 --- a/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.rest.RestStatus; @@ -233,6 +235,25 @@ public class ClusterBlocks { return this; } + public Builder addBlocks(IndexMetaData indexMetaData) { + if (indexMetaData.state() == IndexMetaData.State.CLOSE) { + addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK); + } + if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_READ_ONLY, false)) { + addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_ONLY_BLOCK); + } + if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_READ, false)) { + addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_BLOCK); + } + if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_WRITE, false)) { + addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_WRITE_BLOCK); + } + if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_METADATA, false)) { + addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_METADATA_BLOCK); + } + return this; + } + public Builder addGlobalBlock(ClusterBlock block) { global.add(block); return this; diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index c3cb3aef5ee..7f3adef50a1 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -410,6 +410,11 @@ public class IndexMetaData { return index; } + public Builder index(String name) { + this.index = name; + return this; + } + public Builder numberOfShards(int numberOfShards) { settings = settingsBuilder().put(settings).put(SETTING_NUMBER_OF_SHARDS, numberOfShards).build(); return this; diff --git
a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 889d116bd40..f7302db953b 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -31,7 +31,6 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -319,10 +318,8 @@ public class MetaDataCreateIndexService extends AbstractComponent { ClusterState updatedState = newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build(); if (request.state == State.OPEN) { - RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable()); - IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) - .initializeEmpty(updatedState.metaData().index(request.index), true); - routingTableBuilder.add(indexRoutingBuilder); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable()) + .add(updatedState.metaData().index(request.index), true); RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java index 6a6d89996f0..438033e6081 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -127,10 +126,8 @@ public class MetaDataStateIndexService extends AbstractComponent { ClusterState updatedState = ClusterState.builder().state(currentState).metaData(mdBuilder).blocks(blocks).build(); - RoutingTable.Builder rtBuilder = RoutingTable.builder().routingTable(updatedState.routingTable()); - IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) - .initializeEmpty(updatedState.metaData().index(request.index), false); - rtBuilder.add(indexRoutingBuilder); + RoutingTable.Builder rtBuilder = RoutingTable.builder().routingTable(updatedState.routingTable()) + .add(updatedState.metaData().index(request.index), false); RoutingAllocation.Result routingResult = 
allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 50a212b3b3f..98292845988 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing; import com.google.common.collect.*; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -340,6 +341,15 @@ public class RoutingTable implements Iterable { return this; } + public Builder add(IndexMetaData indexMetaData, boolean fromApi) { + if (indexMetaData.state() == IndexMetaData.State.OPEN) { + IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index()) + .initializeEmpty(indexMetaData, fromApi); + add(indexRoutingBuilder); + } + return this; + } + public Builder add(IndexRoutingTable indexRoutingTable) { indexRoutingTable.validate(); indicesRouting.put(indexRoutingTable.index(), indexRoutingTable); diff --git a/src/main/java/org/elasticsearch/gateway/GatewayService.java b/src/main/java/org/elasticsearch/gateway/GatewayService.java index 0e0ba7fd371..77c0f731078 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -26,9 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -256,21 +254,7 @@ public class GatewayService extends AbstractLifecycleComponent i for (IndexMetaData indexMetaData : recoveredState.metaData()) { metaDataBuilder.put(indexMetaData, false); - if (indexMetaData.state() == IndexMetaData.State.CLOSE) { - blocks.addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK); - } - if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_READ_ONLY, false)) { - blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_ONLY_BLOCK); - } - if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_READ, false)) { - blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_BLOCK); - } - if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_WRITE, false)) { - blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_WRITE_BLOCK); - } - if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_METADATA, false)) { - blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_METADATA_BLOCK); - } + blocks.addBlocks(indexMetaData); } // update the state to reflect the new metadata and routing @@ -282,11 +266,7 @@ public class GatewayService extends AbstractLifecycleComponent i // initialize all index 
routing tables as empty RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable()); for (IndexMetaData indexMetaData : updatedState.metaData().indices().values()) { - if (indexMetaData.state() == IndexMetaData.State.OPEN) { - IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index()) - .initializeEmpty(updatedState.metaData().index(indexMetaData.index()), false /*not from API*/); - routingTableBuilder.add(indexRoutingBuilder); - } + routingTableBuilder.add(indexMetaData, false /* not from API */); } // start with 0 based versions for routing table routingTableBuilder.version(0); diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java index 36f8465f36a..a5c0b580b9e 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.PreProcessModule; import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.local.state.meta.LocalAllocateDangledIndices; import org.elasticsearch.gateway.local.state.meta.LocalGatewayMetaState; import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaState; import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState; @@ -41,6 +42,7 @@ public class LocalGatewayModule extends AbstractModule implements PreProcessModu bind(TransportNodesListGatewayMetaState.class).asEagerSingleton(); bind(LocalGatewayMetaState.class).asEagerSingleton(); bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton(); + bind(LocalAllocateDangledIndices.class).asEagerSingleton(); } @Override diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java new file mode 100644 index 00000000000..79b6d7a416b --- /dev/null +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java @@ -0,0 +1,233 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.gateway.local.state.meta; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.io.IOException; +import java.util.Arrays; + +import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; + +/** + */ +public class LocalAllocateDangledIndices extends AbstractComponent { + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final AllocationService allocationService; + + @Inject + public LocalAllocateDangledIndices(Settings settings, TransportService transportService, ClusterService clusterService, AllocationService allocationService) { + super(settings); + this.transportService = transportService; + this.clusterService = clusterService; + this.allocationService = allocationService; + transportService.registerHandler(new AllocateDangledRequestHandler()); + } + + public void allocateDangled(IndexMetaData[] indices, final Listener listener) { + ClusterState clusterState = clusterService.state(); + DiscoveryNode masterNode = clusterState.nodes().masterNode(); + if (masterNode == null) { + listener.onFailure(new MasterNotDiscoveredException("no master to send allocate dangled request")); + return; + } + AllocateDangledRequest request = new AllocateDangledRequest(clusterState.nodes().localNode(), indices); + transportService.sendRequest(masterNode, AllocateDangledRequestHandler.ACTION, request, new TransportResponseHandler() { + @Override + public AllocateDangledResponse newInstance() { + return new AllocateDangledResponse(); + } + + @Override + public void handleResponse(AllocateDangledResponse response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + + public static interface Listener { + void onResponse(AllocateDangledResponse response); + + void onFailure(Throwable e); + } + + class AllocateDangledRequestHandler implements ActionTransportRequestHandler { + + public static final String ACTION = "/gateway/local/allocate_dangled"; + + @Override + public String action() { + return ACTION; + } + + @Override + public AllocateDangledRequest newInstance() { + return new AllocateDangledRequest(); + } + + @Override + public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel) throws Exception { + String[] indexNames = new String[request.indices.length]; + for (int i = 
0; i < request.indices.length; i++) { + indexNames[i] = request.indices[i].index(); + } + clusterService.submitStateUpdateTask("allocation dangled indices " + Arrays.toString(indexNames), new ProcessedClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + if (currentState.blocks().disableStatePersistence()) { + return currentState; + } + MetaData.Builder metaData = MetaData.builder() + .metaData(currentState.metaData()); + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable()); + + boolean importNeeded = false; + StringBuilder sb = new StringBuilder(); + for (IndexMetaData indexMetaData : request.indices) { + if (currentState.metaData().hasIndex(indexMetaData.index())) { + continue; + } + importNeeded = true; + metaData.put(indexMetaData, false); + blocks.addBlocks(indexMetaData); + routingTableBuilder.add(indexMetaData, false); + sb.append("[").append(indexMetaData.index()).append("/").append(indexMetaData.state()).append("]"); + } + if (!importNeeded) { + return currentState; + } + logger.info("auto importing dangled indices {} from [{}]", sb.toString(), request.fromNode); + + ClusterState updatedState = ClusterState.builder().state(currentState).metaData(metaData).blocks(blocks).routingTable(routingTableBuilder).build(); + + // now, reroute + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); + + return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + } + + @Override + public void clusterStateProcessed(ClusterState clusterState) { + try { + channel.sendResponse(new AllocateDangledResponse(true)); + } catch (IOException e) { + logger.error("failed send response for allocating dangled", e); + } + } + }); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + + static class AllocateDangledRequest implements Streamable { + + DiscoveryNode fromNode; + IndexMetaData[] indices; + + AllocateDangledRequest() { + } + + AllocateDangledRequest(DiscoveryNode fromNode, IndexMetaData[] indices) { + this.fromNode = fromNode; + this.indices = indices; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + fromNode = DiscoveryNode.readNode(in); + indices = new IndexMetaData[in.readVInt()]; + for (int i = 0; i < indices.length; i++) { + indices[i] = IndexMetaData.Builder.readFrom(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + fromNode.writeTo(out); + out.writeVInt(indices.length); + for (IndexMetaData indexMetaData : indices) { + IndexMetaData.Builder.writeTo(indexMetaData, out); + } + } + } + + public static class AllocateDangledResponse implements Streamable { + + private boolean ack; + + AllocateDangledResponse() { + } + + AllocateDangledResponse(boolean ack) { + this.ack = ack; + } + + public boolean ack() { + return ack; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + ack = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(ack); + } + } +} diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java index eb7a877f369..2824e62318a 100644 --- 
a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -19,8 +19,10 @@ package org.elasticsearch.gateway.local.state.meta; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; @@ -46,6 +48,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledFuture; @@ -55,25 +58,65 @@ import java.util.concurrent.ScheduledFuture; */ public class LocalGatewayMetaState extends AbstractComponent implements ClusterStateListener { + static enum AutoImportDangledState { + NO() { + @Override + public boolean shouldImport() { + return false; + } + }, + YES() { + @Override + public boolean shouldImport() { + return true; + } + }, + CLOSED() { + @Override + public boolean shouldImport() { + return true; + } + }; + + public abstract boolean shouldImport(); + + public static AutoImportDangledState fromString(String value) { + if ("no".equalsIgnoreCase(value)) { + return NO; + } else if ("yes".equalsIgnoreCase(value)) { + return YES; + } else if ("closed".equalsIgnoreCase(value)) { + return CLOSED; + } else { + throw new ElasticSearchIllegalArgumentException("failed to parse [" + value + "], not a valid auto dangling import type"); + } + } + } + private final NodeEnvironment nodeEnv; private final ThreadPool threadPool; + private final LocalAllocateDangledIndices allocateDangledIndices; + private volatile MetaData currentMetaData; private final XContentType format; private final ToXContent.Params formatParams; + private final AutoImportDangledState autoImportDangled; private final TimeValue danglingTimeout; private final Map danglingIndices = ConcurrentCollections.newConcurrentMap(); private final Object danglingMutex = new Object(); @Inject - public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception { + public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv, + TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices) throws Exception { super(settings); this.nodeEnv = nodeEnv; this.threadPool = threadPool; this.format = XContentType.fromRestContentType(settings.get("format", "smile")); + this.allocateDangledIndices = allocateDangledIndices; nodesListGatewayMetaState.init(this); if (this.format == XContentType.SMILE) { @@ -84,8 +127,11 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS formatParams = ToXContent.EMPTY_PARAMS; } + this.autoImportDangled = AutoImportDangledState.fromString(settings.get("gateway.local.auto_import_dangled", AutoImportDangledState.YES.toString())); this.danglingTimeout = settings.getAsTime("gateway.local.dangling_timeout", TimeValue.timeValueHours(2)); + logger.debug("using gateway.local.auto_import_dangled [{}], with gateway.local.dangling_timeout [{}]", this.autoImportDangled, this.danglingTimeout); + if (DiscoveryNode.masterNode(settings)) { try { pre019Upgrade(); @@ -113,47 +159,44 @@ 
public class LocalGatewayMetaState extends AbstractComponent implements ClusterS return; } - if (!event.state().nodes().localNode().masterNode()) { - return; - } + // we don't check if metaData changed, since we might be called several times and we need to check dangling... - if (!event.metaDataChanged()) { - return; - } - - // check if the global state changed? boolean success = true; - if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) { - try { - writeGlobalState("changed", event.state().metaData(), currentMetaData); - } catch (Exception e) { - success = false; + // only applied to master node, writing the global and index level states + if (event.state().nodes().localNode().masterNode()) { + // check if the global state changed? + if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) { + try { + writeGlobalState("changed", event.state().metaData(), currentMetaData); + } catch (Exception e) { + success = false; + } + } + + // check and write changes in indices + for (IndexMetaData indexMetaData : event.state().metaData()) { + String writeReason = null; + IndexMetaData currentIndexMetaData = currentMetaData == null ? null : currentMetaData.index(indexMetaData.index()); + if (currentIndexMetaData == null) { + writeReason = "freshly created"; + } else if (currentIndexMetaData.version() != indexMetaData.version()) { + writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]"; + } + + // we update the writeReason only if we really need to write it + if (writeReason == null) { + continue; + } + + try { + writeIndex(writeReason, indexMetaData, currentIndexMetaData); + } catch (Exception e) { + success = false; + } } } - // check and write changes in indices - for (IndexMetaData indexMetaData : event.state().metaData()) { - String writeReason = null; - IndexMetaData currentIndexMetaData = currentMetaData == null ? 
null : currentMetaData.index(indexMetaData.index()); - if (currentIndexMetaData == null) { - writeReason = "freshly created"; - } else if (currentIndexMetaData.version() != indexMetaData.version()) { - writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]"; - } - - // we update the writeReason only if we really need to write it - if (writeReason == null) { - continue; - } - - try { - writeIndex(writeReason, indexMetaData, currentIndexMetaData); - } catch (Exception e) { - success = false; - } - } - - // handle dangling indices + // handle dangling indices, we handle those for all nodes that have a node file (data or master) if (nodeEnv.hasNodeFile()) { if (danglingTimeout.millis() >= 0) { synchronized (danglingMutex) { @@ -179,7 +222,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName))); } else { - logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}]", indexName, danglingTimeout); + logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled); danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName)))); } } @@ -188,14 +231,51 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS } } } + if (autoImportDangled.shouldImport() && !danglingIndices.isEmpty()) { + final List<IndexMetaData> dangled = Lists.newArrayList(); + for (String indexName : danglingIndices.keySet()) { + IndexMetaData indexMetaData = loadIndex(indexName); + if (indexMetaData == null) { + // no state could be loaded for this dangling index, nothing to import + continue; + } + // we might have someone copying over an index, renaming the directory, handle that + if (!indexMetaData.index().equals(indexName)) { + logger.info("dangled index directory name is [{}], state name is [{}], renaming to directory name", indexName, indexMetaData.index()); + indexMetaData = IndexMetaData.newIndexMetaDataBuilder(indexMetaData).index(indexName).build(); + } + if (autoImportDangled == AutoImportDangledState.CLOSED) { + indexMetaData = IndexMetaData.newIndexMetaDataBuilder(indexMetaData).state(IndexMetaData.State.CLOSE).build(); + } + dangled.add(indexMetaData); + } + IndexMetaData[] dangledIndices = dangled.toArray(new IndexMetaData[dangled.size()]); + try { + allocateDangledIndices.allocateDangled(dangledIndices, new LocalAllocateDangledIndices.Listener() { + @Override + public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) { + logger.trace("allocated dangled"); + } + + @Override + public void onFailure(Throwable e) { + logger.info("failed to allocate dangled indices", e); + } + }); + } catch (Exception e) { + logger.warn("failed to send allocate dangled request", e); + } + } } - // delete indices that are no longer there...
- if (currentMetaData != null) { - for (IndexMetaData current : currentMetaData) { - if (event.state().metaData().index(current.index()) == null) { - if (!danglingIndices.containsKey(current.index())) { - deleteIndex(current.index()); + if (event.state().nodes().localNode().masterNode()) { + // delete indices that are no longer there..., allocated dangled ones + if (currentMetaData != null) { + for (IndexMetaData current : currentMetaData) { + if (event.state().metaData().index(current.index()) == null) { + if (!danglingIndices.containsKey(current.index())) { + deleteIndex(current.index()); + } } } } diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java index c1bbd584914..ff7873ba4f3 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java @@ -66,7 +66,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste try { pre019Upgrade(); long start = System.currentTimeMillis(); - loadStartedShards(); + currentState = loadShardsStateInfo(); logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start)); } catch (Exception e) { logger.error("failed to read local state (started shards), exiting...", e); @@ -79,6 +79,10 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste return this.currentState; } + public ShardStateInfo loadShardInfo(ShardId shardId) throws Exception { + return loadShardStateInfo(shardId); + } + @Override public void clusterChanged(ClusterChangedEvent event) { if (event.state().blocks().disableStatePersistence()) { @@ -166,65 +170,65 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste this.currentState = newState; } - private void loadStartedShards() throws Exception { + private Map loadShardsStateInfo() throws Exception { Set shardIds = nodeEnv.findAllShardIds(); long highestVersion = -1; Map shardsState = Maps.newHashMap(); for (ShardId shardId : shardIds) { - long highestShardVersion = -1; - ShardStateInfo highestShardState = null; - for (File shardLocation : nodeEnv.shardLocations(shardId)) { - File shardStateDir = new File(shardLocation, "_state"); - if (!shardStateDir.exists() || !shardStateDir.isDirectory()) { - continue; - } - // now, iterate over the current versions, and find latest one - File[] stateFiles = shardStateDir.listFiles(); - if (stateFiles == null) { - continue; - } - for (File stateFile : stateFiles) { - if (!stateFile.getName().startsWith("state-")) { - continue; - } - try { - long version = Long.parseLong(stateFile.getName().substring("state-".length())); - if (version > highestShardVersion) { - byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); - if (data.length == 0) { - logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id()); - continue; - } - ShardStateInfo readState = readShardState(data); - if (readState == null) { - logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id()); - continue; - } - assert readState.version == version; - highestShardState = readState; - highestShardVersion = version; - } - } catch (Exception e) { - logger.debug("[{}][{}]: failed to read [" + 
stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id()); - } - } - } - // did we find a state file? - if (highestShardState == null) { + ShardStateInfo shardStateInfo = loadShardStateInfo(shardId); + if (shardStateInfo == null) { continue; } - - shardsState.put(shardId, highestShardState); + shardsState.put(shardId, shardStateInfo); // update the global version - if (highestShardVersion > highestVersion) { - highestVersion = highestShardVersion; + if (shardStateInfo.version > highestVersion) { + highestVersion = shardStateInfo.version; } } - // update the current started shards only if there is data there... - if (highestVersion != -1) { - currentState = shardsState; + return shardsState; + } + + private ShardStateInfo loadShardStateInfo(ShardId shardId) { + long highestShardVersion = -1; + ShardStateInfo highestShardState = null; + for (File shardLocation : nodeEnv.shardLocations(shardId)) { + File shardStateDir = new File(shardLocation, "_state"); + if (!shardStateDir.exists() || !shardStateDir.isDirectory()) { + continue; + } + // now, iterate over the current versions, and find latest one + File[] stateFiles = shardStateDir.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + if (!stateFile.getName().startsWith("state-")) { + continue; + } + try { + long version = Long.parseLong(stateFile.getName().substring("state-".length())); + if (version > highestShardVersion) { + byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); + if (data.length == 0) { + logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id()); + continue; + } + ShardStateInfo readState = readShardState(data); + if (readState == null) { + logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id()); + continue; + } + assert readState.version == version; + highestShardState = readState; + highestShardVersion = version; + } + } catch (Exception e) { + logger.debug("[{}][{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id()); + } + } } + return highestShardState; } @Nullable diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java index 66170c13355..8b2ef1adeed 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java @@ -39,7 +39,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -117,15 +116,15 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat @Override protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticSearchException { - Map shardsStateInfo = shardsState.currentStartedShards(); - if (shardsStateInfo != null) { - for (Map.Entry entry : shardsStateInfo.entrySet()) { - if (entry.getKey().equals(request.shardId)) { - return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue().version); - } + try { + ShardStateInfo shardStateInfo = 
shardsState.loadShardInfo(request.shardId); + if (shardStateInfo != null) { + return new NodeLocalGatewayStartedShards(clusterService.localNode(), shardStateInfo.version); } + return new NodeLocalGatewayStartedShards(clusterService.localNode(), -1); + } catch (Exception e) { + throw new ElasticSearchException("failed to load started shards", e); } - return new NodeLocalGatewayStartedShards(clusterService.localNode(), -1); } @Override diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index e4bd18798a6..3a001fbfb04 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -149,7 +149,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe continue; } // only delete an unallocated shard if all (other shards) are started - if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) { + int startedShardsCount = indexShardRoutingTable.countWithState(ShardRoutingState.STARTED); + if (startedShardsCount > 0 && startedShardsCount == indexShardRoutingTable.size()) { if (logger.isDebugEnabled()) { logger.debug("[{}][{}] deleting unallocated shard", indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id()); } diff --git a/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java b/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java index 8d5dbc6fdb4..3ad7ab7686e 100644 --- a/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java +++ b/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; @@ -325,15 +326,19 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { } @Test - public void testDanglingIndices() throws Exception { + public void testDanglingIndicesAutoImportYes() throws Exception { + Settings settings = settingsBuilder() + .put("gateway.type", "local").put("gateway.local.auto_import_dangled", "yes") + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) + .build(); logger.info("--> cleaning nodes"); buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); cleanAndCloseNodes(); logger.info("--> starting two nodes"); - startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + startNode("node1", settings); + startNode("node2", settings); logger.info("--> indexing a simple document"); client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); @@ -357,13 +362,139 @@ public class LocalGatewayIndexStateTests 
extends AbstractNodesTests { gateway1.reset(); logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); - startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + startNode("node1", settings); + startNode("node2", settings); logger.info("--> waiting for green status"); health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.timedOut(), equalTo(false)); + // we need to wait for the allocate dangled to kick in + Thread.sleep(500); + + logger.info("--> verify that the dangling index exists"); + assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(true)); + logger.info("--> waiting for green status"); + health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + assertThat(health.timedOut(), equalTo(false)); + + logger.info("--> verify the doc is there"); + assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true)); + } + + @Test + public void testDanglingIndicesAutoImportClose() throws Exception { + Settings settings = settingsBuilder() + .put("gateway.type", "local").put("gateway.local.auto_import_dangled", "closed") + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) + .build(); + logger.info("--> cleaning nodes"); + buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); + buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); + cleanAndCloseNodes(); + + logger.info("--> starting two nodes"); + startNode("node1", settings); + startNode("node2", settings); + + logger.info("--> indexing a simple document"); + client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + + logger.info("--> waiting for green status"); + ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + assertThat(health.timedOut(), equalTo(false)); + + logger.info("--> verify 1 doc in the index"); + for (int i = 0; i < 10; i++) { + assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l)); + } + assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true)); + + logger.info("--> shutting down the nodes"); + Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class); + closeNode("node1"); + closeNode("node2"); + + logger.info("--> deleting the data for the first node"); + gateway1.reset(); + + logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); + startNode("node1", settings); + startNode("node2", settings); + + logger.info("--> waiting for green status"); + health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + assertThat(health.timedOut(), equalTo(false)); + + // we need to wait for the allocate dangled to kick in + Thread.sleep(500); + + logger.info("--> verify that the dangling index exists"); + 
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(true)); + logger.info("--> waiting for green status"); + health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + assertThat(health.timedOut(), equalTo(false)); + + logger.info("--> verify the index state is closed"); + assertThat(client("node1").admin().cluster().prepareState().execute().actionGet().state().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE)); + logger.info("--> open the index"); + client("node1").admin().indices().prepareOpen("test").execute().actionGet(); + logger.info("--> waiting for green status"); + health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + assertThat(health.timedOut(), equalTo(false)); + + logger.info("--> verify the doc is there"); + assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true)); + } + + @Test + public void testDanglingIndicesNoAutoImport() throws Exception { + Settings settings = settingsBuilder() + .put("gateway.type", "local").put("gateway.local.auto_import_dangled", "no") + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) + .build(); + logger.info("--> cleaning nodes"); + buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); + buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); + cleanAndCloseNodes(); + + logger.info("--> starting two nodes"); + startNode("node1", settings); + startNode("node2", settings); + + logger.info("--> indexing a simple document"); + client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + + logger.info("--> waiting for green status"); + ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + assertThat(health.timedOut(), equalTo(false)); + + logger.info("--> verify 1 doc in the index"); + for (int i = 0; i < 10; i++) { + assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l)); + } + assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true)); + + logger.info("--> shutting down the nodes"); + Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class); + closeNode("node1"); + closeNode("node2"); + + logger.info("--> deleting the data for the first node"); + gateway1.reset(); + + logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); + startNode("node1", settings); + startNode("node2", settings); + + logger.info("--> waiting for green status"); + health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + assertThat(health.timedOut(), equalTo(false)); + + // we need to wait for the allocate dangled to kick in (even though in this case its disabled) + // just to make sure + Thread.sleep(500); + logger.info("--> verify that the dangling index does not exists"); assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(false)); @@ -372,8 +503,8 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { closeNode("node2"); logger.info("--> start the nodes back, but 
make sure we do recovery only after we have 2 nodes in the cluster"); - startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("gateway.recover_after_nodes", 2).build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("gateway.recover_after_nodes", 2).build()); + startNode("node1", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build()); + startNode("node2", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build()); logger.info("--> waiting for green status"); health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); @@ -386,15 +517,20 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { } @Test - public void testDanglingIndicesStillDanglingAndCreatingSameIndex() throws Exception { + public void testDanglingIndicesNoAutoImportStillDanglingAndCreatingSameIndex() throws Exception { + Settings settings = settingsBuilder() + .put("gateway.type", "local").put("gateway.local.auto_import_dangled", "no") + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) + .build(); + logger.info("--> cleaning nodes"); buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); cleanAndCloseNodes(); logger.info("--> starting two nodes"); - startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + startNode("node1", settings); + startNode("node2", settings); logger.info("--> indexing a simple document"); client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); @@ -417,8 +553,8 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { gateway1.reset(); logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); - startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + startNode("node1", settings); + startNode("node2", settings); logger.info("--> waiting for green status"); health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
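
For reference, a minimal sketch of how the auto-import behavior introduced by this patch could be configured on an embedded node. This is illustrative only and not part of the change: it assumes the NodeBuilder/ImmutableSettings client API of this era, and the class name is made up. The setting keys and values themselves come from the patch: gateway.local.auto_import_dangled accepts "no", "yes" and "closed" (see AutoImportDangledState.fromString), and gateway.local.dangling_timeout controls how long a dangling index directory is kept on disk before it is deleted.

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;

public class DangledImportConfigExample {
    public static void main(String[] args) {
        // "no" never imports a dangling index, "yes" imports it as found on disk,
        // "closed" imports it in the CLOSE state so it can be inspected before opening
        Settings settings = settingsBuilder()
                .put("gateway.type", "local")
                .put("gateway.local.auto_import_dangled", "closed")
                // how long a dangling index directory is kept before being deleted
                .put("gateway.local.dangling_timeout", "2h")
                .build();

        Node node = nodeBuilder().settings(settings).node();
        // ... use the node ...
        node.close();
    }
}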