fsync translog when closing and not deleting, allocate failed shard to another started shard

kimchy 2010-08-30 16:55:57 +03:00
parent d9979f8dfe
commit 908fba44e7
3 changed files with 70 additions and 9 deletions

NodeEnvironment.java

@@ -29,8 +29,6 @@ import org.elasticsearch.common.settings.Settings;
 import java.io.File;
 import java.io.IOException;
 
-import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
-
 /**
  * @author kimchy (shay.banon)
  */
@@ -40,11 +38,7 @@ public class NodeEnvironment extends AbstractComponent {
 
     private final Lock lock;
 
-    public NodeEnvironment(File nodeFile) {
-        super(EMPTY_SETTINGS);
-        this.nodeFile = nodeFile;
-        this.lock = null;
-    }
+    private final int localNodeId;
 
     @Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException {
         super(settings);
@@ -53,11 +47,13 @@ public class NodeEnvironment extends AbstractComponent {
                 !settings.getAsBoolean("node.master", true)) {
             nodeFile = null;
             lock = null;
+            localNodeId = -1;
             return;
         }
 
         Lock lock = null;
         File dir = null;
+        int localNodeId = -1;
         for (int i = 0; i < 100; i++) {
             dir = new File(new File(environment.workWithClusterFile(), "nodes"), Integer.toString(i));
             if (!dir.exists()) {
@@ -69,6 +65,7 @@ public class NodeEnvironment extends AbstractComponent {
                 boolean obtained = tmpLock.obtain();
                 if (obtained) {
                     lock = tmpLock;
+                    localNodeId = i;
                     break;
                 }
             } catch (IOException e) {
@@ -78,13 +75,18 @@ public class NodeEnvironment extends AbstractComponent {
         if (lock == null) {
             throw new IOException("Failed to obtain node lock");
         }
+        this.localNodeId = localNodeId;
         this.lock = lock;
         this.nodeFile = dir;
         if (logger.isDebugEnabled()) {
-            logger.debug("using node location [{}]", dir);
+            logger.debug("using node location [{}], local_node_id [{}]", dir, localNodeId);
         }
     }
 
+    public int localNodeId() {
+        return this.localNodeId;
+    }
+
     public boolean hasNodeFile() {
         return nodeFile != null && lock != null;
     }
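
The loop above is a first-free-slot pattern: each node process on a machine walks nodes/0, nodes/1, and so on, and claims the first directory whose lock it can obtain; the slot index then doubles as the local node id. Below is a minimal sketch of the same idea using plain java.nio file locks instead of Lucene's Lock API; the class and lock-file names are illustrative, not taken from the commit.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

public class NodeSlotLock {

    // Claim the first free slot under nodesDir by taking an exclusive file
    // lock; the returned slot index plays the role of localNodeId above.
    public static int claimSlot(File nodesDir) throws IOException {
        for (int i = 0; i < 100; i++) {
            File dir = new File(nodesDir, Integer.toString(i));
            if (!dir.exists()) {
                dir.mkdirs();
            }
            RandomAccessFile raf = new RandomAccessFile(new File(dir, "node.lock"), "rw");
            FileLock lock = raf.getChannel().tryLock();
            if (lock != null) {
                // keep raf and lock open: the lock is held for the process lifetime
                return i;
            }
            raf.close(); // slot owned by another local node, try the next one
        }
        throw new IOException("Failed to obtain node lock");
    }
}

Holding the lock (and the underlying file handle) for the life of the process is what keeps the id stable: a second node started on the same machine gets null from tryLock() on slot 0 and falls through to slot 1.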

LocalGatewayNodeAllocation.java

@@ -36,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
+
 /**
  * @author kimchy (shay.banon)
  */
@@ -49,7 +51,60 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
     }
 
     @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
-        // TODO when a shard failed and we in the initial allocation, find an existing one
+        for (ShardRouting failedShard : failedShards) {
+            IndexRoutingTable indexRoutingTable = routingNodes.routingTable().index(failedShard.index());
+            if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
+                continue;
+            }
+            // we are still in the initial allocation, find another node with existing shards
+            // all primaries are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign
+            Set<String> nodesIds = Sets.newHashSet();
+            nodesIds.addAll(nodes.dataNodes().keySet());
+            nodesIds.addAll(nodes.masterNodes().keySet());
+            TransportNodesListGatewayState.NodesLocalGatewayState nodesState = listGatewayState.list(nodesIds, null).actionGet();
+
+            // make a list of ShardId to Node, each one from the latest version
+            Tuple<DiscoveryNode, Long> t = null;
+            for (TransportNodesListGatewayState.NodeLocalGatewayState nodeState : nodesState) {
+                // we don't want to reallocate to the node we failed on
+                if (nodeState.node().id().equals(failedShard.currentNodeId())) {
+                    continue;
+                }
+                // go and find
+                for (Map.Entry<ShardId, Long> entry : nodeState.state().shards().entrySet()) {
+                    if (entry.getKey().equals(failedShard.shardId())) {
+                        if (t == null || entry.getValue() > t.v2().longValue()) {
+                            t = new Tuple<DiscoveryNode, Long>(nodeState.node(), entry.getValue());
+                        }
+                    }
+                }
+            }
+
+            if (t != null) {
+                // we found a node to allocate to, do it
+                RoutingNode currentRoutingNode = routingNodes.nodesToShards().get(failedShard.currentNodeId());
+                if (currentRoutingNode == null) {
+                    // already failed (might be called several times for the same shard)
+                    continue;
+                }
+
+                // find the shard and cancel relocation
+                Iterator<MutableShardRouting> shards = currentRoutingNode.iterator();
+                while (shards.hasNext()) {
+                    MutableShardRouting shard = shards.next();
+                    if (shard.shardId().equals(failedShard.shardId())) {
+                        shard.deassignNode();
+                        shards.remove();
+                        break;
+                    }
+                }
+
+                RoutingNode targetNode = routingNodes.nodesToShards().get(t.v1().id());
+                targetNode.add(new MutableShardRouting(failedShard.index(), failedShard.id(),
+                        targetNode.nodeId(), failedShard.relocatingNodeId(),
+                        failedShard.primary(), INITIALIZING));
+            }
+        }
     }
 
     @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
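
The core of the new applyFailedShards body is a selection rule: gather the on-disk shard versions reported by each node's local gateway, skip the node the shard just failed on, and pick the surviving copy with the highest version. Here is a stripped-down sketch of just that rule; NodeShardVersion is a hypothetical stand-in for the gateway state types, not a class from this commit.

import java.util.List;

// Hypothetical holder for "node X has a copy of the shard at version V".
record NodeShardVersion(String nodeId, long version) {}

class HighestVersionPicker {

    // Mirrors the Tuple<DiscoveryNode, Long> scan above: keep the candidate
    // with the largest version, never the node the shard failed on.
    static NodeShardVersion pick(List<NodeShardVersion> candidates, String failedNodeId) {
        NodeShardVersion best = null;
        for (NodeShardVersion c : candidates) {
            if (c.nodeId().equals(failedNodeId)) {
                continue; // don't reallocate back onto the failed node
            }
            if (best == null || c.version() > best.version()) {
                best = c;
            }
        }
        return best; // null: no surviving copy found
    }
}

A null result corresponds to the t == null case above: no node reports a copy of the shard, so it is left unassigned rather than force-allocated.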

RafReference.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.index.translog.fs;
 
+import org.elasticsearch.common.io.FileSystemUtils;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -60,6 +62,8 @@ public class RafReference {
             raf.close();
             if (delete) {
                 file.delete();
+            } else {
+                FileSystemUtils.syncFile(file);
            }
         } catch (IOException e) {
             // ignore
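
Note that raf.close() has already run when the else branch executes, so FileSystemUtils.syncFile presumably reopens the file and fsyncs it, making the kept translog durable before recovery relies on it. The helper below sketches that assumed behavior; it is not the actual implementation.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public final class Fsync {

    // Reopen the file and force its contents to stable storage. This is an
    // assumed equivalent of FileSystemUtils.syncFile, not a copy of it.
    public static void syncFile(File file) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        try {
            raf.getFD().sync(); // blocks until the OS flushes the file's data
        } finally {
            raf.close();
        }
    }
}

Without such a sync, close-without-delete could leave translog bytes only in the OS page cache, and a crash before the next flush would lose operations the local gateway already considers persisted.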