change index creation / deletion logic not to wait for the index to be created on the other nodes, since now it might not be created on all of them...
parent 69b8b0f437
commit a4eea0aeaa
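In substance, the change drops the old pattern of registering a per-node "index created / index deleted / mapping created" listener guarded by a timer-based timeout, and instead submits a ProcessedClusterStateUpdateTask whose clusterStateProcessed callback runs once the master has applied the new cluster state; the response (and, for index creation, the follow-up reroute) is sent from that callback. The sketch below is a toy model of that callback shape, assuming simplified stand-in types (ToyClusterService, String-valued state) invented for illustration, not the real Elasticsearch classes.

    // Toy model (not the real Elasticsearch API): a cluster-state update task that
    // reports completion through a callback instead of waiting for per-node acks.
    interface ClusterStateUpdateTask {
        String execute(String currentState); // returns the new (toy) cluster state
    }

    interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
        void clusterStateProcessed(String newState); // invoked after the update is applied
    }

    class ToyClusterService {
        private String state = "empty";

        void submitStateUpdateTask(String source, ClusterStateUpdateTask task) {
            // a real cluster service queues this on a dedicated update thread;
            // here it is applied inline to keep the sketch minimal
            state = task.execute(state);
            // the old code would now wait for node-level acknowledgements plus a timer;
            // the new code simply notifies the caller once the state is applied
            if (task instanceof ProcessedClusterStateUpdateTask) {
                ((ProcessedClusterStateUpdateTask) task).clusterStateProcessed(state);
            }
        }
    }

    public class CreateIndexSketch {
        public static void main(String[] args) {
            ToyClusterService clusterService = new ToyClusterService();
            clusterService.submitStateUpdateTask("create-index [test]", new ProcessedClusterStateUpdateTask() {
                @Override public String execute(String currentState) {
                    return currentState + " + index[test]";
                }

                @Override public void clusterStateProcessed(String newState) {
                    System.out.println("index created, responding to the caller: " + newState);
                }
            });
        }
    }

The diff hunks that follow apply this same shift to MetaDataCreateIndexService, MetaDataDeleteIndexService, MetaDataMappingService and MetaDataUpdateSettingsService, and remove the now-unused timer/ack plumbing.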
@@ -21,9 +21,7 @@ package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;

@@ -39,8 +37,6 @@ import org.elasticsearch.common.inject.Inject;

import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;

@@ -52,15 +48,12 @@ import org.elasticsearch.indices.IndexAlreadyExistsException;

import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.timer.TimerService;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;

@@ -74,31 +67,25 @@ public class MetaDataCreateIndexService extends AbstractComponent {

    private final Environment environment;

    private final TimerService timerService;

    private final ClusterService clusterService;

    private final IndicesService indicesService;

    private final ShardsAllocation shardsAllocation;

    private final NodeIndexCreatedAction nodeIndexCreatedAction;

    private final String riverIndexName;

    @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, IndicesService indicesService,
                                              ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, @RiverIndexName String riverIndexName) {
    @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService,
                                              ShardsAllocation shardsAllocation, @RiverIndexName String riverIndexName) {
        super(settings);
        this.environment = environment;
        this.timerService = timerService;
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.shardsAllocation = shardsAllocation;
        this.nodeIndexCreatedAction = nodeIndexCreatedAction;
        this.riverIndexName = riverIndexName;
    }

    public void createIndex(final Request request, final Listener userListener) {
    public void createIndex(final Request request, final Listener listener) {
        ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
        for (Map.Entry<String, String> entry : request.settings.getAsMap().entrySet()) {
            if (!entry.getKey().startsWith("index.")) {

@@ -109,9 +96,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {

        }
        request.settings(updatedSettingsBuilder.build());

        clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ClusterStateUpdateTask() {
        clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {
                final CreateIndexListener listener = new CreateIndexListener(request, userListener);
                try {
                    if (currentState.routingTable().hasIndex(request.index)) {
                        listener.onFailure(new IndexAlreadyExistsException(new Index(request.index)));

@@ -218,33 +204,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {

                logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());

                final AtomicInteger counter = new AtomicInteger(currentState.nodes().size() - 1); // -1 since we added it on the master already
                if (counter.get() == 0) {
                    // no nodes to add to
                    listener.onResponse(new Response(true, indexMetaData));
                } else {

                    final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() {
                        @Override public void onNodeIndexCreated(String index, String nodeId) {
                            if (index.equals(request.index)) {
                                if (counter.decrementAndGet() == 0) {
                                    listener.onResponse(new Response(true, indexMetaData));
                                    nodeIndexCreatedAction.remove(this);
                                }
                            }
                        }
                    };
                    nodeIndexCreatedAction.add(nodeIndexCreateListener);

                    Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
                        @Override public void run(Timeout timeout) throws Exception {
                            listener.onResponse(new Response(false, indexMetaData));
                            nodeIndexCreatedAction.remove(nodeIndexCreateListener);
                        }
                    }, request.timeout, TimerService.ExecutionType.THREADED);
                    listener.timeout = timeoutTask;
                }

                ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                if (!request.blocks.isEmpty()) {
                    for (ClusterBlock block : request.blocks) {

@@ -258,6 +217,27 @@ public class MetaDataCreateIndexService extends AbstractComponent {

                    return currentState;
                }
            }

            @Override public void clusterStateProcessed(ClusterState clusterState) {
                clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
                    @Override public ClusterState execute(ClusterState currentState) {
                        RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder();
                        for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) {
                            routingTableBuilder.add(indexRoutingTable);
                        }
                        IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
                                .initializeEmpty(currentState.metaData().index(request.index));
                        routingTableBuilder.add(indexRoutingBuilder);
                        RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
                        return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
                    }

                    @Override public void clusterStateProcessed(ClusterState clusterState) {
                        logger.info("[{}] created and added to cluster_state", request.index);
                        listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
                    }
                });
            }
        });
    }

@@ -277,58 +257,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {

        }
    }

    class CreateIndexListener implements Listener {

        private AtomicBoolean notified = new AtomicBoolean();

        private final Request request;

        private final Listener listener;

        volatile Timeout timeout;

        private CreateIndexListener(Request request, Listener listener) {
            this.request = request;
            this.listener = listener;
        }

        @Override public void onResponse(final Response response) {
            if (notified.compareAndSet(false, true)) {
                if (timeout != null) {
                    timeout.cancel();
                }
                // do the reroute after indices have been created on all the other nodes so we can query them for some info (like shard allocation)
                clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
                    @Override public ClusterState execute(ClusterState currentState) {
                        RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder();
                        for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) {
                            routingTableBuilder.add(indexRoutingTable);
                        }
                        IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
                                .initializeEmpty(currentState.metaData().index(request.index));
                        routingTableBuilder.add(indexRoutingBuilder);
                        RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
                        return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
                    }

                    @Override public void clusterStateProcessed(ClusterState clusterState) {
                        logger.info("[{}] created and added to cluster_state", request.index);
                        listener.onResponse(response);
                    }
                });
            }
        }

        @Override public void onFailure(Throwable t) {
            if (notified.compareAndSet(false, true)) {
                if (timeout != null) {
                    timeout.cancel();
                }
                listener.onFailure(t);
            }
        }
    }

    public static interface Listener {

        void onResponse(Response response);
@@ -21,8 +21,7 @@ package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;

@@ -31,15 +30,9 @@ import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;

import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.timer.TimerService;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;

@@ -49,27 +42,19 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;

 */
public class MetaDataDeleteIndexService extends AbstractComponent {

    private final TimerService timerService;

    private final ClusterService clusterService;

    private final ShardsAllocation shardsAllocation;

    private final NodeIndexDeletedAction nodeIndexDeletedAction;

    @Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsAllocation shardsAllocation,
                                              NodeIndexDeletedAction nodeIndexDeletedAction) {
    @Inject public MetaDataDeleteIndexService(Settings settings, ClusterService clusterService, ShardsAllocation shardsAllocation) {
        super(settings);
        this.timerService = timerService;
        this.clusterService = clusterService;
        this.shardsAllocation = shardsAllocation;
        this.nodeIndexDeletedAction = nodeIndexDeletedAction;
    }

    public void deleteIndex(final Request request, final Listener userListener) {
        clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ClusterStateUpdateTask() {
    public void deleteIndex(final Request request, final Listener listener) {
        clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ProcessedClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {
                final DeleteIndexListener listener = new DeleteIndexListener(request, userListener);
                try {
                    RoutingTable routingTable = currentState.routingTable();
                    if (!routingTable.hasIndex(request.index)) {

@@ -95,73 +80,19 @@ public class MetaDataDeleteIndexService extends AbstractComponent {

                ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();

                final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());

                final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
                    @Override public void onNodeIndexDeleted(String index, String nodeId) {
                        if (index.equals(request.index)) {
                            if (counter.decrementAndGet() == 0) {
                                listener.onResponse(new Response(true));
                                nodeIndexDeletedAction.remove(this);
                            }
                        }
                    }
                };
                nodeIndexDeletedAction.add(nodeIndexDeleteListener);

                Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
                    @Override public void run(Timeout timeout) throws Exception {
                        listener.onResponse(new Response(false));
                        nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
                    }
                }, request.timeout, TimerService.ExecutionType.THREADED);
                listener.timeout = timeoutTask;

                return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
                } catch (Exception e) {
                    listener.onFailure(e);
                    return currentState;
                }
            }

            @Override public void clusterStateProcessed(ClusterState clusterState) {
                listener.onResponse(new Response(true));
            }
        });
    }

    class DeleteIndexListener implements Listener {

        private AtomicBoolean notified = new AtomicBoolean();

        private final Request request;

        private final Listener listener;

        volatile Timeout timeout;

        private DeleteIndexListener(Request request, Listener listener) {
            this.request = request;
            this.listener = listener;
        }

        @Override public void onResponse(final Response response) {
            if (notified.compareAndSet(false, true)) {
                if (timeout != null) {
                    timeout.cancel();
                }
                listener.onResponse(response);
            }
        }

        @Override public void onFailure(Throwable t) {
            if (notified.compareAndSet(false, true)) {
                if (timeout != null) {
                    timeout.cancel();
                }
                listener.onFailure(t);
            }
        }
    }

    public static interface Listener {

        void onResponse(Response response);
@@ -22,14 +22,12 @@ package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;

@@ -39,19 +37,14 @@ import org.elasticsearch.index.mapper.MergeMappingException;

import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.timer.TimerService;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.collect.Maps.*;
import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.*;

/**

@@ -63,17 +56,10 @@ public class MetaDataMappingService extends AbstractComponent {

    private final IndicesService indicesService;

    private final TimerService timerService;

    private final NodeMappingCreatedAction nodeMappingCreatedAction;

    @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService,
                                          TimerService timerService, NodeMappingCreatedAction nodeMappingCreatedAction) {
    @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) {
        super(settings);
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.timerService = timerService;
        this.nodeMappingCreatedAction = nodeMappingCreatedAction;
    }

    public void updateMapping(final String index, final String type, final CompressedString mappingSource) throws IOException {

@@ -114,7 +100,7 @@ public class MetaDataMappingService extends AbstractComponent {

    }

    public void removeMapping(final RemoveRequest request) {
        clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", new ClusterStateUpdateTask() {
        clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {
                if (request.indices.length == 0) {
                    throw new IndexMissingException(new Index("_all"));

@@ -130,13 +116,16 @@ public class MetaDataMappingService extends AbstractComponent {

                return ClusterState.builder().state(currentState).metaData(builder).build();
            }

            @Override public void clusterStateProcessed(ClusterState clusterState) {
                // TODO add a listener here!
            }
        });
    }

    public void putMapping(final PutRequest request, final Listener userListener) {
        clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ClusterStateUpdateTask() {
    public void putMapping(final PutRequest request, final Listener listener) {
        clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {
                final PutMappingListener listener = new PutMappingListener(request, userListener);
                try {
                    if (request.indices.length == 0) {
                        throw new IndexMissingException(new Index("_all"));

@@ -181,7 +170,6 @@ public class MetaDataMappingService extends AbstractComponent {

                }

                final Map<String, Tuple<String, CompressedString>> mappings = newHashMap();
                int expectedReplies = 0;
                for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
                    String index = entry.getKey();
                    // do the actual merge here on the master, and update the mapping source

@@ -197,7 +185,6 @@ public class MetaDataMappingService extends AbstractComponent {

                    if (existingSource.equals(updatedSource)) {
                        // same source, no changes, ignore it
                    } else {
                        expectedReplies += (currentState.nodes().size() - 1); // for this index, on update, don't include the master, since we update it already
                        // use the merged mapping source
                        mappings.put(index, new Tuple<String, CompressedString>(existingMapper.type(), updatedSource));
                        if (logger.isDebugEnabled()) {

@@ -207,7 +194,6 @@ public class MetaDataMappingService extends AbstractComponent {

                        }
                    }
                } else {
                    expectedReplies += currentState.nodes().size();
                    CompressedString newSource = newMapper.mappingSource();
                    mappings.put(index, new Tuple<String, CompressedString>(newMapper.type(), newSource));
                    if (logger.isDebugEnabled()) {

@@ -236,76 +222,19 @@ public class MetaDataMappingService extends AbstractComponent {

                    }
                }

                if (expectedReplies == 0) {
                    listener.onResponse(new Response(true));
                } else {
                    final AtomicInteger counter = new AtomicInteger(expectedReplies);
                    final Set<String> indicesSet = newHashSet(request.indices);
                    final NodeMappingCreatedAction.Listener nodeMappingListener = new NodeMappingCreatedAction.Listener() {
                        @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
                            if (indicesSet.contains(response.index()) && response.type().equals(request.mappingType)) {
                                if (counter.decrementAndGet() == 0) {
                                    listener.onResponse(new Response(true));
                                    nodeMappingCreatedAction.remove(this);
                                }
                            }
                        }
                    };
                    nodeMappingCreatedAction.add(nodeMappingListener);

                    Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
                        @Override public void run(Timeout timeout) throws Exception {
                            listener.onResponse(new Response(false));
                            nodeMappingCreatedAction.remove(nodeMappingListener);
                        }
                    }, request.timeout, TimerService.ExecutionType.THREADED);
                    listener.timeout = timeoutTask;
                }

                return newClusterStateBuilder().state(currentState).metaData(builder).build();
                } catch (Exception e) {
                    listener.onFailure(e);
                    return currentState;
                }
            }

            @Override public void clusterStateProcessed(ClusterState clusterState) {
                listener.onResponse(new Response(true));
            }
        });
    }

    class PutMappingListener implements Listener {

        private AtomicBoolean notified = new AtomicBoolean();

        private final PutRequest request;

        private final Listener listener;

        volatile Timeout timeout;

        private PutMappingListener(PutRequest request, Listener listener) {
            this.request = request;
            this.listener = listener;
        }

        @Override public void onResponse(final Response response) {
            if (notified.compareAndSet(false, true)) {
                if (timeout != null) {
                    timeout.cancel();
                }
                listener.onResponse(response);
            }
        }

        @Override public void onFailure(Throwable t) {
            if (notified.compareAndSet(false, true)) {
                if (timeout != null) {
                    timeout.cancel();
                }
                listener.onFailure(t);
            }
        }
    }

    public static interface Listener {

        void onResponse(Response response);
@@ -22,7 +22,7 @@ package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;

@@ -55,7 +55,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent {

            }
        }
        final Settings settings = updatedSettingsBuilder.build();
        clusterService.submitStateUpdateTask("update-settings", new ClusterStateUpdateTask() {
        clusterService.submitStateUpdateTask("update-settings", new ProcessedClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {
                try {
                    boolean changed = false;

@@ -77,14 +77,16 @@ public class MetaDataUpdateSettingsService extends AbstractComponent {

                    logger.info("Updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);

                    listener.onSuccess();

                    return ClusterState.builder().state(currentState).metaData(metaDataBuilder).routingTable(routingTableBuilder).build();
                } catch (Exception e) {
                    listener.onFailure(e);
                    return currentState;
                }
            }

            @Override public void clusterStateProcessed(ClusterState clusterState) {
                listener.onSuccess();
            }
        });
    }
@@ -37,10 +37,8 @@ import org.elasticsearch.common.trove.TObjectIntIterator;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException;

@@ -56,8 +54,6 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.*;

 */
public class LocalGatewayNodeAllocation extends NodeAllocation {

    private final IndicesService indicesService;

    private final TransportNodesListGatewayStartedShards listGatewayStartedShards;

    private final TransportNodesListShardStoreMetaData listShardStoreMetaData;

@@ -68,10 +64,9 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {

    private final String initialShards;

    @Inject public LocalGatewayNodeAllocation(Settings settings, IndicesService indicesService,
    @Inject public LocalGatewayNodeAllocation(Settings settings,
                                              TransportNodesListGatewayStartedShards listGatewayStartedShards, TransportNodesListShardStoreMetaData listShardStoreMetaData) {
        super(settings);
        this.indicesService = indicesService;
        this.listGatewayStartedShards = listGatewayStartedShards;
        this.listShardStoreMetaData = listShardStoreMetaData;

@@ -250,14 +245,6 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {

        Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
        while (unassignedIterator.hasNext()) {
            MutableShardRouting shard = unassignedIterator.next();
            InternalIndexService indexService = (InternalIndexService) indicesService.indexService(shard.index());
            if (indexService == null) {
                continue;
            }
            // if the store is not persistent, it makes no sense to test for special allocation
            if (!indexService.store().persistent()) {
                continue;
            }

            // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
            boolean canBeAllocatedToAtLeastOneNode = false;
@@ -26,7 +26,6 @@ import org.elasticsearch.index.shard.recovery.RecoveryTarget;

import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
import org.elasticsearch.indices.memory.IndicesMemoryCleaner;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;

/**

@@ -49,7 +48,6 @@ public class IndicesModule extends AbstractModule {

        bind(RecoverySource.class).asEagerSingleton();

        bind(IndicesClusterStateService.class).asEagerSingleton();
        bind(IndicesMemoryCleaner.class).asEagerSingleton();
        bind(IndexingMemoryBufferController.class).asEagerSingleton();
        bind(IndicesAnalysisService.class).asEagerSingleton();
        bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
@@ -1,248 +0,0 @@

/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.indices.memory;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set;

import static org.elasticsearch.common.collect.Sets.*;

/**
 * @author kimchy (shay.banon)
 */
public class IndicesMemoryCleaner extends AbstractComponent {

    private final IndicesService indicesService;

    @Inject public IndicesMemoryCleaner(Settings settings, IndicesService indicesService) {
        super(settings);
        this.indicesService = indicesService;
    }

    public TranslogCleanResult cleanTranslog(int translogNumberOfOperationsThreshold) {
        int totalShards = 0;
        int cleanedShards = 0;
        long cleaned = 0;
        for (IndexService indexService : indicesService) {
            for (IndexShard indexShard : indexService) {
                if (indexShard.state() != IndexShardState.STARTED) {
                    continue;
                }
                totalShards++;
                Translog translog = ((InternalIndexShard) indexShard).translog();
                if (translog.size() > translogNumberOfOperationsThreshold) {
                    try {
                        indexShard.flush(new Engine.Flush());
                        cleanedShards++;
                        cleaned = indexShard.estimateFlushableMemorySize().bytes();
                    } catch (FlushNotAllowedEngineException e) {
                        // ignore this exception, we are not allowed to perform flush
                    }
                }
            }
        }
        return new TranslogCleanResult(totalShards, cleanedShards, new ByteSizeValue(cleaned, ByteSizeUnit.BYTES));
    }

    public void cacheClearUnreferenced() {
        for (IndexService indexService : indicesService) {
            indexService.cache().clearUnreferenced();
        }
    }

    public void cacheClear() {
        for (IndexService indexService : indicesService) {
            indexService.cache().clear();
        }
    }

    public void fullMemoryClean() {
        for (IndexService indexService : indicesService) {
            for (IndexShard indexShard : indexService) {
                try {
                    indexShard.flush(new Engine.Flush().full(true));
                } catch (FlushNotAllowedEngineException e) {
                    // ignore this one, its temporal
                } catch (IllegalIndexShardStateException e) {
                    // ignore this one as well
                } catch (Exception e) {
                    logger.warn(indexShard.shardId() + ": Failed to force flush in order to clean memory", e);
                }
            }
        }
    }

    public void forceCleanMemory(Set<ShardId> shardsToIgnore) {
        for (IndexService indexService : indicesService) {
            for (IndexShard indexShard : indexService) {
                if (!shardsToIgnore.contains(indexShard.shardId())) {
                    try {
                        indexShard.flush(new Engine.Flush().full(false));
                    } catch (FlushNotAllowedEngineException e) {
                        // ignore this one, its temporal
                    } catch (IllegalIndexShardStateException e) {
                        // ignore this one as well
                    } catch (Exception e) {
                        logger.warn(indexShard.shardId() + ": Failed to force flush in order to clean memory", e);
                    }
                }
            }
        }
    }

    /**
     * Checks if memory needs to be cleaned and cleans it. Returns the amount of memory cleaned.
     */
    public MemoryCleanResult cleanMemory(long memoryToClean, ByteSizeValue minimumFlushableSizeToClean) {
        int totalShards = 0;
        long estimatedFlushableSize = 0;
        ArrayList<Tuple<ByteSizeValue, IndexShard>> shards = new ArrayList<Tuple<ByteSizeValue, IndexShard>>();
        for (IndexService indexService : indicesService) {
            for (IndexShard indexShard : indexService) {
                if (indexShard.state() != IndexShardState.STARTED) {
                    continue;
                }
                totalShards++;
                ByteSizeValue estimatedSize = indexShard.estimateFlushableMemorySize();
                estimatedFlushableSize += estimatedSize.bytes();
                if (estimatedSize != null) {
                    shards.add(new Tuple<ByteSizeValue, IndexShard>(estimatedSize, indexShard));
                }
            }
        }
        Collections.sort(shards, new Comparator<Tuple<ByteSizeValue, IndexShard>>() {
            @Override public int compare(Tuple<ByteSizeValue, IndexShard> o1, Tuple<ByteSizeValue, IndexShard> o2) {
                return (int) (o1.v1().bytes() - o2.v1().bytes());
            }
        });
        int cleanedShards = 0;
        long cleaned = 0;
        Set<ShardId> shardsCleaned = newHashSet();
        for (Tuple<ByteSizeValue, IndexShard> tuple : shards) {
            if (tuple.v1().bytes() < minimumFlushableSizeToClean.bytes()) {
                // we passed the minimum threshold, don't flush
                break;
            }
            try {
                tuple.v2().flush(new Engine.Flush());
            } catch (FlushNotAllowedEngineException e) {
                // ignore this one, its temporal
            } catch (IllegalIndexShardStateException e) {
                // ignore this one as well
            } catch (Exception e) {
                logger.warn(tuple.v2().shardId() + ": Failed to flush in order to clean memory", e);
            }
            shardsCleaned.add(tuple.v2().shardId());
            cleanedShards++;
            cleaned += tuple.v1().bytes();
            if (cleaned > memoryToClean) {
                break;
            }
        }
        return new MemoryCleanResult(totalShards, cleanedShards, new ByteSizeValue(estimatedFlushableSize), new ByteSizeValue(cleaned), shardsCleaned);
    }

    public static class TranslogCleanResult {
        private final int totalShards;
        private final int cleanedShards;
        private final ByteSizeValue cleaned;

        public TranslogCleanResult(int totalShards, int cleanedShards, ByteSizeValue cleaned) {
            this.totalShards = totalShards;
            this.cleanedShards = cleanedShards;
            this.cleaned = cleaned;
        }

        public int totalShards() {
            return totalShards;
        }

        public int cleanedShards() {
            return cleanedShards;
        }

        public ByteSizeValue cleaned() {
            return cleaned;
        }

        @Override public String toString() {
            return "cleaned [" + cleaned + "], cleaned_shards [" + cleanedShards + "], total_shards [" + totalShards + "]";
        }
    }

    public static class MemoryCleanResult {
        private final int totalShards;
        private final int cleanedShards;
        private final ByteSizeValue estimatedFlushableSize;
        private final ByteSizeValue cleaned;
        private final Set<ShardId> shardsCleaned;

        public MemoryCleanResult(int totalShards, int cleanedShards, ByteSizeValue estimatedFlushableSize, ByteSizeValue cleaned, Set<ShardId> shardsCleaned) {
            this.totalShards = totalShards;
            this.cleanedShards = cleanedShards;
            this.estimatedFlushableSize = estimatedFlushableSize;
            this.cleaned = cleaned;
            this.shardsCleaned = shardsCleaned;
        }

        public int totalShards() {
            return totalShards;
        }

        public int cleanedShards() {
            return cleanedShards;
        }

        public ByteSizeValue estimatedFlushableSize() {
            return estimatedFlushableSize;
        }

        public ByteSizeValue cleaned() {
            return cleaned;
        }

        public Set<ShardId> shardsCleaned() {
            return this.shardsCleaned;
        }

        @Override public String toString() {
            return "cleaned [" + cleaned + "], estimated_flushable_size [" + estimatedFlushableSize + "], cleaned_shards [" + cleanedShards + "], total_shards [" + totalShards + "]";
        }
    }
}
@@ -172,7 +172,8 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {

        DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().prepareDelete("test").execute().actionGet();
        assertThat(deleteIndexResponse.acknowledged(), equalTo(true));

        Thread.sleep(200);
        Thread.sleep(500); // wait till the cluster state gets published

        clusterState2 = clusterService2.state();
        routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
        assertThat(routingNodeEntry2, nullValue());

@@ -303,6 +304,8 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {

        DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet();
        assertThat(deleteIndexResponse.acknowledged(), equalTo(true));

        Thread.sleep(500); // wait till the cluster state gets published

        clusterState2 = clusterService2.state();
        routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
        assertThat(routingNodeEntry2, nullValue());