do a reuse work prefetching not on the cluster state thread, so it won't block while fetching md5s from other nodes

This commit is contained in:
kimchy 2010-08-20 00:45:17 +03:00
parent a5c5f65da5
commit 4a2e076c6a
8 changed files with 117 additions and 28 deletions

View File

@ -95,6 +95,7 @@
<w>pluggable</w>
<w>plugins</w>
<w>porterstem</w>
<w>prefetch</w>
<w>proc</w>
<w>publishhost</w>
<w>queryparser</w>

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.strategy.PreferUnallocatedShardUnassignedStrategy;
import org.elasticsearch.cluster.routing.strategy.PreferUnallocatedStrategy;
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.inject.AbstractModule;
@ -45,7 +46,7 @@ public class ClusterModule extends AbstractModule {
@Override
protected void configure() {
bind(PreferUnallocatedShardUnassignedStrategy.class).asEagerSingleton();
bind(PreferUnallocatedStrategy.class).to(PreferUnallocatedShardUnassignedStrategy.class).asEagerSingleton();
bind(ShardsRoutingStrategy.class).asEagerSingleton();
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();

View File

@ -178,10 +178,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
mappings.put(mapper.type(), mapper.mappingSource());
}
IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings);
final IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings);
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
indexMetaData.putMapping(entry.getKey(), entry.getValue());
indexMetaDataBuilder.putMapping(entry.getKey(), entry.getValue());
}
final IndexMetaData indexMetaData = indexMetaDataBuilder.build();
MetaData newMetaData = newMetaDataBuilder()
.metaData(currentState.metaData())
@ -193,14 +194,14 @@ public class MetaDataCreateIndexService extends AbstractComponent {
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size() - 1); // -1 since we added it on the master already
if (counter.get() == 0) {
// no nodes to add to
listener.onResponse(new Response(true));
listener.onResponse(new Response(true, indexMetaData));
} else {
final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
listener.onResponse(new Response(true, indexMetaData));
nodeIndexCreatedAction.remove(this);
}
}
@ -210,7 +211,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
listener.onResponse(new Response(false));
listener.onResponse(new Response(false, indexMetaData));
nodeIndexCreatedAction.remove(nodeIndexCreateListener);
}
}, request.timeout, TimerService.ExecutionType.THREADED);
@ -262,6 +263,15 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (timeout != null) {
timeout.cancel();
}
// do prefetch here so we won't compute md5 and such on the cluster update state...
long prefetchTime = 0;
if (shardsRoutingStrategy.preferUnallocatedStrategy() != null) {
long start = System.currentTimeMillis();
shardsRoutingStrategy.preferUnallocatedStrategy().prefetch(response.indexMetaData(), clusterService.state().nodes());
prefetchTime = System.currentTimeMillis() - start;
}
final long fPrefetchTime = prefetchTime;
// do the reroute after indices have been created on all the other nodes so we can query them for some info (like shard allocation)
clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
@ -277,6 +287,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
logger.info("[{}] created and added to cluster_state, prefetch_took [{}]", request.index, TimeValue.timeValueMillis(fPrefetchTime));
listener.onResponse(response);
}
});
@ -342,13 +353,19 @@ public class MetaDataCreateIndexService extends AbstractComponent {
public static class Response {
private final boolean acknowledged;
private final IndexMetaData indexMetaData;
public Response(boolean acknowledged) {
public Response(boolean acknowledged, IndexMetaData indexMetaData) {
this.acknowledged = acknowledged;
this.indexMetaData = indexMetaData;
}
public boolean acknowledged() {
return acknowledged;
}
public IndexMetaData indexMetaData() {
return indexMetaData;
}
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
@ -27,49 +29,67 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
/**
* @author kimchy (shay.banon)
*/
public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent {
public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent implements PreferUnallocatedStrategy {
private final ThreadPool threadPool;
private final IndicesService indicesService;
private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData;
@Inject public PreferUnallocatedShardUnassignedStrategy(Settings settings, IndicesService indicesService,
@Inject public PreferUnallocatedShardUnassignedStrategy(Settings settings, ThreadPool threadPool, IndicesService indicesService,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings);
this.threadPool = threadPool;
this.indicesService = indicesService;
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
}
@Override public void prefetch(IndexMetaData index, DiscoveryNodes nodes) {
final CountDownLatch latch = new CountDownLatch(index.numberOfShards());
for (int shardId = 0; shardId < index.numberOfShards(); shardId++) {
transportNodesListShardStoreMetaData.list(new ShardId(index.index(), shardId), false, nodes.dataNodes().keySet(), new ActionListener<TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData>() {
@Override public void onResponse(TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData) {
latch.countDown();
}
@Override public void onFailure(Throwable e) {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
}
public boolean allocateUnassigned(RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false;
Set<String> nodesIds = Sets.newHashSet();
for (DiscoveryNode node : nodes) {
if (node.dataNode()) {
nodesIds.add(node.id());
}
}
if (nodesIds.isEmpty()) {
if (nodes.dataNodes().isEmpty()) {
return changed;
}
@ -93,7 +113,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
}
}
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodesIds.toArray(new String[nodesIds.size()])).actionGet();
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet()).actionGet();
if (logger.isDebugEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
/**
* @author kimchy (shay.banon)
*/
public interface PreferUnallocatedStrategy {
void prefetch(IndexMetaData index, DiscoveryNodes nodes);
boolean allocateUnassigned(RoutingNodes routingNodes, DiscoveryNodes nodes);
}

View File

@ -41,15 +41,19 @@ import static org.elasticsearch.common.collect.Sets.*;
*/
public class ShardsRoutingStrategy extends AbstractComponent {
private final PreferUnallocatedShardUnassignedStrategy preferUnallocatedShardUnassignedStrategy;
private final PreferUnallocatedStrategy preferUnallocatedStrategy;
public ShardsRoutingStrategy() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, null);
}
@Inject public ShardsRoutingStrategy(Settings settings, @Nullable PreferUnallocatedShardUnassignedStrategy preferUnallocatedShardUnassignedStrategy) {
@Inject public ShardsRoutingStrategy(Settings settings, @Nullable PreferUnallocatedStrategy preferUnallocatedStrategy) {
super(settings);
this.preferUnallocatedShardUnassignedStrategy = preferUnallocatedShardUnassignedStrategy;
this.preferUnallocatedStrategy = preferUnallocatedStrategy;
}
public PreferUnallocatedStrategy preferUnallocatedStrategy() {
return preferUnallocatedStrategy;
}
/**
@ -110,8 +114,8 @@ public class ShardsRoutingStrategy extends AbstractComponent {
// now allocate all the unassigned to available nodes
if (routingNodes.hasUnassigned()) {
if (preferUnallocatedShardUnassignedStrategy != null) {
changed |= preferUnallocatedShardUnassignedStrategy.allocateUnassigned(routingNodes, nodes);
if (preferUnallocatedStrategy != null) {
changed |= preferUnallocatedStrategy.allocateUnassigned(routingNodes, nodes);
}
changed |= allocateUnassigned(routingNodes);
// elect primaries again, in case this is needed with unassigned allocation

View File

@ -427,7 +427,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
indexShard.start();
return;
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog, can't read current index version", e);
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recover translog, can't read current index version", e);
}
if (!translogContainer.blobExists("translog-" + translogId)) {
// no recovery file found, start the shard and bail
@ -513,7 +513,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
indexShard.performRecoveryFinalization(true);
} catch (Throwable e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", e);
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recover translog", e);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.store;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName;
@ -40,6 +41,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -55,10 +57,14 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
this.indicesService = indicesService;
}
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, String[] nodesIds) {
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds) {
return execute(new Request(shardId, onlyUnallocated, nodesIds));
}
public void list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds, ActionListener<NodesStoreFilesMetaData> listener) {
execute(new Request(shardId, onlyUnallocated, nodesIds), listener);
}
@Override protected String transportAction() {
return "/cluster/nodes/indices/shard/store";
}
@ -123,6 +129,12 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
public Request() {
}
public Request(ShardId shardId, boolean unallocated, Set<String> nodesIds) {
super(nodesIds.toArray(new String[nodesIds.size()]));
this.shardId = shardId;
this.unallocated = unallocated;
}
public Request(ShardId shardId, boolean unallocated, String... nodesIds) {
super(nodesIds);
this.shardId = shardId;