improve allocation of shards based on existing work location, wait with index creation on other nodes, so listing their respective existing location will work

This commit is contained in:
kimchy 2010-07-12 01:33:38 +03:00
parent 6722e05418
commit fe50a6f64e
10 changed files with 355 additions and 291 deletions

View File

@ -63,13 +63,15 @@ public class ElasticSearchException extends RuntimeException {
public String getDetailedMessage() {
if (getCause() != null) {
StringBuilder sb = new StringBuilder();
if (super.getMessage() != null) {
sb.append(super.getMessage()).append("; ");
sb.append(toString()).append("; ");
if (getCause() instanceof ElasticSearchException) {
sb.append(((ElasticSearchException) getCause()).getDetailedMessage());
} else {
sb.append(getCause());
}
sb.append("nested exception is ").append(getCause());
return sb.toString();
} else {
return super.getMessage();
return super.toString();
}
}

View File

@ -87,6 +87,8 @@ public class MetaDataService extends AbstractComponent {
private final NodeMappingCreatedAction nodeMappingCreatedAction;
private final Object mutex = new Object();
@Inject public MetaDataService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService, ShardsRoutingStrategy shardsRoutingStrategy,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingCreatedAction nodeMappingCreatedAction) {
@ -102,7 +104,8 @@ public class MetaDataService extends AbstractComponent {
// TODO should find nicer solution than sync here, since we block for timeout (same for other ops)
public synchronized IndicesAliasesResult indicesAliases(final List<AliasAction> aliasActions) {
public IndicesAliasesResult indicesAliases(final List<AliasAction> aliasActions) {
synchronized (mutex) {
ClusterState clusterState = clusterService.state();
for (AliasAction aliasAction : aliasActions) {
@ -138,8 +141,18 @@ public class MetaDataService extends AbstractComponent {
return new IndicesAliasesResult();
}
}
public synchronized CreateIndexResult createIndex(final String cause, final String index, final Settings indexSettings, Map<String, String> mappings, TimeValue timeout) throws IndexAlreadyExistsException {
public CreateIndexResult createIndex(final String cause, final String index, final Settings indexSettings, Map<String, String> mappings, TimeValue timeout) throws IndexAlreadyExistsException {
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size());
NodeIndexCreatedAction.Listener nodeCreatedListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String mIndex, String nodeId) {
if (index.equals(mIndex)) {
latch.countDown();
}
}
};
synchronized (mutex) {
ClusterState clusterState = clusterService.state();
if (clusterState.routingTable().hasIndex(index)) {
@ -190,14 +203,6 @@ public class MetaDataService extends AbstractComponent {
final Map<String, String> fMappings = mappings;
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size());
NodeIndexCreatedAction.Listener nodeCreatedListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String mIndex, String nodeId) {
if (index.equals(mIndex)) {
latch.countDown();
}
}
};
nodeIndexCreatedAction.add(nodeCreatedListener);
clusterService.submitStateUpdateTask("create-index [" + index + "], cause [" + cause + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
@ -223,6 +228,7 @@ public class MetaDataService extends AbstractComponent {
return newClusterStateBuilder().state(currentState).metaData(newMetaData).build();
}
});
}
boolean acknowledged;
try {
@ -279,16 +285,8 @@ public class MetaDataService extends AbstractComponent {
}
}
public synchronized DeleteIndexResult deleteIndex(final String index, TimeValue timeout) throws IndexMissingException {
public DeleteIndexResult deleteIndex(final String index, TimeValue timeout) throws IndexMissingException {
ClusterState clusterState = clusterService.state();
RoutingTable routingTable = clusterState.routingTable();
if (!routingTable.hasIndex(index)) {
throw new IndexMissingException(new Index(index));
}
logger.info("[{}] deleting index", index);
final CountDownLatch latch = new CountDownLatch(clusterState.nodes().size());
NodeIndexDeletedAction.Listener listener = new NodeIndexDeletedAction.Listener() {
@Override public void onNodeIndexDeleted(String fIndex, String nodeId) {
@ -298,6 +296,15 @@ public class MetaDataService extends AbstractComponent {
}
};
nodeIndexDeletedAction.add(listener);
synchronized (mutex) {
RoutingTable routingTable = clusterState.routingTable();
if (!routingTable.hasIndex(index)) {
throw new IndexMissingException(new Index(index));
}
logger.info("[{}] deleting index", index);
clusterService.submitStateUpdateTask("delete-index [" + index + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder();
@ -316,6 +323,8 @@ public class MetaDataService extends AbstractComponent {
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build();
}
});
}
boolean acknowledged;
try {
acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS);
@ -327,7 +336,8 @@ public class MetaDataService extends AbstractComponent {
return new DeleteIndexResult(acknowledged);
}
public synchronized void updateMapping(final String index, final String type, final String mappingSource) {
public void updateMapping(final String index, final String type, final String mappingSource) {
synchronized (mutex) {
MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
DocumentMapper existingMapper = mapperService.documentMapper(type);
@ -356,9 +366,21 @@ public class MetaDataService extends AbstractComponent {
}
});
}
}
public synchronized PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, boolean ignoreConflicts, TimeValue timeout) throws ElasticSearchException {
public PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, boolean ignoreConflicts, TimeValue timeout) throws ElasticSearchException {
ClusterState clusterState = clusterService.state();
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length);
final Set<String> indicesSet = newHashSet(indices);
final String fMappingType = mappingType;
NodeMappingCreatedAction.Listener listener = new NodeMappingCreatedAction.Listener() {
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (indicesSet.contains(response.index()) && response.type().equals(fMappingType)) {
latch.countDown();
}
}
};
synchronized (mutex) {
if (indices.length == 0) {
throw new IndexMissingException(new Index("_all"));
}
@ -423,16 +445,6 @@ public class MetaDataService extends AbstractComponent {
}
}
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length);
final Set<String> indicesSet = newHashSet(indices);
final String fMappingType = mappingType;
NodeMappingCreatedAction.Listener listener = new NodeMappingCreatedAction.Listener() {
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (indicesSet.contains(response.index()) && response.type().equals(fMappingType)) {
latch.countDown();
}
}
};
nodeMappingCreatedAction.add(listener);
clusterService.submitStateUpdateTask("put-mapping [" + mappingType + "]", new ClusterStateUpdateTask() {
@ -449,6 +461,7 @@ public class MetaDataService extends AbstractComponent {
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
});
}
boolean acknowledged;
try {

View File

@ -20,14 +20,17 @@
package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.store.IndexStore;
@ -36,6 +39,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import java.util.Iterator;
import java.util.Set;
/**
* @author kimchy (shay.banon)
@ -53,9 +57,20 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
}
public boolean allocateUnassigned(RoutingNodes routingNodes) {
public boolean allocateUnassigned(RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false;
Set<String> nodesIds = Sets.newHashSet();
for (DiscoveryNode node : nodes) {
if (node.dataNode()) {
nodesIds.add(node.id());
}
}
if (nodesIds.isEmpty()) {
return changed;
}
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
@ -68,7 +83,25 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
continue;
}
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false).actionGet();
if (!shard.primary()) {
// if its a backup, only allocate it if the primary is active
MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
if (primary == null || !primary.active()) {
continue;
}
}
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodesIds.toArray(new String[nodesIds.size()])).actionGet();
if (logger.isWarnEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:\n");
for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
sb.append(i).append(". ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()).append("\n");
}
logger.warn(sb.toString());
}
}
long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null;
@ -76,6 +109,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
DiscoveryNode discoNode = nodeStoreFilesMetaData.node();
logger.trace("{}: checking node [{}]", shard, discoNode);
IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreFilesMetaData.storeFilesMetaData();
if (storeFilesMetaData == null) {
@ -104,16 +138,39 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
try {
ImmutableMap<String, BlobMetaData> indexBlobsMetaData = indexGateway.listIndexBlobs(shard.id());
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");
sb.append(" gateway_files:\n");
for (BlobMetaData md : indexBlobsMetaData.values()) {
sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n");
}
sb.append(" node_files:\n");
for (StoreFileMetaData md : storeFilesMetaData) {
sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n");
}
logger.debug(sb.toString());
}
logger.trace("{}: checking for pre_allocation (gateway) on node [{}]\n gateway files", shard, discoNode, indexBlobsMetaData.keySet());
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
if (indexBlobsMetaData.containsKey(storeFileMetaData.name()) && indexBlobsMetaData.get(storeFileMetaData.name()).md5().equals(storeFileMetaData.md5())) {
if (indexBlobsMetaData.containsKey(storeFileMetaData.name())) {
if (indexBlobsMetaData.get(storeFileMetaData.name()).md5().equals(storeFileMetaData.md5())) {
logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway (same md5) with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.sizeInBytes()));
sizeMatched += storeFileMetaData.sizeInBytes();
} else {
logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different md5, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.md5(), indexBlobsMetaData.get(storeFileMetaData.name()).md5());
}
} else {
logger.trace("{}: [{}] exists on remote node, does not exists on gateway", shard, storeFileMetaData.name());
}
}
if (sizeMatched > lastSizeMatched) {
lastSizeMatched = sizeMatched;
lastDiscoNodeMatched = discoNode;
lastNodeMatched = node;
logger.trace("{}: node elected for pre_allocation [{}], total_size_matched [{}]", shard, discoNode, new ByteSizeValue(sizeMatched));
} else {
logger.trace("{}: node ignored for pre_allocation [{}], total_size_matched [{}] smaller than last_size_matched [{}]", shard, discoNode, new ByteSizeValue(sizeMatched), new ByteSizeValue(lastSizeMatched));
}
continue;
@ -152,7 +209,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
if (lastNodeMatched != null) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] allocating to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), lastDiscoNodeMatched);
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we found a match
changed = true;

View File

@ -111,7 +111,7 @@ public class ShardsRoutingStrategy extends AbstractComponent {
// now allocate all the unassigned to available nodes
if (routingNodes.hasUnassigned()) {
if (preferUnallocatedShardUnassignedStrategy != null) {
changed |= preferUnallocatedShardUnassignedStrategy.allocateUnassigned(routingNodes);
changed |= preferUnallocatedShardUnassignedStrategy.allocateUnassigned(routingNodes, nodes);
}
changed |= allocateUnassigned(routingNodes);
// elect primaries again, in case this is needed with unassigned allocation

View File

@ -247,14 +247,18 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// go over the meta data and create indices, we don't really need to copy over
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : fMetaData) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(10));
metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueSeconds(30));
} catch (Exception e) {
logger.error("failed to create index [" + indexMetaData.index() + "]", e);
} finally {
latch.countDown();
}
}
});
}
clusterService.submitStateUpdateTask("gateway (remove block)", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NOT_RECOVERED_FROM_GATEWAY_BLOCK);

View File

@ -269,8 +269,8 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
} else if (cause instanceof IgnoreRecoveryException) {
throw (IgnoreRecoveryException) cause;
} else if (cause instanceof NodeNotConnectedException) {
throw new IgnoreRecoveryException("Ignore recovery attemot, remote node not connected", e);
} else if ((cause instanceof NodeNotConnectedException) || (cause instanceof NodeDisconnectedException)) {
throw new IgnoreRecoveryException("Ignore recovery attempt, remote node not connected", e);
}
throw new RecoveryFailedException(shardId, node, targetNode, e);
} catch (Exception e) {

View File

@ -21,14 +21,12 @@ package org.elasticsearch.indices.store;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -42,7 +40,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -58,15 +55,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
this.indicesService = indicesService;
}
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated) {
ClusterState state = clusterService.state();
Set<String> nodesIds = Sets.newHashSet();
for (DiscoveryNode node : state.nodes()) {
if (node.dataNode()) {
nodesIds.add(node.id());
}
}
return execute(new Request(shardId, onlyUnallocated, nodesIds.toArray(new String[nodesIds.size()])));
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, String[] nodesIds) {
return execute(new Request(shardId, onlyUnallocated, nodesIds));
}
@Override protected String transportAction() {
@ -93,27 +83,19 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
return new NodeStoreFilesMetaData();
}
// only list stores on data node
@Override protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
Set<String> onlyDataNodeIds = Sets.newHashSet();
for (String nodeId : nodesIds) {
if (nodes.nodeExists(nodeId) && nodes.get(nodeId).dataNode()) {
onlyDataNodeIds.add(nodeId);
}
}
return onlyDataNodeIds.toArray(new String[onlyDataNodeIds.size()]);
}
@Override protected NodesStoreFilesMetaData newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeStoreFilesMetaData> nodeStoreFilesMetaDatas = Lists.newArrayList();
final List<FailedNodeException> failures = Lists.newArrayList();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeStoreFilesMetaData) { // will also filter out null response for unallocated ones
nodeStoreFilesMetaDatas.add((NodeStoreFilesMetaData) resp);
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
}
}
return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()]));
return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
}
@Override protected NodeStoreFilesMetaData nodeOperation(NodeRequest request) throws ElasticSearchException {
@ -129,7 +111,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
}
@Override protected boolean accumulateExceptions() {
return false;
return true;
}
static class Request extends NodesOperationRequest {
@ -162,11 +144,18 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
public static class NodesStoreFilesMetaData extends NodesOperationResponse<NodeStoreFilesMetaData> {
private FailedNodeException[] failures;
NodesStoreFilesMetaData() {
}
public NodesStoreFilesMetaData(ClusterName clusterName, NodeStoreFilesMetaData[] nodes) {
public NodesStoreFilesMetaData(ClusterName clusterName, NodeStoreFilesMetaData[] nodes, FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
}
public FailedNodeException[] failures() {
return failures;
}
@Override public void readFrom(StreamInput in) throws IOException {

View File

@ -24,9 +24,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (shay.banon)
*/
public class NodeDisconnectedTransportException extends ConnectTransportException {
public class NodeDisconnectedException extends ConnectTransportException {
public NodeDisconnectedTransportException(DiscoveryNode node, String action) {
public NodeDisconnectedException(DiscoveryNode node, String action) {
super(node, "disconnected", action, null);
}

View File

@ -286,7 +286,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
// want handlers to worry about stack overflows
threadPool.execute(new Runnable() {
@Override public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedTransportException(node, holderToNotify.action()));
holderToNotify.handler().handleException(new NodeDisconnectedException(node, holderToNotify.action()));
}
});
}

View File

@ -76,9 +76,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
boolean isRequest = isRequest(status);
if (isRequest) {
TransportRequestHandler handler = handleRequest(event, streamIn, requestId);
String action = handleRequest(event, streamIn, requestId);
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and handler {}, resetting", requestId, handler);
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
buffer.readerIndex(expectedIndexReader);
}
} else {
@ -160,7 +160,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
private TransportRequestHandler handleRequest(MessageEvent event, StreamInput buffer, long requestId) throws IOException {
private String handleRequest(MessageEvent event, StreamInput buffer, long requestId) throws IOException {
final String action = buffer.readUTF();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, event.getChannel(), requestId);
@ -190,7 +190,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
//noinspection unchecked
handler.messageReceived(streamable, transportChannel);
}
return handler;
} catch (Exception e) {
try {
transportChannel.sendResponse(e);
@ -199,7 +198,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
logger.warn("Actual Exception", e1);
}
}
return null;
return action;
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {