improve gateway recovery and applying indices, also improve speed of index creation
This commit is contained in:
parent
287dc862e0
commit
c2ee6dd120
|
@ -79,7 +79,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
|
||||||
final AtomicReference<CreateIndexResponse> responseRef = new AtomicReference<CreateIndexResponse>();
|
final AtomicReference<CreateIndexResponse> responseRef = new AtomicReference<CreateIndexResponse>();
|
||||||
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
|
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.API, cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() {
|
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() {
|
||||||
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
|
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
|
||||||
responseRef.set(new CreateIndexResponse(response.acknowledged()));
|
responseRef.set(new CreateIndexResponse(response.acknowledged()));
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.Preconditions;
|
||||||
import org.elasticsearch.common.collect.ImmutableMap;
|
import org.elasticsearch.common.collect.ImmutableMap;
|
||||||
import org.elasticsearch.common.collect.ImmutableSet;
|
import org.elasticsearch.common.collect.ImmutableSet;
|
||||||
import org.elasticsearch.common.collect.MapBuilder;
|
import org.elasticsearch.common.collect.MapBuilder;
|
||||||
import org.elasticsearch.common.compress.CompressedString;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
@ -243,13 +242,19 @@ public class IndexMetaData {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder putMapping(MappingMetaData mappingMd) {
|
public Builder putMapping(String type, String source) throws IOException {
|
||||||
mappings.put(mappingMd.type(), mappingMd);
|
XContentParser parser = XContentFactory.xContent(source).createParser(source);
|
||||||
|
try {
|
||||||
|
putMapping(new MappingMetaData(type, parser.map()));
|
||||||
|
} finally {
|
||||||
|
parser.close();
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder putMapping(String mappingType, String mappingSource) throws IOException {
|
public Builder putMapping(MappingMetaData mappingMd) {
|
||||||
return putMapping(new MappingMetaData(mappingType, new CompressedString(mappingSource)));
|
mappings.put(mappingMd.type(), mappingMd);
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder state(State state) {
|
public Builder state(State state) {
|
||||||
|
@ -308,13 +313,7 @@ public class IndexMetaData {
|
||||||
Map<String, Object> mapping = parser.map();
|
Map<String, Object> mapping = parser.map();
|
||||||
if (mapping.size() == 1) {
|
if (mapping.size() == 1) {
|
||||||
String mappingType = mapping.keySet().iterator().next();
|
String mappingType = mapping.keySet().iterator().next();
|
||||||
String mappingSource = XContentFactory.jsonBuilder().map(mapping).string();
|
builder.putMapping(new MappingMetaData(mappingType, mapping));
|
||||||
|
|
||||||
if (mappingSource == null) {
|
|
||||||
// crap, no mapping source, warn?
|
|
||||||
} else {
|
|
||||||
builder.putMapping(new MappingMetaData(mappingType, new CompressedString(mappingSource)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,16 +23,24 @@ import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.compress.CompressedString;
|
import org.elasticsearch.common.compress.CompressedString;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.elasticsearch.common.xcontent.support.XContentMapValues.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (shay.banon)
|
* @author kimchy (shay.banon)
|
||||||
*/
|
*/
|
||||||
public class MappingMetaData {
|
public class MappingMetaData {
|
||||||
|
|
||||||
|
private static ESLogger logger = ESLoggerFactory.getLogger(MappingMetaData.class.getName());
|
||||||
|
|
||||||
public static class Routing {
|
public static class Routing {
|
||||||
|
|
||||||
public static final Routing EMPTY = new Routing(false, null);
|
public static final Routing EMPTY = new Routing(false, null);
|
||||||
|
@ -82,11 +90,31 @@ public class MappingMetaData {
|
||||||
this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path());
|
this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path());
|
||||||
}
|
}
|
||||||
|
|
||||||
public MappingMetaData(String type, CompressedString source) {
|
public MappingMetaData(String type, Map<String, Object> mapping) throws IOException {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
this.source = source;
|
this.source = new CompressedString(XContentFactory.jsonBuilder().map(mapping).string());
|
||||||
|
Map<String, Object> withoutType = mapping;
|
||||||
|
if (mapping.size() == 1 && mapping.containsKey(type)) {
|
||||||
|
withoutType = (Map<String, Object>) mapping.get(type);
|
||||||
|
}
|
||||||
|
if (withoutType.containsKey("_routing")) {
|
||||||
|
boolean required = false;
|
||||||
|
String path = null;
|
||||||
|
Map<String, Object> routingNode = (Map<String, Object>) withoutType.get("_routing");
|
||||||
|
for (Map.Entry<String, Object> entry : routingNode.entrySet()) {
|
||||||
|
String fieldName = Strings.toUnderscoreCase(entry.getKey());
|
||||||
|
Object fieldNode = entry.getValue();
|
||||||
|
if (fieldName.equals("required")) {
|
||||||
|
required = nodeBooleanValue(fieldNode);
|
||||||
|
} else if (fieldName.equals("path")) {
|
||||||
|
path = fieldNode.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.routing = new Routing(required, path);
|
||||||
|
} else {
|
||||||
this.routing = Routing.EMPTY;
|
this.routing = Routing.EMPTY;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MappingMetaData(String type, CompressedString source, Routing routing) {
|
MappingMetaData(String type, CompressedString source, Routing routing) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
|
|
|
@ -105,21 +105,17 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() {
|
||||||
@Override public ClusterState execute(ClusterState currentState) {
|
@Override public ClusterState execute(ClusterState currentState) {
|
||||||
try {
|
try {
|
||||||
if (request.origin == Request.Origin.API) {
|
|
||||||
try {
|
try {
|
||||||
validate(request, currentState);
|
validate(request, currentState);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
return currentState;
|
return currentState;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
List<IndexTemplateMetaData> templates = ImmutableList.of();
|
List<IndexTemplateMetaData> templates = ImmutableList.of();
|
||||||
// we only find a template when its an API call (a new index)
|
// we only find a template when its an API call (a new index)
|
||||||
if (request.origin == Request.Origin.API) {
|
|
||||||
// find templates, highest order are better matching
|
// find templates, highest order are better matching
|
||||||
templates = findTemplates(request, currentState);
|
templates = findTemplates(request, currentState);
|
||||||
}
|
|
||||||
|
|
||||||
// add the request mapping
|
// add the request mapping
|
||||||
Map<String, Map<String, Object>> mappings = Maps.newHashMap();
|
Map<String, Map<String, Object>> mappings = Maps.newHashMap();
|
||||||
|
@ -139,7 +135,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
// now add config level mappings
|
// now add config level mappings
|
||||||
if (request.origin == Request.Origin.API) {
|
|
||||||
File mappingsDir = new File(environment.configFile(), "mappings");
|
File mappingsDir = new File(environment.configFile(), "mappings");
|
||||||
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
|
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
|
||||||
// first index level
|
// first index level
|
||||||
|
@ -154,7 +149,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
addMappings(mappings, defaultMappingsDir);
|
addMappings(mappings, defaultMappingsDir);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder();
|
ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder();
|
||||||
// apply templates, here, in reverse order, since first ones are better matching
|
// apply templates, here, in reverse order, since first ones are better matching
|
||||||
|
@ -249,7 +243,18 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
blocks.addIndexBlock(request.index, MetaDataStateIndexService.INDEX_CLOSED_BLOCK);
|
blocks.addIndexBlock(request.index, MetaDataStateIndexService.INDEX_CLOSED_BLOCK);
|
||||||
}
|
}
|
||||||
|
|
||||||
return newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build();
|
ClusterState updatedState = newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build();
|
||||||
|
|
||||||
|
if (request.state == State.OPEN) {
|
||||||
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable());
|
||||||
|
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
|
||||||
|
.initializeEmpty(updatedState.metaData().index(request.index), true);
|
||||||
|
routingTableBuilder.add(indexRoutingBuilder);
|
||||||
|
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
|
||||||
|
updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
return updatedState;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("[{}] failed to create", e, request.index);
|
logger.warn("[{}] failed to create", e, request.index);
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
|
@ -258,29 +263,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
||||||
if (request.state == State.CLOSE) { // no need to do shard allocated when closed...
|
|
||||||
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (!request.rerouteAfterCreation) {
|
|
||||||
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
|
|
||||||
@Override public ClusterState execute(ClusterState currentState) {
|
|
||||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable());
|
|
||||||
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
|
|
||||||
.initializeEmpty(currentState.metaData().index(request.index), request.origin == Request.Origin.API);
|
|
||||||
routingTableBuilder.add(indexRoutingBuilder);
|
|
||||||
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
|
|
||||||
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
|
||||||
logger.info("[{}] created and added to cluster_state", request.index);
|
|
||||||
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -360,13 +343,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
|
|
||||||
public static class Request {
|
public static class Request {
|
||||||
|
|
||||||
public static enum Origin {
|
|
||||||
API,
|
|
||||||
GATEWAY
|
|
||||||
}
|
|
||||||
|
|
||||||
final Origin origin;
|
|
||||||
|
|
||||||
final String cause;
|
final String cause;
|
||||||
|
|
||||||
final String index;
|
final String index;
|
||||||
|
@ -381,10 +357,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
|
|
||||||
Set<ClusterBlock> blocks = Sets.newHashSet();
|
Set<ClusterBlock> blocks = Sets.newHashSet();
|
||||||
|
|
||||||
boolean rerouteAfterCreation = true;
|
public Request(String cause, String index) {
|
||||||
|
|
||||||
public Request(Origin origin, String cause, String index) {
|
|
||||||
this.origin = origin;
|
|
||||||
this.cause = cause;
|
this.cause = cause;
|
||||||
this.index = index;
|
this.index = index;
|
||||||
}
|
}
|
||||||
|
@ -427,11 +400,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Request rerouteAfterCreation(boolean rerouteAfterCreation) {
|
|
||||||
this.rerouteAfterCreation = rerouteAfterCreation;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Response {
|
public static class Response {
|
||||||
|
|
|
@ -24,10 +24,7 @@ import org.elasticsearch.cluster.*;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.*;
|
||||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
|
||||||
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
|
@ -40,15 +37,12 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.DiscoveryService;
|
import org.elasticsearch.discovery.DiscoveryService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.ClusterState.*;
|
import static org.elasticsearch.cluster.ClusterState.*;
|
||||||
import static org.elasticsearch.cluster.metadata.MetaData.*;
|
import static org.elasticsearch.cluster.metadata.MetaData.*;
|
||||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (shay.banon)
|
* @author kimchy (shay.banon)
|
||||||
|
@ -223,9 +217,16 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void onSuccess(final ClusterState recoveredState) {
|
@Override public void onSuccess(final ClusterState recoveredState) {
|
||||||
final AtomicInteger indicesCounter = new AtomicInteger(recoveredState.metaData().indices().size());
|
|
||||||
clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() {
|
||||||
@Override public ClusterState execute(ClusterState currentState) {
|
@Override public ClusterState execute(ClusterState currentState) {
|
||||||
|
assert currentState.metaData().indices().isEmpty();
|
||||||
|
|
||||||
|
// remove the block, since we recovered from gateway
|
||||||
|
ClusterBlocks.Builder blocks = ClusterBlocks.builder()
|
||||||
|
.blocks(currentState.blocks())
|
||||||
|
.blocks(recoveredState.blocks())
|
||||||
|
.removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
|
||||||
|
|
||||||
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
|
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
|
||||||
.metaData(currentState.metaData());
|
.metaData(currentState.metaData());
|
||||||
|
|
||||||
|
@ -234,59 +235,39 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
metaDataBuilder.put(entry.getValue());
|
metaDataBuilder.put(entry.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
return newClusterStateBuilder().state(currentState)
|
for (IndexMetaData indexMetaData : recoveredState.metaData()) {
|
||||||
|
metaDataBuilder.put(indexMetaData);
|
||||||
|
if (indexMetaData.state() == IndexMetaData.State.CLOSE) {
|
||||||
|
blocks.addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update the state to reflect the new metadata and routing
|
||||||
|
ClusterState updatedState = newClusterStateBuilder().state(currentState)
|
||||||
.version(recoveredState.version())
|
.version(recoveredState.version())
|
||||||
.metaData(metaDataBuilder).build();
|
.blocks(blocks)
|
||||||
|
.metaData(metaDataBuilder)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// initialize all index routing tables as empty
|
||||||
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable());
|
||||||
|
for (IndexMetaData indexMetaData : updatedState.metaData().indices().values()) {
|
||||||
|
if (indexMetaData.state() == IndexMetaData.State.OPEN) {
|
||||||
|
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
|
||||||
|
.initializeEmpty(updatedState.metaData().index(indexMetaData.index()), false /*not from API*/);
|
||||||
|
routingTableBuilder.add(indexRoutingBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// now, reroute
|
||||||
|
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
|
||||||
|
|
||||||
|
return newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
||||||
if (recoveredState.metaData().indices().isEmpty()) {
|
logger.info("recovered [{}] indices into cluster_state, allocating", clusterState.metaData().indices().size());
|
||||||
markMetaDataAsReadFromGateway("success");
|
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
return;
|
|
||||||
}
|
|
||||||
// go over the meta data and create indices, we don't really need to copy over
|
|
||||||
// the meta data per index, since we create the index and it will be added automatically
|
|
||||||
|
|
||||||
// also, don't reroute (or even initialize the routing table) for the indices created, we will do it
|
|
||||||
// in one batch once creating those indices is done
|
|
||||||
for (final IndexMetaData indexMetaData : recoveredState.metaData()) {
|
|
||||||
try {
|
|
||||||
createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index())
|
|
||||||
.settings(indexMetaData.settings())
|
|
||||||
.mappingsMetaData(indexMetaData.mappings())
|
|
||||||
.state(indexMetaData.state())
|
|
||||||
.rerouteAfterCreation(false)
|
|
||||||
.timeout(timeValueSeconds(30)),
|
|
||||||
|
|
||||||
new MetaDataCreateIndexService.Listener() {
|
|
||||||
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
|
|
||||||
if (indicesCounter.decrementAndGet() == 0) {
|
|
||||||
markMetaDataAsReadFromGateway("success");
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void onFailure(Throwable t) {
|
|
||||||
logger.error("failed to create index [{}]", t, indexMetaData.index());
|
|
||||||
// we report success on index creation failure and do nothing
|
|
||||||
// should we disable writing the updated metadata?
|
|
||||||
if (indicesCounter.decrementAndGet() == 0) {
|
|
||||||
markMetaDataAsReadFromGateway("success");
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("failed to create index [{}]", e, indexMetaData.index());
|
|
||||||
// we report success on index creation failure and do nothing
|
|
||||||
// should we disable writing the updated metadata?
|
|
||||||
if (indicesCounter.decrementAndGet() == 0) {
|
|
||||||
markMetaDataAsReadFromGateway("success");
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -296,34 +277,4 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
logger.error("failed recover state, blocking...", t);
|
logger.error("failed recover state, blocking...", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void markMetaDataAsReadFromGateway(String reason) {
|
|
||||||
clusterService.submitStateUpdateTask("gateway (marked as read, reroute, reason=" + reason + ")", new ProcessedClusterStateUpdateTask() {
|
|
||||||
@Override public ClusterState execute(ClusterState currentState) {
|
|
||||||
if (!currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
// remove the block, since we recovered from gateway
|
|
||||||
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
|
|
||||||
|
|
||||||
// initialize all index routing tables as empty
|
|
||||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable());
|
|
||||||
for (IndexMetaData indexMetaData : currentState.metaData().indices().values()) {
|
|
||||||
if (indexMetaData.state() == IndexMetaData.State.OPEN) {
|
|
||||||
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
|
|
||||||
.initializeEmpty(currentState.metaData().index(indexMetaData.index()), false);
|
|
||||||
routingTableBuilder.add(indexRoutingBuilder);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
|
|
||||||
|
|
||||||
return newClusterStateBuilder().state(currentState).blocks(blocks).routingResult(routingResult).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
|
||||||
logger.info("all indices created and rerouting has begun");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -221,6 +221,7 @@ public class XContentDocumentMapperParser extends AbstractIndexComponent impleme
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE, we also parse this in MappingMetaData
|
||||||
private RoutingFieldMapper.Builder parseRoutingField(Map<String, Object> routingNode, XContentMapper.TypeParser.ParserContext parserContext) {
|
private RoutingFieldMapper.Builder parseRoutingField(Map<String, Object> routingNode, XContentMapper.TypeParser.ParserContext parserContext) {
|
||||||
RoutingFieldMapper.Builder builder = routing();
|
RoutingFieldMapper.Builder builder = routing();
|
||||||
parseField(builder, builder.name, routingNode, parserContext);
|
parseField(builder, builder.name, routingNode, parserContext);
|
||||||
|
|
|
@ -146,7 +146,7 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
|
||||||
|
|
||||||
logger.info("--> trying to index into a closed index ...");
|
logger.info("--> trying to index into a closed index ...");
|
||||||
try {
|
try {
|
||||||
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();
|
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet();
|
||||||
assert false;
|
assert false;
|
||||||
} catch (ClusterBlockException e) {
|
} catch (ClusterBlockException e) {
|
||||||
// all is well
|
// all is well
|
||||||
|
@ -200,7 +200,7 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
|
||||||
|
|
||||||
logger.info("--> trying to index into a closed index ...");
|
logger.info("--> trying to index into a closed index ...");
|
||||||
try {
|
try {
|
||||||
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();
|
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet();
|
||||||
assert false;
|
assert false;
|
||||||
} catch (ClusterBlockException e) {
|
} catch (ClusterBlockException e) {
|
||||||
// all is well
|
// all is well
|
||||||
|
|
Loading…
Reference in New Issue