Auto import dangling indices, closes #2067.

This commit is contained in:
Shay Banon 2012-06-29 01:01:26 +02:00
parent 07454243e3
commit f2e39e4ee2
13 changed files with 609 additions and 146 deletions

View File

@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
@ -233,6 +235,25 @@ public class ClusterBlocks {
return this;
}
public Builder addBlocks(IndexMetaData indexMetaData) {
if (indexMetaData.state() == IndexMetaData.State.CLOSE) {
addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_READ_ONLY, false)) {
addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_ONLY_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_READ, false)) {
addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_WRITE, false)) {
addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_WRITE_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_METADATA, false)) {
addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_METADATA_BLOCK);
}
return this;
}
public Builder addGlobalBlock(ClusterBlock block) {
global.add(block);
return this;

View File

@ -410,6 +410,11 @@ public class IndexMetaData {
return index;
}
public Builder index(String name) {
this.index = index;
return this;
}
public Builder numberOfShards(int numberOfShards) {
settings = settingsBuilder().put(settings).put(SETTING_NUMBER_OF_SHARDS, numberOfShards).build();
return this;

View File

@ -31,7 +31,6 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -319,10 +318,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {
ClusterState updatedState = newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build();
if (request.state == State.OPEN) {
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable());
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
.initializeEmpty(updatedState.metaData().index(request.index), true);
routingTableBuilder.add(indexRoutingBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable())
.add(updatedState.metaData().index(request.index), true);
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -127,10 +126,8 @@ public class MetaDataStateIndexService extends AbstractComponent {
ClusterState updatedState = ClusterState.builder().state(currentState).metaData(mdBuilder).blocks(blocks).build();
RoutingTable.Builder rtBuilder = RoutingTable.builder().routingTable(updatedState.routingTable());
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
.initializeEmpty(updatedState.metaData().index(request.index), false);
rtBuilder.add(indexRoutingBuilder);
RoutingTable.Builder rtBuilder = RoutingTable.builder().routingTable(updatedState.routingTable())
.add(updatedState.metaData().index(request.index), false);
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build());

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
import com.google.common.collect.*;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -340,6 +341,15 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return this;
}
public Builder add(IndexMetaData indexMetaData, boolean fromApi) {
if (indexMetaData.state() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
.initializeEmpty(indexMetaData, fromApi);
add(indexRoutingBuilder);
}
return this;
}
public Builder add(IndexRoutingTable indexRoutingTable) {
indexRoutingTable.validate();
indicesRouting.put(indexRoutingTable.index(), indexRoutingTable);

View File

@ -26,9 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -256,21 +254,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
for (IndexMetaData indexMetaData : recoveredState.metaData()) {
metaDataBuilder.put(indexMetaData, false);
if (indexMetaData.state() == IndexMetaData.State.CLOSE) {
blocks.addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_READ_ONLY, false)) {
blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_ONLY_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_READ, false)) {
blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_READ_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_WRITE, false)) {
blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_WRITE_BLOCK);
}
if (indexMetaData.settings().getAsBoolean(IndexMetaData.SETTING_BLOCKS_METADATA, false)) {
blocks.addIndexBlock(indexMetaData.index(), IndexMetaData.INDEX_METADATA_BLOCK);
}
blocks.addBlocks(indexMetaData);
}
// update the state to reflect the new metadata and routing
@ -282,11 +266,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// initialize all index routing tables as empty
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(updatedState.routingTable());
for (IndexMetaData indexMetaData : updatedState.metaData().indices().values()) {
if (indexMetaData.state() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
.initializeEmpty(updatedState.metaData().index(indexMetaData.index()), false /*not from API*/);
routingTableBuilder.add(indexRoutingBuilder);
}
routingTableBuilder.add(indexMetaData, false /* not from API */);
}
// start with 0 based versions for routing table
routingTableBuilder.version(0);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.local.state.meta.LocalAllocateDangledIndices;
import org.elasticsearch.gateway.local.state.meta.LocalGatewayMetaState;
import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaState;
import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState;
@ -41,6 +42,7 @@ public class LocalGatewayModule extends AbstractModule implements PreProcessModu
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
bind(LocalGatewayMetaState.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
bind(LocalAllocateDangledIndices.class).asEagerSingleton();
}
@Override

View File

@ -0,0 +1,233 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local.state.meta;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
/**
*/
public class LocalAllocateDangledIndices extends AbstractComponent {
private final TransportService transportService;
private final ClusterService clusterService;
private final AllocationService allocationService;
@Inject
public LocalAllocateDangledIndices(Settings settings, TransportService transportService, ClusterService clusterService, AllocationService allocationService) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.allocationService = allocationService;
transportService.registerHandler(new AllocateDangledRequestHandler());
}
public void allocateDangled(IndexMetaData[] indices, final Listener listener) {
ClusterState clusterState = clusterService.state();
DiscoveryNode masterNode = clusterState.nodes().masterNode();
if (masterNode == null) {
listener.onFailure(new MasterNotDiscoveredException("no master to send allocate dangled request"));
return;
}
AllocateDangledRequest request = new AllocateDangledRequest(clusterState.nodes().localNode(), indices);
transportService.sendRequest(masterNode, AllocateDangledRequestHandler.ACTION, request, new TransportResponseHandler<AllocateDangledResponse>() {
@Override
public AllocateDangledResponse newInstance() {
return new AllocateDangledResponse();
}
@Override
public void handleResponse(AllocateDangledResponse response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
public static interface Listener {
void onResponse(AllocateDangledResponse response);
void onFailure(Throwable e);
}
class AllocateDangledRequestHandler implements ActionTransportRequestHandler<AllocateDangledRequest> {
public static final String ACTION = "/gateway/local/allocate_dangled";
@Override
public String action() {
return ACTION;
}
@Override
public AllocateDangledRequest newInstance() {
return new AllocateDangledRequest();
}
@Override
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel) throws Exception {
String[] indexNames = new String[request.indices.length];
for (int i = 0; i < request.indices.length; i++) {
indexNames[i] = request.indices[i].index();
}
clusterService.submitStateUpdateTask("allocation dangled indices " + Arrays.toString(indexNames), new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.blocks().disableStatePersistence()) {
return currentState;
}
MetaData.Builder metaData = MetaData.builder()
.metaData(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable());
boolean importNeeded = false;
StringBuilder sb = new StringBuilder();
for (IndexMetaData indexMetaData : request.indices) {
if (currentState.metaData().hasIndex(indexMetaData.index())) {
continue;
}
importNeeded = true;
metaData.put(indexMetaData, false);
blocks.addBlocks(indexMetaData);
routingTableBuilder.add(indexMetaData, false);
sb.append("[").append(indexMetaData.index()).append("/").append(indexMetaData.state()).append("]");
}
if (!importNeeded) {
return currentState;
}
logger.info("auto importing dangled indices {} from [{}]", sb.toString(), request.fromNode);
ClusterState updatedState = ClusterState.builder().state(currentState).metaData(metaData).blocks(blocks).routingTable(routingTableBuilder).build();
// now, reroute
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
return ClusterState.builder().state(updatedState).routingResult(routingResult).build();
}
@Override
public void clusterStateProcessed(ClusterState clusterState) {
try {
channel.sendResponse(new AllocateDangledResponse(true));
} catch (IOException e) {
logger.error("failed send response for allocating dangled", e);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
static class AllocateDangledRequest implements Streamable {
DiscoveryNode fromNode;
IndexMetaData[] indices;
AllocateDangledRequest() {
}
AllocateDangledRequest(DiscoveryNode fromNode, IndexMetaData[] indices) {
this.fromNode = fromNode;
this.indices = indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
fromNode = DiscoveryNode.readNode(in);
indices = new IndexMetaData[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = IndexMetaData.Builder.readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
fromNode.writeTo(out);
out.writeVInt(indices.length);
for (IndexMetaData indexMetaData : indices) {
IndexMetaData.Builder.writeTo(indexMetaData, out);
}
}
}
public static class AllocateDangledResponse implements Streamable {
private boolean ack;
AllocateDangledResponse() {
}
AllocateDangledResponse(boolean ack) {
this.ack = ack;
}
public boolean ack() {
return ack;
}
@Override
public void readFrom(StreamInput in) throws IOException {
ack = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(ack);
}
}
}

View File

@ -19,8 +19,10 @@
package org.elasticsearch.gateway.local.state.meta;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
@ -46,6 +48,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
@ -55,25 +58,65 @@ import java.util.concurrent.ScheduledFuture;
*/
public class LocalGatewayMetaState extends AbstractComponent implements ClusterStateListener {
static enum AutoImportDangledState {
NO() {
@Override
public boolean shouldImport() {
return false;
}
},
YES() {
@Override
public boolean shouldImport() {
return true;
}
},
CLOSED() {
@Override
public boolean shouldImport() {
return true;
}
};
public abstract boolean shouldImport();
public static AutoImportDangledState fromString(String value) {
if ("no".equalsIgnoreCase(value)) {
return NO;
} else if ("yes".equalsIgnoreCase(value)) {
return YES;
} else if ("closed".equalsIgnoreCase(value)) {
return CLOSED;
} else {
throw new ElasticSearchIllegalArgumentException("failed to parse [" + value + "], not a valid auto dangling import type");
}
}
}
private final NodeEnvironment nodeEnv;
private final ThreadPool threadPool;
private final LocalAllocateDangledIndices allocateDangledIndices;
private volatile MetaData currentMetaData;
private final XContentType format;
private final ToXContent.Params formatParams;
private final AutoImportDangledState autoImportDangled;
private final TimeValue danglingTimeout;
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
private final Object danglingMutex = new Object();
@Inject
public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception {
public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv,
TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.format = XContentType.fromRestContentType(settings.get("format", "smile"));
this.allocateDangledIndices = allocateDangledIndices;
nodesListGatewayMetaState.init(this);
if (this.format == XContentType.SMILE) {
@ -84,8 +127,11 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
formatParams = ToXContent.EMPTY_PARAMS;
}
this.autoImportDangled = AutoImportDangledState.fromString(settings.get("gateway.local.auto_import_dangled", AutoImportDangledState.YES.toString()));
this.danglingTimeout = settings.getAsTime("gateway.local.dangling_timeout", TimeValue.timeValueHours(2));
logger.debug("using gateway.local.auto_import_dangled [{}], with gateway.local.dangling_timeout [{}]", this.autoImportDangled, this.danglingTimeout);
if (DiscoveryNode.masterNode(settings)) {
try {
pre019Upgrade();
@ -113,47 +159,44 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
return;
}
if (!event.state().nodes().localNode().masterNode()) {
return;
}
// we don't check if metaData changed, since we might be called several times and we need to check dangling...
if (!event.metaDataChanged()) {
return;
}
// check if the global state changed?
boolean success = true;
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) {
try {
writeGlobalState("changed", event.state().metaData(), currentMetaData);
} catch (Exception e) {
success = false;
// only applied to master node, writing the global and index level states
if (event.state().nodes().localNode().masterNode()) {
// check if the global state changed?
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) {
try {
writeGlobalState("changed", event.state().metaData(), currentMetaData);
} catch (Exception e) {
success = false;
}
}
// check and write changes in indices
for (IndexMetaData indexMetaData : event.state().metaData()) {
String writeReason = null;
IndexMetaData currentIndexMetaData = currentMetaData == null ? null : currentMetaData.index(indexMetaData.index());
if (currentIndexMetaData == null) {
writeReason = "freshly created";
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
}
// we update the writeReason only if we really need to write it
if (writeReason == null) {
continue;
}
try {
writeIndex(writeReason, indexMetaData, currentIndexMetaData);
} catch (Exception e) {
success = false;
}
}
}
// check and write changes in indices
for (IndexMetaData indexMetaData : event.state().metaData()) {
String writeReason = null;
IndexMetaData currentIndexMetaData = currentMetaData == null ? null : currentMetaData.index(indexMetaData.index());
if (currentIndexMetaData == null) {
writeReason = "freshly created";
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
}
// we update the writeReason only if we really need to write it
if (writeReason == null) {
continue;
}
try {
writeIndex(writeReason, indexMetaData, currentIndexMetaData);
} catch (Exception e) {
success = false;
}
}
// handle dangling indices
// handle dangling indices, we handle those for all nodes that have a node file (data or master)
if (nodeEnv.hasNodeFile()) {
if (danglingTimeout.millis() >= 0) {
synchronized (danglingMutex) {
@ -179,7 +222,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName)));
} else {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}]", indexName, danglingTimeout);
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
}
}
@ -188,14 +231,49 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
}
}
}
if (autoImportDangled.shouldImport() && !danglingIndices.isEmpty()) {
final List<IndexMetaData> dangled = Lists.newArrayList();
for (String indexName : danglingIndices.keySet()) {
IndexMetaData indexMetaData = loadIndex(indexName);
// we might have someone copying over an index, renaming the directory, handle that
if (!indexMetaData.index().equals(indexName)) {
logger.info("dangled index directory name is [{}], state name is [{}], renaming to directory name", indexName, indexMetaData.index());
indexMetaData = IndexMetaData.newIndexMetaDataBuilder(indexMetaData).index(indexName).build();
}
if (autoImportDangled == AutoImportDangledState.CLOSED) {
indexMetaData = IndexMetaData.newIndexMetaDataBuilder(indexMetaData).state(IndexMetaData.State.CLOSE).build();
}
if (indexMetaData != null) {
dangled.add(indexMetaData);
}
}
IndexMetaData[] dangledIndices = dangled.toArray(new IndexMetaData[dangled.size()]);
try {
allocateDangledIndices.allocateDangled(dangledIndices, new LocalAllocateDangledIndices.Listener() {
@Override
public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) {
logger.trace("allocated dangled");
}
@Override
public void onFailure(Throwable e) {
logger.info("failed to send allocated dangled", e);
}
});
} catch (Exception e) {
logger.warn("failed to send allocate dangled", e);
}
}
}
// delete indices that are no longer there...
if (currentMetaData != null) {
for (IndexMetaData current : currentMetaData) {
if (event.state().metaData().index(current.index()) == null) {
if (!danglingIndices.containsKey(current.index())) {
deleteIndex(current.index());
if (event.state().nodes().localNode().masterNode()) {
// delete indices that are no longer there..., allocated dangled ones
if (currentMetaData != null) {
for (IndexMetaData current : currentMetaData) {
if (event.state().metaData().index(current.index()) == null) {
if (!danglingIndices.containsKey(current.index())) {
deleteIndex(current.index());
}
}
}
}

View File

@ -66,7 +66,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
try {
pre019Upgrade();
long start = System.currentTimeMillis();
loadStartedShards();
currentState = loadShardsStateInfo();
logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start));
} catch (Exception e) {
logger.error("failed to read local state (started shards), exiting...", e);
@ -79,6 +79,10 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
return this.currentState;
}
public ShardStateInfo loadShardInfo(ShardId shardId) throws Exception {
return loadShardStateInfo(shardId);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
@ -166,65 +170,65 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
this.currentState = newState;
}
private void loadStartedShards() throws Exception {
private Map<ShardId, ShardStateInfo> loadShardsStateInfo() throws Exception {
Set<ShardId> shardIds = nodeEnv.findAllShardIds();
long highestVersion = -1;
Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap();
for (ShardId shardId : shardIds) {
long highestShardVersion = -1;
ShardStateInfo highestShardState = null;
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File shardStateDir = new File(shardLocation, "_state");
if (!shardStateDir.exists() || !shardStateDir.isDirectory()) {
continue;
}
// now, iterate over the current versions, and find latest one
File[] stateFiles = shardStateDir.listFiles();
if (stateFiles == null) {
continue;
}
for (File stateFile : stateFiles) {
if (!stateFile.getName().startsWith("state-")) {
continue;
}
try {
long version = Long.parseLong(stateFile.getName().substring("state-".length()));
if (version > highestShardVersion) {
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
if (data.length == 0) {
logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id());
continue;
}
ShardStateInfo readState = readShardState(data);
if (readState == null) {
logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id());
continue;
}
assert readState.version == version;
highestShardState = readState;
highestShardVersion = version;
}
} catch (Exception e) {
logger.debug("[{}][{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id());
}
}
}
// did we find a state file?
if (highestShardState == null) {
ShardStateInfo shardStateInfo = loadShardStateInfo(shardId);
if (shardStateInfo == null) {
continue;
}
shardsState.put(shardId, highestShardState);
shardsState.put(shardId, shardStateInfo);
// update the global version
if (highestShardVersion > highestVersion) {
highestVersion = highestShardVersion;
if (shardStateInfo.version > highestVersion) {
highestVersion = shardStateInfo.version;
}
}
// update the current started shards only if there is data there...
if (highestVersion != -1) {
currentState = shardsState;
return shardsState;
}
private ShardStateInfo loadShardStateInfo(ShardId shardId) {
long highestShardVersion = -1;
ShardStateInfo highestShardState = null;
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File shardStateDir = new File(shardLocation, "_state");
if (!shardStateDir.exists() || !shardStateDir.isDirectory()) {
continue;
}
// now, iterate over the current versions, and find latest one
File[] stateFiles = shardStateDir.listFiles();
if (stateFiles == null) {
continue;
}
for (File stateFile : stateFiles) {
if (!stateFile.getName().startsWith("state-")) {
continue;
}
try {
long version = Long.parseLong(stateFile.getName().substring("state-".length()));
if (version > highestShardVersion) {
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
if (data.length == 0) {
logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id());
continue;
}
ShardStateInfo readState = readShardState(data);
if (readState == null) {
logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id());
continue;
}
assert readState.version == version;
highestShardState = readState;
highestShardVersion = version;
}
} catch (Exception e) {
logger.debug("[{}][{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id());
}
}
}
return highestShardState;
}
@Nullable

View File

@ -39,7 +39,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -117,15 +116,15 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
@Override
protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticSearchException {
Map<ShardId, ShardStateInfo> shardsStateInfo = shardsState.currentStartedShards();
if (shardsStateInfo != null) {
for (Map.Entry<ShardId, ShardStateInfo> entry : shardsStateInfo.entrySet()) {
if (entry.getKey().equals(request.shardId)) {
return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue().version);
}
try {
ShardStateInfo shardStateInfo = shardsState.loadShardInfo(request.shardId);
if (shardStateInfo != null) {
return new NodeLocalGatewayStartedShards(clusterService.localNode(), shardStateInfo.version);
}
return new NodeLocalGatewayStartedShards(clusterService.localNode(), -1);
} catch (Exception e) {
throw new ElasticSearchException("failed to load started shards", e);
}
return new NodeLocalGatewayStartedShards(clusterService.localNode(), -1);
}
@Override

View File

@ -149,7 +149,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
continue;
}
// only delete an unallocated shard if all (other shards) are started
if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) {
int startedShardsCount = indexShardRoutingTable.countWithState(ShardRoutingState.STARTED);
if (startedShardsCount > 0 && startedShardsCount == indexShardRoutingTable.size()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] deleting unallocated shard", indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id());
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
@ -325,15 +326,19 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
}
@Test
public void testDanglingIndices() throws Exception {
public void testDanglingIndicesAutoImportYes() throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "yes")
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
@ -357,13 +362,139 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
// we need to wait for the allocate dangled to kick in
Thread.sleep(500);
logger.info("--> verify that the dangling index exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(true));
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify the doc is there");
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
@Test
public void testDanglingIndicesAutoImportClose() throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "closed")
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true));
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
// we need to wait for the allocate dangled to kick in
Thread.sleep(500);
logger.info("--> verify that the dangling index exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(true));
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify the index state is closed");
assertThat(client("node1").admin().cluster().prepareState().execute().actionGet().state().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
logger.info("--> open the index");
client("node1").admin().indices().prepareOpen("test").execute().actionGet();
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify the doc is there");
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
@Test
public void testDanglingIndicesNoAutoImport() throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "no")
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true));
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
// we need to wait for the allocate dangled to kick in (even though in this case its disabled)
// just to make sure
Thread.sleep(500);
logger.info("--> verify that the dangling index does not exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(false));
@ -372,8 +503,8 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
closeNode("node2");
logger.info("--> start the nodes back, but make sure we do recovery only after we have 2 nodes in the cluster");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("gateway.recover_after_nodes", 2).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("gateway.recover_after_nodes", 2).build());
startNode("node1", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build());
startNode("node2", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build());
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
@ -386,15 +517,20 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
}
@Test
public void testDanglingIndicesStillDanglingAndCreatingSameIndex() throws Exception {
public void testDanglingIndicesNoAutoImportStillDanglingAndCreatingSameIndex() throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "no")
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
@ -417,8 +553,8 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();