diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java
index 8266fbce094..d0c2e6aa830 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -94,7 +94,7 @@ public class ClusterState {
     }
 
     public RoutingNodes routingNodes() {
-        return routingTable.routingNodes(metaData);
+        return routingTable.routingNodes(metaData, blocks);
     }
 
     public RoutingNodes getRoutingNodes() {
@@ -117,7 +117,7 @@ public class ClusterState {
         if (routingNodes != null) {
             return routingNodes;
         }
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = routingTable.routingNodes(metaData, blocks);
         return routingNodes;
     }
 
@@ -177,6 +177,11 @@ public class ClusterState {
             return this;
         }
 
+        public Builder version(long version) {
+            this.version = version;
+            return this;
+        }
+
         public Builder state(ClusterState state) {
             this.version = state.version();
             this.nodes = state.nodes();
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
index 3a06cdf12e5..3278f2dd056 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
@@ -91,6 +91,10 @@ public class ClusterBlocks {
         return levelHolders[level.id()].indices();
     }
 
+    public boolean hasIndexBlock(String index, ClusterBlock block) {
+        return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
+    }
+
     public void indexBlockedRaiseException(ClusterBlockLevel level, String index) throws ClusterBlockException {
         if (!indexBlocked(level, index)) {
             return;
@@ -136,22 +140,6 @@ public class ClusterBlocks {
         }
     }
 
-    static class LevelHolder {
-        private final Set<ClusterBlock> global = Sets.newHashSet();
-        private final Map<String, Set<ClusterBlock>> indices = Maps.newHashMap();
-
-        LevelHolder() {
-        }
-
-        public Set<ClusterBlock> global() {
-            return global;
-        }
-
-        public Map<String, Set<ClusterBlock>> indices() {
-            return indices;
-        }
-    }
-
     public static Builder builder() {
         return new Builder();
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
index 9f423e8d56b..669e5a4c57e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
@@ -303,12 +303,17 @@ public class MetaData implements Iterable<IndexMetaData> {
         public static MetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException {
             Builder builder = new Builder();
 
-            String currentFieldName = null;
-            XContentParser.Token token = parser.nextToken();
-            if (token == null) {
-                // no data...
-                return builder.build();
+            XContentParser.Token token = parser.currentToken();
+            String currentFieldName = parser.currentName();
+            if (!"meta-data".equals(currentFieldName)) {
+                token = parser.nextToken();
+                currentFieldName = parser.currentName();
+                if (token == null) {
+                    // no data...
+                    return builder.build();
+                }
             }
+
             while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                 if (token == XContentParser.Token.FIELD_NAME) {
                     currentFieldName = parser.currentName();
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
index 336c4d7715d..a9fc9b7ff24 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
@@ -24,11 +24,14 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
 import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
+import org.elasticsearch.cluster.block.ClusterBlock;
+import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.compress.CompressedString;
 import org.elasticsearch.common.inject.Inject;
@@ -53,6 +56,7 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -218,7 +222,14 @@ public class MetaDataCreateIndexService extends AbstractComponent {
                         listener.timeout = timeoutTask;
                     }
 
-                    return newClusterStateBuilder().state(currentState).metaData(newMetaData).build();
+                    ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
+                    if (!request.blocks.isEmpty()) {
+                        for (ClusterBlock block : request.blocks) {
+                            blocks.addIndexBlock(request.index, block);
+                        }
+                    }
+
+                    return newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build();
                 } catch (Exception e) {
                     listener.onFailure(e);
                     return currentState;
@@ -314,6 +325,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {
 
         TimeValue timeout = TimeValue.timeValueSeconds(5);
 
+        Set<ClusterBlock> blocks = Sets.newHashSet();
+
         public Request(String cause, String index) {
             this.cause = cause;
             this.index = index;
@@ -336,6 +349,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
             return this;
         }
 
+        public Request blocks(Set<ClusterBlock> blocks) {
+            this.blocks.addAll(blocks);
+            return this;
+        }
+
         public Request timeout(TimeValue timeout) {
             this.timeout = timeout;
             return this;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
index 5e742301e30..c2044c24c2c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
@@ -153,7 +153,10 @@ public class DiscoveryNode implements Streamable, Serializable {
      */
     public boolean dataNode() {
         String data = attributes.get("data");
-        return data == null || data.equals("true");
+        if (data == null) {
+            return !clientNode();
+        }
+        return data.equals("true");
     }
 
     /**
@@ -175,6 +178,24 @@ public class DiscoveryNode implements Streamable, Serializable {
         return clientNode();
     }
 
+    /**
+     * Can this node become master or not.
+     */
+    public boolean masterNode() {
+        String master = attributes.get("master");
+        if (master == null) {
+            return !clientNode();
+        }
+        return master.equals("true");
+    }
+
+    /**
+     * Can this node become master or not.
+     */
+    public boolean isMasterNode() {
+        return masterNode();
+    }
+
     public static DiscoveryNode readNode(StreamInput in) throws IOException {
         DiscoveryNode node = new DiscoveryNode();
         node.readFrom(in);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
index 912b681029a..a1a1fc9dccc 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
@@ -46,13 +46,16 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
 
     private final ImmutableMap<String, DiscoveryNode> dataNodes;
 
+    private final ImmutableMap<String, DiscoveryNode> masterNodes;
+
     private final String masterNodeId;
 
     private final String localNodeId;
 
-    private DiscoveryNodes(ImmutableMap<String, DiscoveryNode> nodes, ImmutableMap<String, DiscoveryNode> dataNodes, String masterNodeId, String localNodeId) {
+    private DiscoveryNodes(ImmutableMap<String, DiscoveryNode> nodes, ImmutableMap<String, DiscoveryNode> dataNodes, ImmutableMap<String, DiscoveryNode> masterNodes, String masterNodeId, String localNodeId) {
         this.nodes = nodes;
         this.dataNodes = dataNodes;
+        this.masterNodes = masterNodes;
         this.masterNodeId = masterNodeId;
         this.localNodeId = localNodeId;
     }
@@ -104,6 +107,14 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
         return dataNodes();
     }
 
+    public ImmutableMap<String, DiscoveryNode> masterNodes() {
+        return this.masterNodes;
+    }
+
+    public ImmutableMap<String, DiscoveryNode> getMasterNodes() {
+        return masterNodes();
+    }
+
     public DiscoveryNode get(String nodeId) {
         return nodes.get(nodeId);
     }
@@ -366,12 +377,16 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
 
         public DiscoveryNodes build() {
             ImmutableMap.Builder<String, DiscoveryNode> dataNodesBuilder = ImmutableMap.builder();
+            ImmutableMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableMap.builder();
             for (Map.Entry<String, DiscoveryNode> nodeEntry : nodes.entrySet()) {
                 if (nodeEntry.getValue().dataNode()) {
                     dataNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
                 }
+                if (nodeEntry.getValue().masterNode()) {
+                    masterNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
+                }
             }
-            return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodeId, localNodeId);
+            return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId);
         }
 
         public static void writeTo(DiscoveryNodes nodes, StreamOutput out) throws IOException {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
index 1c039a120fd..e7f53e83305 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
@@ -105,6 +105,34 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
         return shards.get(shardId);
     }
 
+    public boolean allPrimaryShardsActive() {
+        return primaryShardsActive() == shards().size();
+    }
+
+    public int primaryShardsActive() {
+        int counter = 0;
+        for (IndexShardRoutingTable shardRoutingTable : this) {
+            if (shardRoutingTable.primaryShard().active()) {
+                counter++;
+            }
+        }
+        return counter;
+    }
+
+    public boolean allPrimaryShardsUnassigned() {
+        return primaryShardsUnassigned() == shards.size();
+    }
+
+    public int primaryShardsUnassigned() {
+        int counter = 0;
+        for (IndexShardRoutingTable shardRoutingTable : this) {
+            if (shardRoutingTable.primaryShard().unassigned()) {
+                counter++;
+            }
+        }
+        return counter;
+    }
+
     public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
         List<ShardRouting> shards = newArrayList();
         for (IndexShardRoutingTable shardRoutingTable : this) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index f43d35cfd71..65ebcd29972 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.routing;
 
+import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.util.concurrent.NotThreadSafe;
 
@@ -35,6 +36,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
 
     private final MetaData metaData;
 
+    private final ClusterBlocks blocks;
+
     private final RoutingTable routingTable;
 
     private final Map<String, RoutingNode> nodesToShards = newHashMap();
@@ -43,8 +46,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
 
     private final List<MutableShardRouting> ignoredUnassigned = newArrayList();
 
-    public RoutingNodes(MetaData metaData, RoutingTable routingTable) {
+    public RoutingNodes(MetaData metaData, ClusterBlocks blocks, RoutingTable routingTable) {
         this.metaData = metaData;
+        this.blocks = blocks;
         this.routingTable = routingTable;
         Map<String, List<MutableShardRouting>> nodesToShards = newHashMap();
         for (IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
@@ -100,6 +104,14 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         return metaData();
     }
 
+    public ClusterBlocks blocks() {
+        return this.blocks;
+    }
+
+    public ClusterBlocks getBlocks() {
+        return this.blocks;
+    }
+
     public int requiredAverageNumberOfShardsPerNode() {
         return metaData.totalNumberOfShards() / nodesToShards.size();
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
index bf4cdaf090d..bc6d0b23d54 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
@@ -19,6 +19,8 @@
 
 package org.elasticsearch.cluster.routing;
 
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.Iterables;
@@ -72,8 +74,12 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
         return indicesRouting();
     }
 
-    public RoutingNodes routingNodes(MetaData metaData) {
-        return new RoutingNodes(metaData, this);
+    public RoutingNodes routingNodes(ClusterState state) {
+        return routingNodes(state.metaData(), state.blocks());
+    }
+
+    public RoutingNodes routingNodes(MetaData metaData, ClusterBlocks blocks) {
+        return new RoutingNodes(metaData, blocks, this);
     }
 
     public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
index 1957b17b648..8070d096dcf 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -135,16 +135,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
 
     @Override protected void doStart() throws ElasticSearchException {
         Map<String, String> nodeAttributes = buildCommonNodesAttributes(settings);
-        Boolean zenMaster = componentSettings.getAsBoolean("master", null);
-        if (zenMaster != null) {
-            if (zenMaster.equals(Boolean.FALSE)) {
-                nodeAttributes.put("zen.master", "false");
-            }
-        } else if (nodeAttributes.containsKey("client")) {
-            if (nodeAttributes.get("client").equals("true")) {
-                nodeAttributes.put("zen.master", "false");
-            }
-        }
         // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
         String nodeId = UUID.randomUUID().toString();
         localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java
index cc98ae8d9d5..4e77ce90032 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java
@@ -79,10 +79,8 @@ public class ElectMasterService extends AbstractComponent {
         // clean non master nodes
         for (Iterator<DiscoveryNode> it = possibleNodes.iterator(); it.hasNext();) {
             DiscoveryNode node = it.next();
-            if (node.attributes().containsKey("zen.master")) {
-                if (node.attributes().get("zen.master").equals("false")) {
-                    it.remove();
-                }
+            if (!node.masterNode()) {
+                it.remove();
             }
         }
         Collections.sort(possibleNodes, nodeComparator);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java
index 427433cfdc8..865997b815e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -49,7 +49,8 @@ public class NodeEnvironment extends AbstractComponent {
     @Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException {
         super(settings);
 
-        if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false)) {
+        if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false) ||
+                !settings.getAsBoolean("node.master", true)) {
             nodeFile = null;
             lock = null;
             return;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java
new file mode 100644
index 00000000000..6929517ea89
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gateway.local;
+
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.cluster.*;
+import org.elasticsearch.cluster.block.ClusterBlock;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.common.collect.ImmutableSet;
+import org.elasticsearch.common.collect.Sets;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.gateway.Gateway;
+import org.elasticsearch.gateway.GatewayException;
+import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.*;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.elasticsearch.cluster.ClusterState.*;
+import static org.elasticsearch.cluster.metadata.MetaData.*;
+import static org.elasticsearch.common.unit.TimeValue.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {
+
+    public static final ClusterBlock INDEX_NOT_RECOVERED_BLOCK = new ClusterBlock(3, "index not recovered (not enough nodes with shards allocated found)", ClusterBlockLevel.ALL);
+
+    private File location;
+
+    private final ClusterName clusterName;
+
+    private final ClusterService clusterService;
+
+    private final NodeEnvironment nodeEnv;
+
+    private final MetaDataCreateIndexService createIndexService;
+
+    private final TransportNodesListGatewayState listGatewayState;
+
+    private volatile LocalGatewayState currentState;
+
+    @Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
+                                NodeEnvironment nodeEnv, ClusterName clusterName, ThreadPool threadPool, TransportNodesListGatewayState listGatewayState) {
+        super(settings);
+        this.clusterName = clusterName;
+        this.clusterService = clusterService;
+        this.createIndexService = createIndexService;
+        this.nodeEnv = nodeEnv;
+        this.listGatewayState = listGatewayState.initGateway(this);
+    }
+
+    @Override public String type() {
+        return "local";
+    }
+
+    public LocalGatewayState currentState() {
+        return this.currentState;
+    }
+
+    @Override protected void doStart() throws ElasticSearchException {
+        // if this is not a possible master node or data node, bail, we won't save anything here...
+        if (!clusterService.state().nodes().localNode().masterNode() || !clusterService.state().nodes().localNode().dataNode()) {
+            location = null;
+            return;
+        }
+        // create the location where the state will be stored
+        this.location = new File(nodeEnv.nodeFile(), "_state");
+        this.location.mkdirs();
+
+        try {
+            long version = findLatestStateVersion();
+            if (version != -1) {
+                this.currentState = readState(Streams.copyToByteArray(new FileInputStream(new File(location, "state-" + version))));
+            }
+        } catch (Exception e) {
+            logger.warn("failed to read local state", e);
+        }
+
+        clusterService.add(this);
+    }
+
+    @Override protected void doStop() throws ElasticSearchException {
+        clusterService.remove(this);
+    }
+
+    @Override protected void doClose() throws ElasticSearchException {
+    }
+
+    @Override public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
+        Set<String> nodesIds = Sets.newHashSet();
+        nodesIds.addAll(clusterService.state().nodes().dataNodes().keySet());
+        nodesIds.addAll(clusterService.state().nodes().masterNodes().keySet());
+        TransportNodesListGatewayState.NodesLocalGatewayState nodesState = listGatewayState.list(nodesIds, null).actionGet();
+
+        TransportNodesListGatewayState.NodeLocalGatewayState electedState = null;
+        for (TransportNodesListGatewayState.NodeLocalGatewayState nodeState : nodesState) {
+            if (nodeState.state() == null) {
+                continue;
+            }
+            if (electedState == null) {
+                electedState = nodeState;
+            } else if (nodeState.state().version() > electedState.state().version()) {
+                electedState = nodeState;
+            }
+        }
+        if (electedState == null) {
+            logger.debug("no state elected");
+            listener.onSuccess();
+            return;
+        }
+        logger.debug("elected state from [{}]", electedState.node());
+        final LocalGatewayState state = electedState.state();
+        final AtomicInteger indicesCounter = new AtomicInteger(state.metaData().indices().size());
+        clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() {
+            @Override public ClusterState execute(ClusterState currentState) {
+                MetaData.Builder metaDataBuilder = newMetaDataBuilder()
+                        .metaData(currentState.metaData());
+                // mark the metadata as read from gateway
+                metaDataBuilder.markAsRecoveredFromGateway();
+
+                return newClusterStateBuilder().state(currentState)
+                        .version(state.version())
+                        .metaData(metaDataBuilder).build();
+            }
+
+            @Override public void clusterStateProcessed(ClusterState clusterState) {
+                // go over the meta data and create indices, we don't really need to copy over
+                // the meta data per index, since we create the index and it will be added automatically
+                for (final IndexMetaData indexMetaData : state.metaData()) {
+                    try {
+                        createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index())
+                                .settings(indexMetaData.settings())
+                                .mappingsCompressed(indexMetaData.mappings())
+                                .blocks(ImmutableSet.of(INDEX_NOT_RECOVERED_BLOCK))
+                                .timeout(timeValueSeconds(30)),
+
+                                new MetaDataCreateIndexService.Listener() {
+                                    @Override public void onResponse(MetaDataCreateIndexService.Response response) {
+                                        if (indicesCounter.decrementAndGet() == 0) {
+                                            listener.onSuccess();
+                                        }
+                                    }
+
+                                    @Override public void onFailure(Throwable t) {
+                                        logger.error("failed to create index [{}]", indexMetaData.index(), t);
+                                    }
+                                });
+                    } catch (IOException e) {
+                        logger.error("failed to create index [{}]", indexMetaData.index(), e);
+                    }
+                }
+            }
+        });
+    }
+
+    @Override public Class<? extends Module> suggestIndexGateway() {
+        return NoneIndexGatewayModule.class;
+    }
+
+    @Override public void reset() throws Exception {
+    }
+
+    @Override public void clusterChanged(final ClusterChangedEvent event) {
+        // nothing to do until we actually recover from hte gateway
+        if (!event.state().metaData().recoveredFromGateway()) {
+            return;
+        }
+
+        // go over the indices, if they are blocked, and all are allocated, update the cluster state that it is no longer blocked
+        for (Map.Entry<String, ImmutableSet<ClusterBlock>> entry : event.state().blocks().indices().entrySet()) {
+            final String index = entry.getKey();
+            ImmutableSet<ClusterBlock> indexBlocks = entry.getValue();
+            if (indexBlocks.contains(INDEX_NOT_RECOVERED_BLOCK)) {
+                IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index);
+                if (indexRoutingTable != null && indexRoutingTable.allPrimaryShardsActive()) {
+                    clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() {
+                        @Override public ClusterState execute(ClusterState currentState) {
+                            ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
+                            blocks.removeIndexBlock(index, INDEX_NOT_RECOVERED_BLOCK);
+                            return ClusterState.builder().state(currentState).blocks(blocks).build();
+                        }
+                    });
+                }
+            }
+        }
+
+        if (!event.routingTableChanged() && !event.metaDataChanged()) {
+            return;
+        }
+
+        // builder the current state
+        LocalGatewayState.Builder builder = LocalGatewayState.builder();
+        if (currentState != null) {
+            builder.state(currentState);
+        }
+        builder.version(event.state().version());
+        builder.metaData(event.state().metaData());
+        // remove from the current state all the shards that are primary and started, we won't need them anymore
+        for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
+            for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+                if (indexShardRoutingTable.primaryShard().active()) {
+                    builder.remove(indexShardRoutingTable.shardId());
+                }
+            }
+        }
+        // now, add all the ones that are active and on this node
+        RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId());
+        if (routingNode != null) {
+            // out node is not in play yet...
+            for (MutableShardRouting shardRouting : routingNode) {
+                if (shardRouting.active()) {
+                    builder.put(shardRouting.shardId(), event.state().version());
+                }
+            }
+        }
+
+        try {
+            LocalGatewayState stateToWrite = builder.build();
+            BinaryXContentBuilder xContentBuilder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
+            xContentBuilder.prettyPrint();
+            xContentBuilder.startObject();
+            LocalGatewayState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
+            xContentBuilder.endObject();
+
+            File stateFile = new File(location, "state-" + event.state().version());
+            FileOutputStream fos = new FileOutputStream(stateFile);
+            fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
+            fos.close();
+
+            FileSystemUtils.syncFile(stateFile);
+
+            currentState = stateToWrite;
+        } catch (IOException e) {
+            logger.warn("failed to write updated state", e);
+        }
+
+        // delete all the other files
+        File[] files = location.listFiles(new FilenameFilter() {
+            @Override public boolean accept(File dir, String name) {
+                return !name.equals("state-" + event.state().version());
+            }
+        });
+        for (File file : files) {
+            file.delete();
+        }
+    }
+
+    private long findLatestStateVersion() throws IOException {
+        long index = -1;
+        for (File stateFile : location.listFiles()) {
+            if (logger.isTraceEnabled()) {
+                logger.trace("[findLatestState]: Processing [" + stateFile.getName() + "]");
+            }
+            String name = stateFile.getName();
+            if (!name.startsWith("state-")) {
+                continue;
+            }
+            long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
+            if (fileIndex >= index) {
+                // try and read the meta data
+                try {
+                    readState(Streams.copyToByteArray(new FileInputStream(stateFile)));
+                    index = fileIndex;
+                } catch (IOException e) {
+                    logger.warn("[findLatestState]: Failed to read state from [" + name + "], ignoring...", e);
+                }
+            }
+        }
+
+        return index;
+    }
+
+    private LocalGatewayState readState(byte[] data) throws IOException {
+        XContentParser parser = null;
+        try {
+            parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
+            return LocalGatewayState.Builder.fromXContent(parser, settings);
+        } finally {
+            if (parser != null) {
+                parser.close();
+            }
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java
new file mode 100644
index 00000000000..9bfa6a91f1a
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gateway.local;
+
+import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.inject.PreProcessModule;
+import org.elasticsearch.gateway.Gateway;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LocalGatewayModule extends AbstractModule implements PreProcessModule {
+
+    @Override protected void configure() {
+        bind(Gateway.class).to(LocalGateway.class).asEagerSingleton();
+        bind(TransportNodesListGatewayState.class).asEagerSingleton();
+    }
+
+    @Override public void processModule(Module module) {
+        if (module instanceof ShardAllocationModule) {
+            ((ShardAllocationModule) module).addNodeAllocation(LocalGatewayNodeAllocation.class);
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
new file mode 100644
index 00000000000..01eb0dbc49e
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gateway.local;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
+import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.collect.Sets;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LocalGatewayNodeAllocation extends NodeAllocation {
+
+    private final TransportNodesListGatewayState listGatewayState;
+
+    @Inject public LocalGatewayNodeAllocation(Settings settings, TransportNodesListGatewayState listGatewayState) {
+        super(settings);
+        this.listGatewayState = listGatewayState;
+    }
+
+    @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
+        boolean changed = false;
+
+        for (IndexRoutingTable indexRoutingTable : routingNodes.routingTable()) {
+            // only do the allocation if there is a local "INDEX NOT RECOVERED" block
+            if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
+                continue;
+            }
+
+            if (indexRoutingTable.allPrimaryShardsUnassigned()) {
+                // all primary are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign
+                Set<String> nodesIds = Sets.newHashSet();
+                nodesIds.addAll(nodes.dataNodes().keySet());
+                nodesIds.addAll(nodes.masterNodes().keySet());
+                TransportNodesListGatewayState.NodesLocalGatewayState nodesState = listGatewayState.list(nodesIds, null).actionGet();
+
+                // make a list of ShardId to Node, each one from the latest version
+                Map<ShardId, Tuple<DiscoveryNode, Long>> shards = Maps.newHashMap();
+                for (TransportNodesListGatewayState.NodeLocalGatewayState nodeState : nodesState) {
+                    for (Map.Entry<ShardId, Long> entry : nodeState.state().shards().entrySet()) {
+                        if (entry.getKey().index().name().equals(indexRoutingTable.index())) {
+                            Tuple<DiscoveryNode, Long> t = shards.get(entry.getKey());
+                            if (t == null || entry.getValue() > t.v2().longValue()) {
+                                t = new Tuple<DiscoveryNode, Long>(nodeState.node(), entry.getValue());
+                                shards.put(entry.getKey(), t);
+                            }
+                        }
+                    }
+                }
+
+                // check if we managed to allocate to all of them, if not, move all relevant shards to ignored
+                if (shards.size() < indexRoutingTable.shards().size()) {
+                    for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
+                        MutableShardRouting shardRouting = it.next();
+                        if (shardRouting.index().equals(indexRoutingTable.index())) {
+                            it.remove();
+                            routingNodes.ignoredUnassigned().add(shardRouting);
+                        }
+                    }
+                } else {
+                    changed = true;
+                    // we found all nodes to allocate to, do the allocation
+                    for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
+                        MutableShardRouting shardRouting = it.next();
+                        if (shardRouting.primary()) {
+                            DiscoveryNode node = shards.get(shardRouting.shardId()).v1();
+                            RoutingNode routingNode = routingNodes.node(node.id());
+                            routingNode.add(shardRouting);
+                            it.remove();
+                        }
+                    }
+                }
+            }
+        }
+
+        return changed;
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayState.java
new file mode 100644
index 00000000000..b9b44d3fe93
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayState.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gateway.local;
+
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.builder.XContentBuilder;
+import org.elasticsearch.index.shard.ShardId;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LocalGatewayState {
+
+    public static class StartedShard {
+        private final long version;
+        private final ShardId shardId;
+
+        public StartedShard(long version, ShardId shardId) {
+            this.version = version;
+            this.shardId = shardId;
+        }
+
+        public long version() {
+            return version;
+        }
+
+        public ShardId shardId() {
+            return shardId;
+        }
+    }
+
+    private final long version;
+
+    private final MetaData metaData;
+
+    private final ImmutableMap<ShardId, Long> shards;
+
+    public LocalGatewayState(long version, MetaData metaData, Map<ShardId, Long> shards) {
+        this.version = version;
+        this.metaData = metaData;
+        this.shards = ImmutableMap.copyOf(shards);
+    }
+
+    public long version() {
+        return version;
+    }
+
+    public MetaData metaData() {
+        return metaData;
+    }
+
+    public ImmutableMap<ShardId, Long> shards() {
+        return this.shards;
+    }
+
+    public Long startedShardVersion(ShardId shardId) {
+        return shards.get(shardId);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private long version;
+
+        private MetaData metaData;
+
+        private Map<ShardId, Long> shards = Maps.newHashMap();
+
+        public Builder state(LocalGatewayState state) {
+            this.version = state.version();
+            this.metaData = state.metaData();
+            this.shards.putAll(state.shards);
+            return this;
+        }
+
+        public Builder version(long version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder metaData(MetaData metaData) {
+            this.metaData = metaData;
+            return this;
+        }
+
+        public Builder remove(ShardId shardId) {
+            this.shards.remove(shardId);
+            return this;
+        }
+
+        public Builder put(ShardId shardId, long version) {
+            this.shards.put(shardId, version);
+            return this;
+        }
+
+        public LocalGatewayState build() {
+            return new LocalGatewayState(version, metaData, shards);
+        }
+
+        public static void toXContent(LocalGatewayState state, XContentBuilder builder, ToXContent.Params params) throws IOException {
+            builder.startObject("state");
+
+            builder.field("version", state.version());
+            MetaData.Builder.toXContent(state.metaData(), builder, params);
+
+            builder.startArray("shards");
+            for (Map.Entry<ShardId, Long> entry : state.shards.entrySet()) {
+                builder.startObject();
+                builder.field("index", entry.getKey().index().name());
+                builder.field("id", entry.getKey().id());
+                builder.field("version", entry.getValue());
+                builder.endObject();
+            }
+            builder.endArray();
+
+            builder.endObject();
+        }
+
+        public static LocalGatewayState fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException {
+            Builder builder = new Builder();
+
+            String currentFieldName = null;
+            XContentParser.Token token = parser.nextToken();
+            if (token == null) {
+                // no data...
+                return builder.build();
+            }
+            while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+                if (token == XContentParser.Token.FIELD_NAME) {
+                    currentFieldName = parser.currentName();
+                } else if (token == XContentParser.Token.START_OBJECT) {
+                    if ("meta-data".equals(currentFieldName)) {
+                        builder.metaData = MetaData.Builder.fromXContent(parser, globalSettings);
+                    }
+                } else if (token == XContentParser.Token.START_ARRAY) {
+                    if ("shards".equals(currentFieldName)) {
+                        while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                            if (token == XContentParser.Token.START_OBJECT) {
+                                String shardIndex = null;
+                                int shardId = -1;
+                                long version = -1;
+                                while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+                                    if (token == XContentParser.Token.FIELD_NAME) {
+                                        currentFieldName = parser.currentName();
+                                    } else if (token.isValue()) {
+                                        if ("index".equals(currentFieldName)) {
+                                            shardIndex = parser.text();
+                                        } else if ("id".equals(currentFieldName)) {
+                                            shardId = parser.intValue();
+                                        } else if ("version".equals(currentFieldName)) {
+                                            version = parser.longValue();
+                                        }
+                                    }
+                                }
+                                builder.shards.put(new ShardId(shardIndex, shardId), version);
+                            }
+                        }
+                    }
+                } else if (token.isValue()) {
+                    if ("version".equals(currentFieldName)) {
+                        builder.version = parser.longValue();
+                    }
+                }
+            }
+
+            return builder.build();
+        }
+
+        public static LocalGatewayState readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException {
+            LocalGatewayState.Builder builder = new Builder();
+            builder.version = in.readLong();
+            builder.metaData = MetaData.Builder.readFrom(in, globalSettings);
+            int size = in.readVInt();
+            for (int i = 0; i < size; i++) {
+                builder.shards.put(ShardId.readShardId(in), in.readLong());
+            }
+            return builder.build();
+        }
+
+        public static void writeTo(LocalGatewayState state, StreamOutput out) throws IOException {
+            out.writeLong(state.version());
+            MetaData.Builder.writeTo(state.metaData(), out);
+
+            out.writeVInt(state.shards.size());
+            for (Map.Entry<ShardId, Long> entry : state.shards.entrySet()) {
+                entry.getKey().writeTo(out);
+                out.writeLong(entry.getValue());
+            }
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayState.java
new file mode 100644
index 00000000000..a7eb41f88bd
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayState.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gateway.local;
+
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.nodes.*;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.Lists;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class TransportNodesListGatewayState extends TransportNodesOperationAction<TransportNodesListGatewayState.Request, TransportNodesListGatewayState.NodesLocalGatewayState, TransportNodesListGatewayState.NodeRequest, TransportNodesListGatewayState.NodeLocalGatewayState> {
+
+    private LocalGateway gateway;
+
+    @Inject public TransportNodesListGatewayState(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
+        super(settings, clusterName, threadPool, clusterService, transportService);
+    }
+
+    TransportNodesListGatewayState initGateway(LocalGateway gateway) {
+        this.gateway = gateway;
+        return this;
+    }
+
+    public ActionFuture<NodesLocalGatewayState> list(Set<String> nodesIds, @Nullable TimeValue timeout) {
+        return execute(new Request(nodesIds).timeout(timeout));
+    }
+
+    @Override protected String transportAction() {
+        return "/gateway/local/state";
+    }
+
+    @Override protected String transportNodeAction() {
+        return "/gateway/local/state/node";
+    }
+
+    @Override protected Request newRequest() {
+        return new Request();
+    }
+
+    @Override protected NodeRequest newNodeRequest() {
+        return new NodeRequest();
+    }
+
+    @Override protected NodeRequest newNodeRequest(String nodeId, Request request) {
+        return new NodeRequest(nodeId);
+    }
+
+    @Override protected NodeLocalGatewayState newNodeResponse() {
+        return new NodeLocalGatewayState();
+    }
+
+    @Override protected NodesLocalGatewayState newResponse(Request request, AtomicReferenceArray responses) {
+        final List<NodeLocalGatewayState> nodesList = Lists.newArrayList();
+        final List<FailedNodeException> failures = Lists.newArrayList();
+        for (int i = 0; i < responses.length(); i++) {
+            Object resp = responses.get(i);
+            if (resp instanceof NodeLocalGatewayState) { // will also filter out null response for unallocated ones
+                nodesList.add((NodeLocalGatewayState) resp);
+            } else if (resp instanceof FailedNodeException) {
+                failures.add((FailedNodeException) resp);
+            }
+        }
+        return new NodesLocalGatewayState(clusterName, nodesList.toArray(new NodeLocalGatewayState[nodesList.size()]),
+                failures.toArray(new FailedNodeException[failures.size()]));
+    }
+
+    @Override protected NodeLocalGatewayState nodeOperation(NodeRequest request) throws ElasticSearchException {
+        return new NodeLocalGatewayState(clusterService.state().nodes().localNode(), gateway.currentState());
+    }
+
+    @Override protected boolean accumulateExceptions() {
+        return true;
+    }
+
+    static class Request extends NodesOperationRequest {
+
+        public Request() {
+        }
+
+        public Request(Set<String> nodesIds) {
+            super(nodesIds.toArray(new String[nodesIds.size()]));
+        }
+
+        @Override public Request timeout(TimeValue timeout) {
+            super.timeout(timeout);
+            return this;
+        }
+
+        @Override public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+        }
+
+        @Override public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+        }
+    }
+
+    public static class NodesLocalGatewayState extends NodesOperationResponse<NodeLocalGatewayState> {
+
+        private FailedNodeException[] failures;
+
+        NodesLocalGatewayState() {
+        }
+
+        public NodesLocalGatewayState(ClusterName clusterName, NodeLocalGatewayState[] nodes, FailedNodeException[] failures) {
+            super(clusterName, nodes);
+            this.failures = failures;
+        }
+
+        public FailedNodeException[] failures() {
+            return failures;
+        }
+
+        @Override public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            nodes = new NodeLocalGatewayState[in.readVInt()];
+            for (int i = 0; i < nodes.length; i++) {
+                nodes[i] = new NodeLocalGatewayState();
+                nodes[i].readFrom(in);
+            }
+        }
+
+        @Override public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVInt(nodes.length);
+            for (NodeLocalGatewayState response : nodes) {
+                response.writeTo(out);
+            }
+        }
+    }
+
+
+    static class NodeRequest extends NodeOperationRequest {
+
+        NodeRequest() {
+        }
+
+        NodeRequest(String nodeId) {
+            super(nodeId);
+        }
+
+        @Override public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+        }
+
+        @Override public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+        }
+    }
+
+    public static class NodeLocalGatewayState extends NodeOperationResponse {
+
+        private LocalGatewayState state;
+
+        NodeLocalGatewayState() {
+        }
+
+        public NodeLocalGatewayState(DiscoveryNode node, LocalGatewayState state) {
+            super(node);
+            this.state = state;
+        }
+
+        public LocalGatewayState state() {
+            return state;
+        }
+
+        @Override public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            if (in.readBoolean()) {
+                state = LocalGatewayState.Builder.readFrom(in, null);
+            }
+        }
+
+        @Override public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            if (state == null) {
+                out.writeBoolean(false);
+            } else {
+                out.writeBoolean(true);
+                LocalGatewayState.Builder.writeTo(state, out);
+            }
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java
index 8e51df2e37a..657da2adcc5 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java
@@ -69,17 +69,17 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start the primary shards");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start the replica shards");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(2));
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
index 622d793a2a8..ee32fc0c2fe 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
@@ -72,7 +72,7 @@ public class FailedShardsRoutingTests {
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start the shards (primaries)");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -90,7 +90,7 @@ public class FailedShardsRoutingTests {
         }
 
         logger.info("Start the shards (backups)");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -112,7 +112,7 @@ public class FailedShardsRoutingTests {
         prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState);
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(3));
@@ -123,11 +123,11 @@ public class FailedShardsRoutingTests {
         assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2));
 
         logger.info("Fail the shards on node 3");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(3));
@@ -139,7 +139,7 @@ public class FailedShardsRoutingTests {
         prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState);
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(3));
@@ -150,11 +150,11 @@ public class FailedShardsRoutingTests {
         assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2));
 
         logger.info("Start the shards on node 3");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(3));
@@ -227,7 +227,7 @@ public class FailedShardsRoutingTests {
         }
 
         logger.info("Start the primary shards");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -250,12 +250,12 @@ public class FailedShardsRoutingTests {
         assertThat(prevRoutingTable == routingTable, equalTo(true));
 
         logger.info("Fail backup shards on node2");
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         List<MutableShardRouting> failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING);
         routingTable = strategy.applyFailedShards(clusterState, failedShards);
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(10));
@@ -272,7 +272,7 @@ public class FailedShardsRoutingTests {
         // fail them again...
         routingTable = strategy.applyFailedShards(clusterState, failedShards);
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(10));
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java
index 022e5fc66d4..1a6f4d40823 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java
@@ -68,13 +68,13 @@ public class PrimaryElectionRoutingTests {
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start the primary shard (on node1)");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start the backup shard (on node2)");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -84,7 +84,7 @@ public class PrimaryElectionRoutingTests {
         prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState);
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(1));
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java
index 76698939cb7..01b31ccd45a 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java
@@ -68,7 +68,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests {
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start the primary shard (on node1)");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java
index d1b26ca48b6..5e0e7298eaa 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java
@@ -84,11 +84,11 @@ public class RebalanceAfterActiveTests {
         }
 
         logger.info("start all the primary shards, replicas will start initializing");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
@@ -103,7 +103,7 @@ public class RebalanceAfterActiveTests {
         prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState);
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
@@ -112,33 +112,33 @@ public class RebalanceAfterActiveTests {
         }
 
         logger.info("start the replica shards, rebalancing should start");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         // we only allow one relocation at a time
         assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
         assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
 
         logger.info("complete relocation, other half of relocation should happen");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         // we now only relocate 3, since 2 remain where they are!
         assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7));
         assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3));
 
         logger.info("complete relocation, thats it!");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
         // make sure we have an even relocation
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java
index 968d1d79640..06b8fc35373 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java
@@ -87,7 +87,7 @@ public class ReplicaAllocatedAfterPrimaryTests {
         assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), nullValue());
 
         logger.info("Start all the primary shards");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
index 1239f034dcc..66483e4baf4 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
@@ -92,9 +92,10 @@ public class SingleShardNoReplicasRoutingTests {
         clusterState = newClusterStateBuilder().state(clusterState).build();
         routingTable = strategy.reroute(clusterState);
         assertThat(routingTable == prevRoutingTable, equalTo(true));
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Marking the shard as started");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -141,7 +142,7 @@ public class SingleShardNoReplicasRoutingTests {
         assertThat(routingTable == prevRoutingTable, equalTo(true));
 
         logger.info("Start the shard on node 2");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -190,7 +191,7 @@ public class SingleShardNoReplicasRoutingTests {
         assertThat(routingTable.index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1"));
 
         logger.info("Marking the shard as failed");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -255,7 +256,7 @@ public class SingleShardNoReplicasRoutingTests {
             int nodeIndex = Integer.parseInt(nodeId.substring("node".length()));
             assertThat(nodeIndex, lessThan(25));
         }
-        RoutingNodes routingNodes = routingTable.routingNodes(metaData);
+        RoutingNodes routingNodes = clusterState.routingNodes();
         Set<String> encounteredIndices = newHashSet();
         for (RoutingNode routingNode : routingNodes) {
             assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(0));
@@ -348,7 +349,7 @@ public class SingleShardNoReplicasRoutingTests {
             assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
             assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), equalTo(INITIALIZING));
         }
-        RoutingNodes routingNodes = routingTable.routingNodes(metaData);
+        RoutingNodes routingNodes = clusterState.routingNodes();
         assertThat(routingNodes.numberOfShardsOfType(INITIALIZING), equalTo(numberOfIndices));
         assertThat(routingNodes.node("node1").numberOfShardsWithState(INITIALIZING), anyOf(equalTo(3), equalTo(4)));
         assertThat(routingNodes.node("node2").numberOfShardsWithState(INITIALIZING), anyOf(equalTo(3), equalTo(4)));
@@ -365,7 +366,7 @@ public class SingleShardNoReplicasRoutingTests {
 
         assertThat(prevRoutingTable == routingTable, equalTo(true));
 
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -377,7 +378,7 @@ public class SingleShardNoReplicasRoutingTests {
             assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
             assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), anyOf(equalTo(RELOCATING), equalTo(STARTED)));
         }
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
         assertThat("4 source shard routing are relocating", routingNodes.numberOfShardsOfType(RELOCATING), equalTo(4));
         assertThat("4 target shard routing are initializing", routingNodes.numberOfShardsOfType(INITIALIZING), equalTo(4));
 
@@ -394,7 +395,7 @@ public class SingleShardNoReplicasRoutingTests {
             assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
             assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), anyOf(equalTo(RELOCATING), equalTo(STARTED)));
         }
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
         assertThat(routingNodes.numberOfShardsOfType(STARTED), equalTo(numberOfIndices));
         for (RoutingNode routingNode : routingNodes) {
             assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(2));
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java
index 2279a316d49..8bb796586e1 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java
@@ -95,7 +95,7 @@ public class SingleShardOneReplicaRoutingTests {
         assertThat(prevRoutingTable == routingTable, equalTo(true));
 
         logger.info("Start the primary shard (on node1)");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -118,7 +118,7 @@ public class SingleShardOneReplicaRoutingTests {
         assertThat(prevRoutingTable == routingTable, equalTo(true));
 
         logger.info("Start the backup shard");
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java
index 410f67876bb..faa5192c4d9 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java
@@ -99,7 +99,7 @@ public class TenShardsOneReplicaRoutingTests {
         assertThat(prevRoutingTable == routingTable, equalTo(true));
 
         logger.info("Start the primary shard (on node1)");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -123,11 +123,11 @@ public class TenShardsOneReplicaRoutingTests {
         assertThat(prevRoutingTable == routingTable, equalTo(true));
 
         logger.info("Start the backup shard");
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(10));
@@ -148,7 +148,7 @@ public class TenShardsOneReplicaRoutingTests {
         prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState);
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(10));
@@ -159,11 +159,11 @@ public class TenShardsOneReplicaRoutingTests {
         assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(6));
 
         logger.info("Start the shards on node 3");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
-        routingNodes = routingTable.routingNodes(metaData);
+        routingNodes = clusterState.routingNodes();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(10));
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java
index b9fb86462e3..34fde7b2bde 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java
@@ -60,13 +60,13 @@ public class UpdateNumberOfReplicasTests {
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start all the primary shards");
-        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        RoutingNodes routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
 
         logger.info("Start all the replica shards");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -82,7 +82,7 @@ public class UpdateNumberOfReplicasTests {
         assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
 
         logger.info("add another replica");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(2).build();
         metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(2).build();
@@ -117,7 +117,7 @@ public class UpdateNumberOfReplicasTests {
         assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(INITIALIZING));
         assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
 
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
         clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@@ -134,7 +134,7 @@ public class UpdateNumberOfReplicasTests {
         assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
 
         logger.info("now remove a replica");
-        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        routingNodes = clusterState.routingNodes();
         prevRoutingTable = routingTable;
         routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(1).build();
         metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(1).build();