move prefetch allocation to be a NodeAllocation

This commit is contained in:
kimchy 2010-08-23 17:22:44 +03:00
parent edbd586ee5
commit 1461da5b49
15 changed files with 62 additions and 87 deletions

View File

@ -263,15 +263,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (timeout != null) {
timeout.cancel();
}
// do prefetch here so we won't compute md5 and such on the cluster update state...
long prefetchTime = 0;
if (shardsAllocation.preferUnallocatedStrategy() != null) {
long start = System.currentTimeMillis();
shardsAllocation.preferUnallocatedStrategy().prefetch(response.indexMetaData(), clusterService.state().nodes());
prefetchTime = System.currentTimeMillis() - start;
}
final long fPrefetchTime = prefetchTime;
// do the reroute after indices have been created on all the other nodes so we can query them for some info (like shard allocation)
clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
@ -287,7 +278,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
logger.info("[{}] created and added to cluster_state, prefetch_took [{}]", request.index, TimeValue.timeValueMillis(fPrefetchTime));
logger.info("[{}] created and added to cluster_state", request.index);
listener.onResponse(response);
}
});

View File

@ -33,22 +33,22 @@ public interface NodeAllocation {
enum Decision {
YES {
@Override boolean allocate() {
@Override public boolean allocate() {
return true;
}},
NO {
@Override boolean allocate() {
@Override public boolean allocate() {
return false;
}},
THROTTLE {
@Override boolean allocate() {
@Override public boolean allocate() {
return false;
}};
abstract boolean allocate();
public abstract boolean allocate();
}
boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes);
boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes);
Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes);
}

View File

@ -53,10 +53,10 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false;
for (NodeAllocation allocation : allocations) {
changed |= allocation.allocate(routingNodes, nodes);
changed |= allocation.allocate(nodeAllocations, routingNodes, nodes);
}
return changed;
}

View File

@ -39,7 +39,7 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends AbstractComponent i
super(settings);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}

View File

@ -39,7 +39,7 @@ public class SameShardNodeAllocation extends AbstractComponent implements NodeAl
super(settings);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}

View File

@ -42,7 +42,6 @@ public class ShardAllocationModule extends AbstractModule {
@Override protected void configure() {
bind(ShardsAllocation.class).asEagerSingleton();
bind(PreferUnallocatedStrategy.class).to(PreferUnallocatedShardUnassignedStrategy.class).asEagerSingleton();
Multibinder<NodeAllocation> allocationMultibinder = Multibinder.newSetBinder(binder(), NodeAllocation.class);
allocationMultibinder.addBinding().to(SameShardNodeAllocation.class);

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@ -43,25 +42,17 @@ public class ShardsAllocation extends AbstractComponent {
private final NodeAllocations nodeAllocations;
private final PreferUnallocatedStrategy preferUnallocatedStrategy;
public ShardsAllocation() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
public ShardsAllocation(Settings settings) {
this(settings, new NodeAllocations(settings), null);
this(settings, new NodeAllocations(settings));
}
@Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations,
@Nullable PreferUnallocatedStrategy preferUnallocatedStrategy) {
@Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations) {
super(settings);
this.nodeAllocations = nodeAllocations;
this.preferUnallocatedStrategy = preferUnallocatedStrategy;
}
public PreferUnallocatedStrategy preferUnallocatedStrategy() {
return preferUnallocatedStrategy;
}
/**
@ -122,10 +113,7 @@ public class ShardsAllocation extends AbstractComponent {
// now allocate all the unassigned to available nodes
if (routingNodes.hasUnassigned()) {
if (preferUnallocatedStrategy != null) {
changed |= preferUnallocatedStrategy.allocateUnassigned(routingNodes, nodes);
}
changed |= nodeAllocations.allocate(routingNodes, nodes);
changed |= nodeAllocations.allocate(nodeAllocations, routingNodes, nodes);
changed |= allocateUnassigned(routingNodes);
// elect primaries again, in case this is needed with unassigned allocation
changed |= electPrimaries(routingNodes);

View File

@ -38,7 +38,7 @@ public class ThrottlingNodeAllocation extends AbstractComponent implements NodeA
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}

View File

@ -17,16 +17,17 @@
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
package org.elasticsearch.gateway.blobstore;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -34,7 +35,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
@ -42,50 +42,24 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
/**
* @author kimchy (shay.banon)
*/
public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent implements PreferUnallocatedStrategy {
public class BlobReuseExistingNodeAllocation extends AbstractComponent implements NodeAllocation {
private final IndicesService indicesService;
private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData;
private final NodeAllocations nodeAllocations;
@Inject public PreferUnallocatedShardUnassignedStrategy(Settings settings, IndicesService indicesService,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData,
NodeAllocations nodeAllocations) {
@Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings);
this.indicesService = indicesService;
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
this.nodeAllocations = nodeAllocations;
}
@Override public void prefetch(IndexMetaData index, DiscoveryNodes nodes) {
final CountDownLatch latch = new CountDownLatch(index.numberOfShards());
for (int shardId = 0; shardId < index.numberOfShards(); shardId++) {
transportNodesListShardStoreMetaData.list(new ShardId(index.index(), shardId), false, nodes.dataNodes().keySet(), new ActionListener<TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData>() {
@Override public void onResponse(TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData) {
latch.countDown();
}
@Override public void onFailure(Throwable e) {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
}
public boolean allocateUnassigned(RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false;
if (nodes.dataNodes().isEmpty()) {
@ -160,7 +134,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");
sb.append(" gateway_files:\n");
for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
sb.append(" [").append(fileInfo.name()).append("]/[" + fileInfo.physicalName() + "], size [").append(new ByteSizeValue(fileInfo.length())).append("]\n");
sb.append(" [").append(fileInfo.name()).append("]/[").append(fileInfo.physicalName()).append("], size [").append(new ByteSizeValue(fileInfo.length())).append("]\n");
}
sb.append(" node_files:\n");
for (StoreFileMetaData md : storeFilesMetaData) {
@ -247,4 +221,8 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
return changed;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
return Decision.YES;
}
}

View File

@ -17,19 +17,21 @@
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
package org.elasticsearch.gateway.blobstore;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
/**
* @author kimchy (shay.banon)
*/
// TODO move this to be a NodeAllocation (once we remove the md5 and make listing fast for Unassigned impl)
public interface PreferUnallocatedStrategy {
public abstract class BlobStoreGatewayModule extends AbstractModule implements PreProcessModule {
void prefetch(IndexMetaData index, DiscoveryNodes nodes);
boolean allocateUnassigned(RoutingNodes routingNodes, DiscoveryNodes nodes);
@Override public void processModule(Module module) {
if (module instanceof ShardAllocationModule) {
((ShardAllocationModule) module).addNodeAllocation(BlobReuseExistingNodeAllocation.class);
}
}
}

View File

@ -19,13 +19,13 @@
package org.elasticsearch.gateway.fs;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.blobstore.BlobStoreGatewayModule;
/**
* @author kimchy (shay.banon)
*/
public class FsGatewayModule extends AbstractModule {
public class FsGatewayModule extends BlobStoreGatewayModule {
@Override protected void configure() {
bind(Gateway.class).to(FsGateway.class).asEagerSingleton();

View File

@ -168,7 +168,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryStatus.time())).append("]\n");
sb.append(" index : files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took[").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("]\n");
sb.append(" : recovered_files [").append(recoveryStatus.index().numberOfRecoveredFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().reusedTotalSize())).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("]\n");
sb.append(" : recovered_files [").append(recoveryStatus.index().numberOfRecoveredFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().recoveredTotalSize())).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfReusedFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().reusedTotalSize())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.translog().time())).append("]");
logger.debug(sb.toString());

View File

@ -23,6 +23,9 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.status.IndexShardStatus;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.status.ShardStatus;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.collect.MapBuilder;
@ -256,6 +259,20 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
logger.info("--> checking count");
assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(12345l));
logger.info("--> checking reuse / recovery status");
IndicesStatusResponse statusResponse = client("server1").admin().indices().prepareStatus().execute().actionGet();
for (IndexShardStatus indexShardStatus : statusResponse.index("test")) {
for (ShardStatus shardStatus : indexShardStatus) {
if (shardStatus.shardRouting().primary()) {
if (fullRecovery) {
assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), equalTo(0l));
} else {
assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), greaterThan(shardStatus.gatewayRecoveryStatus().indexSize().bytes() - 4098 /* segments file */));
}
}
}
}
}
private String mappingSource() {

View File

@ -19,13 +19,13 @@
package org.elasticsearch.gateway.s3;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.blobstore.BlobStoreGatewayModule;
/**
* @author kimchy (shay.banon)
*/
public class S3GatewayModule extends AbstractModule {
public class S3GatewayModule extends BlobStoreGatewayModule {
@Override protected void configure() {
bind(Gateway.class).to(S3Gateway.class).asEagerSingleton();

View File

@ -19,13 +19,13 @@
package org.elasticsearch.gateway.hdfs;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.blobstore.BlobStoreGatewayModule;
/**
* @author kimchy (shay.banon)
*/
public class HdfsGatewayModule extends AbstractModule {
public class HdfsGatewayModule extends BlobStoreGatewayModule {
@Override protected void configure() {
bind(Gateway.class).to(HdfsGateway.class).asEagerSingleton();