diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java index 63b7242d8c6..2c8c693ee1d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java @@ -26,23 +26,26 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasAction; -import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + /** * @author kimchy (shay.banon) */ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationAction { - private final MetaDataService metaDataService; + private final MetaDataIndexAliasesService indexAliasesService; @Inject public TransportIndicesAliasesAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, MetaDataService metaDataService) { + ThreadPool threadPool, MetaDataIndexAliasesService indexAliasesService) { super(settings, transportService, clusterService, threadPool); - this.metaDataService = metaDataService; + this.indexAliasesService = indexAliasesService; } @Override protected String transportAction() { @@ -64,7 +67,35 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA } @Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request, ClusterState state) throws ElasticSearchException { - MetaDataService.IndicesAliasesResult indicesAliasesResult = metaDataService.indicesAliases(request.aliasActions()); - return new IndicesAliasesResponse(); + final AtomicReference responseRef = new AtomicReference(); + final AtomicReference failureRef = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()])), new MetaDataIndexAliasesService.Listener() { + @Override public void onResponse(MetaDataIndexAliasesService.Response response) { + responseRef.set(new IndicesAliasesResponse()); + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + failureRef.set(t); + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + failureRef.set(e); + } + + if (failureRef.get() != null) { + if (failureRef.get() instanceof ElasticSearchException) { + throw (ElasticSearchException) failureRef.get(); + } else { + throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get()); + } + } + + return responseRef.get(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 30fedfa73be..c6a1b91b61e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -25,12 +25,15 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + /** * Create index action. * @@ -38,12 +41,12 @@ import org.elasticsearch.transport.TransportService; */ public class TransportCreateIndexAction extends TransportMasterNodeOperationAction { - private final MetaDataService metaDataService; + private final MetaDataCreateIndexService createIndexService; @Inject public TransportCreateIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, MetaDataService metaDataService) { + ThreadPool threadPool, MetaDataCreateIndexService createIndexService) { super(settings, transportService, clusterService, threadPool); - this.metaDataService = metaDataService; + this.createIndexService = createIndexService; } @Override protected String transportAction() { @@ -67,7 +70,36 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi if (cause.length() == 0) { cause = "api"; } - MetaDataService.CreateIndexResult createIndexResult = metaDataService.createIndex(cause, request.index(), request.settings(), request.mappings(), request.timeout()); - return new CreateIndexResponse(createIndexResult.acknowledged()); + + final AtomicReference responseRef = new AtomicReference(); + final AtomicReference failureRef = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() { + @Override public void onResponse(MetaDataCreateIndexService.Response response) { + responseRef.set(new CreateIndexResponse(response.acknowledged())); + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + failureRef.set(t); + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + failureRef.set(e); + } + + if (failureRef.get() != null) { + if (failureRef.get() instanceof ElasticSearchException) { + throw (ElasticSearchException) failureRef.get(); + } else { + throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get()); + } + } + + return responseRef.get(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index 4b25ad214b7..cb1f0d18fbd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -25,12 +25,15 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + /** * Delete index action. * @@ -38,12 +41,12 @@ import org.elasticsearch.transport.TransportService; */ public class TransportDeleteIndexAction extends TransportMasterNodeOperationAction { - private final MetaDataService metaDataService; + private final MetaDataDeleteIndexService deleteIndexService; @Inject public TransportDeleteIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, MetaDataService metaDataService) { + ThreadPool threadPool, MetaDataDeleteIndexService deleteIndexService) { super(settings, transportService, clusterService, threadPool); - this.metaDataService = metaDataService; + this.deleteIndexService = deleteIndexService; } @Override protected String transportAction() { @@ -63,7 +66,35 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi } @Override protected DeleteIndexResponse masterOperation(DeleteIndexRequest request, ClusterState state) throws ElasticSearchException { - MetaDataService.DeleteIndexResult deleteIndexResult = metaDataService.deleteIndex(request.index(), request.timeout()); - return new DeleteIndexResponse(deleteIndexResult.acknowledged()); + final AtomicReference responseRef = new AtomicReference(); + final AtomicReference failureRef = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(request.index()).timeout(request.timeout()), new MetaDataDeleteIndexService.Listener() { + @Override public void onResponse(MetaDataDeleteIndexService.Response response) { + responseRef.set(new DeleteIndexResponse(response.acknowledged())); + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + failureRef.set(t); + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + failureRef.set(e); + } + + if (failureRef.get() != null) { + if (failureRef.get() instanceof ElasticSearchException) { + throw (ElasticSearchException) failureRef.get(); + } else { + throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get()); + } + } + + return responseRef.get(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 18f0e7aa930..fdf89282b07 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -25,12 +25,15 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + /** * Put mapping action. * @@ -38,12 +41,12 @@ import org.elasticsearch.transport.TransportService; */ public class TransportPutMappingAction extends TransportMasterNodeOperationAction { - private final MetaDataService metaDataService; + private final MetaDataMappingService metaDataMappingService; @Inject public TransportPutMappingAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, MetaDataService metaDataService) { + ThreadPool threadPool, MetaDataMappingService metaDataMappingService) { super(settings, transportService, clusterService, threadPool); - this.metaDataService = metaDataService; + this.metaDataMappingService = metaDataMappingService; } @@ -75,7 +78,35 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio request.indices(clusterState.metaData().concreteIndices(request.indices())); final String[] indices = request.indices(); - MetaDataService.PutMappingResult result = metaDataService.putMapping(indices, request.type(), request.source(), request.ignoreConflicts(), request.timeout()); - return new PutMappingResponse(result.acknowledged()); + final AtomicReference responseRef = new AtomicReference(); + final AtomicReference failureRef = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + metaDataMappingService.putMapping(new MetaDataMappingService.Request(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()), new MetaDataMappingService.Listener() { + @Override public void onResponse(MetaDataMappingService.Response response) { + responseRef.set(new PutMappingResponse(response.acknowledged())); + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + failureRef.set(t); + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + failureRef.set(e); + } + + if (failureRef.get() != null) { + if (failureRef.get() instanceof ElasticSearchException) { + throw (ElasticSearchException) failureRef.get(); + } else { + throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get()); + } + } + + return responseRef.get(); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java index f6b6a384779..0dffbe4d68e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -24,7 +24,10 @@ import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; +import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService; +import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.strategy.PreferUnallocatedShardUnassignedStrategy; import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy; @@ -33,7 +36,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class ClusterModule extends AbstractModule { @@ -49,7 +52,11 @@ public class ClusterModule extends AbstractModule { bind(ShardsRoutingStrategy.class).asEagerSingleton(); bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton(); - bind(MetaDataService.class).asEagerSingleton(); + bind(MetaDataCreateIndexService.class).asEagerSingleton(); + bind(MetaDataDeleteIndexService.class).asEagerSingleton(); + bind(MetaDataMappingService.class).asEagerSingleton(); + bind(MetaDataIndexAliasesService.class).asEagerSingleton(); + bind(RoutingService.class).asEagerSingleton(); bind(ShardStateAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index a90cbcf020a..50c7fc18ea3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -26,7 +26,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -40,16 +40,16 @@ import java.io.IOException; * Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated * in the cluster state meta data (and broadcast to all members). * - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class MappingUpdatedAction extends TransportMasterNodeOperationAction { - private final MetaDataService metaDataService; + private final MetaDataMappingService metaDataMappingService; @Inject public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - MetaDataService metaDataService) { + MetaDataMappingService metaDataMappingService) { super(settings, transportService, clusterService, threadPool); - this.metaDataService = metaDataService; + this.metaDataMappingService = metaDataMappingService; } @Override protected String transportAction() { @@ -65,7 +65,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction mappings = Maps.newHashMap(); + File mappingsDir = new File(environment.configFile(), "mappings"); + if (mappingsDir.exists() && mappingsDir.isDirectory()) { + File defaultMappingsDir = new File(mappingsDir, "_default"); + if (mappingsDir.exists() && mappingsDir.isDirectory()) { + addMappings(mappings, defaultMappingsDir); + } + File indexMappingsDir = new File(mappingsDir, request.index); + if (mappingsDir.exists() && mappingsDir.isDirectory()) { + addMappings(mappings, indexMappingsDir); + } + } + // TODO add basic mapping validation + + // put this last so index level mappings can override default mappings + mappings.putAll(request.mappings); + + ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder().put(request.settings); + if (request.settings.get(SETTING_NUMBER_OF_SHARDS) == null) { + indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); + } + if (request.settings.get(SETTING_NUMBER_OF_REPLICAS) == null) { + indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); + } + Settings actualIndexSettings = indexSettingsBuilder.build(); + + IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings); + for (Map.Entry entry : mappings.entrySet()) { + indexMetaData.putMapping(entry.getKey(), entry.getValue()); + } + MetaData newMetaData = newMetaDataBuilder() + .metaData(currentState.metaData()) + .put(indexMetaData) + .build(); + + logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); + + final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); + + final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() { + @Override public void onNodeIndexCreated(String index, String nodeId) { + if (index.equals(request.index)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true)); + nodeIndexCreatedAction.remove(this); + } + } + } + }; + nodeIndexCreatedAction.add(nodeIndexCreateListener); + + Timeout timeoutTask = timerService.newTimeout(new TimerTask() { + @Override public void run(Timeout timeout) throws Exception { + listener.onResponse(new Response(false)); + nodeIndexCreatedAction.remove(nodeIndexCreateListener); + } + }, request.timeout, TimerService.ExecutionType.THREADED); + listener.timeout = timeoutTask; + + return newClusterStateBuilder().state(currentState).metaData(newMetaData).build(); + } catch (Exception e) { + listener.onFailure(e); + return currentState; + } + } + }); + } + + private void addMappings(Map mappings, File mappingsDir) { + File[] mappingsFiles = mappingsDir.listFiles(); + for (File mappingFile : mappingsFiles) { + String fileNameNoSuffix = mappingFile.getName().substring(0, mappingFile.getName().lastIndexOf('.')); + if (mappings.containsKey(fileNameNoSuffix)) { + // if we have the mapping defined, ignore it + continue; + } + try { + mappings.put(fileNameNoSuffix, Streams.copyToString(new FileReader(mappingFile))); + } catch (IOException e) { + logger.warn("failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e); + } + } + } + + class CreateIndexListener implements Listener { + + private AtomicBoolean notified = new AtomicBoolean(); + + private final Request request; + + private final Listener listener; + + volatile Timeout timeout; + + private CreateIndexListener(Request request, Listener listener) { + this.request = request; + this.listener = listener; + } + + @Override public void onResponse(final Response response) { + if (notified.compareAndSet(false, true)) { + if (timeout != null) { + timeout.cancel(); + } + // do the reroute after indices have been created on all the other nodes so we can query them for some info (like shard allocation) + clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); + for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { + routingTableBuilder.add(indexRoutingTable); + } + IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) + .initializeEmpty(currentState.metaData().index(request.index)); + routingTableBuilder.add(indexRoutingBuilder); + RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); + return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + listener.onResponse(response); + } + }); + } + } + + @Override public void onFailure(Throwable t) { + if (notified.compareAndSet(false, true)) { + if (timeout != null) { + timeout.cancel(); + } + listener.onFailure(t); + } + } + } + + public static interface Listener { + + void onResponse(Response response); + + void onFailure(Throwable t); + } + + public static class Request { + + final String cause; + + final String index; + + Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; + + Map mappings = Maps.newHashMap(); + + TimeValue timeout = TimeValue.timeValueSeconds(5); + + public Request(String cause, String index) { + this.cause = cause; + this.index = index; + } + + public Request settings(Settings settings) { + this.settings = settings; + return this; + } + + public Request mappings(Map mappings) { + this.mappings.putAll(mappings); + return this; + } + + public Request timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + } + + public static class Response { + private final boolean acknowledged; + + public Response(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + public boolean acknowledged() { + return acknowledged; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java new file mode 100644 index 00000000000..23b149e1982 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.timer.Timeout; +import org.elasticsearch.common.timer.TimerTask; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.timer.TimerService; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; + +/** + * @author kimchy (shay.banon) + */ +public class MetaDataDeleteIndexService extends AbstractComponent { + + private final TimerService timerService; + + private final ClusterService clusterService; + + private final ShardsRoutingStrategy shardsRoutingStrategy; + + private final NodeIndexDeletedAction nodeIndexDeletedAction; + + @Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsRoutingStrategy shardsRoutingStrategy, + NodeIndexDeletedAction nodeIndexDeletedAction) { + super(settings); + this.timerService = timerService; + this.clusterService = clusterService; + this.shardsRoutingStrategy = shardsRoutingStrategy; + this.nodeIndexDeletedAction = nodeIndexDeletedAction; + } + + public void deleteIndex(final Request request, final Listener userListener) { + clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + final DeleteIndexListener listener = new DeleteIndexListener(request, userListener); + try { + RoutingTable routingTable = currentState.routingTable(); + if (!routingTable.hasIndex(request.index)) { + listener.onFailure(new IndexMissingException(new Index(request.index))); + return currentState; + } + + logger.info("[{}] deleting index", request.index); + + RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); + for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { + if (!indexRoutingTable.index().equals(request.index)) { + routingTableBuilder.add(indexRoutingTable); + } + } + MetaData newMetaData = newMetaDataBuilder() + .metaData(currentState.metaData()) + .remove(request.index) + .build(); + + RoutingTable newRoutingTable = shardsRoutingStrategy.reroute( + newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build()); + + final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); + + final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() { + @Override public void onNodeIndexDeleted(String index, String nodeId) { + if (index.equals(request.index)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true)); + nodeIndexDeletedAction.remove(this); + } + } + } + }; + nodeIndexDeletedAction.add(nodeIndexDeleteListener); + + Timeout timeoutTask = timerService.newTimeout(new TimerTask() { + @Override public void run(Timeout timeout) throws Exception { + listener.onResponse(new Response(false)); + nodeIndexDeletedAction.remove(nodeIndexDeleteListener); + } + }, request.timeout, TimerService.ExecutionType.THREADED); + listener.timeout = timeoutTask; + + + return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build(); + } catch (Exception e) { + listener.onFailure(e); + return currentState; + } + } + }); + } + + class DeleteIndexListener implements Listener { + + private AtomicBoolean notified = new AtomicBoolean(); + + private final Request request; + + private final Listener listener; + + volatile Timeout timeout; + + private DeleteIndexListener(Request request, Listener listener) { + this.request = request; + this.listener = listener; + } + + @Override public void onResponse(final Response response) { + if (notified.compareAndSet(false, true)) { + if (timeout != null) { + timeout.cancel(); + } + listener.onResponse(response); + } + } + + @Override public void onFailure(Throwable t) { + if (notified.compareAndSet(false, true)) { + if (timeout != null) { + timeout.cancel(); + } + listener.onFailure(t); + } + } + } + + + public static interface Listener { + + void onResponse(Response response); + + void onFailure(Throwable t); + } + + public static class Request { + + final String index; + + TimeValue timeout = TimeValue.timeValueSeconds(10); + + public Request(String index) { + this.index = index; + } + + public Request timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + } + + public static class Response { + private final boolean acknowledged; + + public Response(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + public boolean acknowledged() { + return acknowledged; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java new file mode 100644 index 00000000000..be2968de06c --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java @@ -0,0 +1,109 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.indices.IndexMissingException; + +import java.util.Set; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.common.collect.Sets.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; + +/** + * @author kimchy (shay.banon) + */ +public class MetaDataIndexAliasesService extends AbstractComponent { + + private final ClusterService clusterService; + + @Inject public MetaDataIndexAliasesService(Settings settings, ClusterService clusterService) { + super(settings); + this.clusterService = clusterService; + } + + public void indicesAliases(final Request request, final Listener listener) { + clusterService.submitStateUpdateTask("index-aliases", new ProcessedClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + + for (AliasAction aliasAction : request.actions) { + if (!currentState.metaData().hasIndex(aliasAction.index())) { + listener.onFailure(new IndexMissingException(new Index(aliasAction.index()))); + return currentState; + } + } + + MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); + for (AliasAction aliasAction : request.actions) { + IndexMetaData indexMetaData = builder.get(aliasAction.index()); + if (indexMetaData == null) { + throw new IndexMissingException(new Index(aliasAction.index())); + } + Set indexAliases = newHashSet(indexMetaData.settings().getAsArray("index.aliases")); + if (aliasAction.actionType() == AliasAction.Type.ADD) { + indexAliases.add(aliasAction.alias()); + } else if (aliasAction.actionType() == AliasAction.Type.REMOVE) { + indexAliases.remove(aliasAction.alias()); + } + + Settings settings = settingsBuilder().put(indexMetaData.settings()) + .putArray("index.aliases", indexAliases.toArray(new String[indexAliases.size()])) + .build(); + + builder.put(newIndexMetaDataBuilder(indexMetaData).settings(settings)); + } + return newClusterStateBuilder().state(currentState).metaData(builder).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + listener.onResponse(new Response()); + } + }); + } + + public static interface Listener { + + void onResponse(Response response); + + void onFailure(Throwable t); + } + + public static class Request { + + final AliasAction[] actions; + + public Request(AliasAction[] actions) { + this.actions = actions; + } + } + + public static class Response { + + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java new file mode 100644 index 00000000000..d1a45b50c77 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -0,0 +1,301 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.timer.Timeout; +import org.elasticsearch.common.timer.TimerTask; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.InvalidTypeNameException; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.MergeMappingException; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.timer.TimerService; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.common.collect.Maps.*; +import static org.elasticsearch.common.collect.Sets.*; +import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.*; + +/** + * @author kimchy (shay.banon) + */ +public class MetaDataMappingService extends AbstractComponent { + + private final ClusterService clusterService; + + private final IndicesService indicesService; + + private final TimerService timerService; + + private final NodeMappingCreatedAction nodeMappingCreatedAction; + + @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, + TimerService timerService, NodeMappingCreatedAction nodeMappingCreatedAction) { + super(settings); + this.clusterService = clusterService; + this.indicesService = indicesService; + this.timerService = timerService; + this.nodeMappingCreatedAction = nodeMappingCreatedAction; + } + + public void updateMapping(final String index, final String type, final String mappingSource) { + clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MapperService mapperService = indicesService.indexServiceSafe(index).mapperService(); + + DocumentMapper existingMapper = mapperService.documentMapper(type); + // parse the updated one + DocumentMapper updatedMapper = mapperService.parse(type, mappingSource); + if (existingMapper == null) { + existingMapper = updatedMapper; + } else { + // merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones) + existingMapper.merge(updatedMapper, mergeFlags().simulate(false)); + } + // build the updated mapping source + final String updatedMappingSource = existingMapper.buildSource(); + if (logger.isDebugEnabled()) { + logger.debug("[{}] update mapping [{}] (dynamic) with source [{}]", index, type, updatedMappingSource); + } else if (logger.isInfoEnabled()) { + logger.info("[{}] update mapping [{}] (dynamic)", index, type); + } + + MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); + IndexMetaData indexMetaData = currentState.metaData().index(index); + builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource)); + return newClusterStateBuilder().state(currentState).metaData(builder).build(); + } + }); + } + + public void putMapping(final Request request, final Listener userListener) { + clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + final PutMappingListener listener = new PutMappingListener(request, userListener); + try { + if (request.indices.length == 0) { + throw new IndexMissingException(new Index("_all")); + } + for (String index : request.indices) { + if (!currentState.metaData().hasIndex(index)) { + listener.onFailure(new IndexMissingException(new Index(index))); + } + } + + Map newMappers = newHashMap(); + Map existingMappers = newHashMap(); + for (String index : request.indices) { + IndexService indexService = indicesService.indexService(index); + if (indexService != null) { + // try and parse it (no need to add it here) so we can bail early in case of parsing exception + DocumentMapper newMapper = indexService.mapperService().parse(request.mappingType, request.mappingSource); + newMappers.put(index, newMapper); + DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.mappingType); + if (existingMapper != null) { + // first, simulate + DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true)); + // if we have conflicts, and we are not supposed to ignore them, throw an exception + if (!request.ignoreConflicts && mergeResult.hasConflicts()) { + throw new MergeMappingException(mergeResult.conflicts()); + } + existingMappers.put(index, existingMapper); + } + } else { + throw new IndexMissingException(new Index(index)); + } + } + + String mappingType = request.mappingType; + if (mappingType == null) { + mappingType = newMappers.values().iterator().next().type(); + } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { + throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition"); + } + if (mappingType.charAt(0) == '_') { + throw new InvalidTypeNameException("Document mapping type name can't start with '_'"); + } + + final Map> mappings = newHashMap(); + for (Map.Entry entry : newMappers.entrySet()) { + Tuple mapping; + String index = entry.getKey(); + // do the actual merge here on the master, and update the mapping source + DocumentMapper newMapper = entry.getValue(); + if (existingMappers.containsKey(entry.getKey())) { + // we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source + DocumentMapper existingMapper = existingMappers.get(entry.getKey()); + existingMapper.merge(newMapper, mergeFlags().simulate(false)); + // use the merged mapping source + mapping = new Tuple(existingMapper.type(), existingMapper.buildSource()); + } else { + mapping = new Tuple(newMapper.type(), newMapper.buildSource()); + } + mappings.put(index, mapping); + if (logger.isDebugEnabled()) { + logger.debug("[{}] put_mapping [{}] with source [{}]", index, mapping.v1(), mapping.v2()); + } else if (logger.isInfoEnabled()) { + logger.info("[{}] put_mapping [{}]", index, mapping.v1()); + } + } + + MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); + for (String indexName : request.indices) { + IndexMetaData indexMetaData = currentState.metaData().index(indexName); + if (indexMetaData == null) { + throw new IndexMissingException(new Index(indexName)); + } + Tuple mapping = mappings.get(indexName); + builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2())); + } + + + final AtomicInteger counter = new AtomicInteger(clusterService.state().nodes().size() * request.indices.length); + final Set indicesSet = newHashSet(request.indices); + final NodeMappingCreatedAction.Listener nodeMappingListener = new NodeMappingCreatedAction.Listener() { + @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { + if (indicesSet.contains(response.index()) && response.type().equals(request.mappingType)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true)); + nodeMappingCreatedAction.remove(this); + } + } + } + }; + nodeMappingCreatedAction.add(nodeMappingListener); + + Timeout timeoutTask = timerService.newTimeout(new TimerTask() { + @Override public void run(Timeout timeout) throws Exception { + listener.onResponse(new Response(false)); + nodeMappingCreatedAction.remove(nodeMappingListener); + } + }, request.timeout, TimerService.ExecutionType.THREADED); + listener.timeout = timeoutTask; + + + return newClusterStateBuilder().state(currentState).metaData(builder).build(); + } catch (Exception e) { + listener.onFailure(e); + return currentState; + } + } + }); + } + + class PutMappingListener implements Listener { + + private AtomicBoolean notified = new AtomicBoolean(); + + private final Request request; + + private final Listener listener; + + volatile Timeout timeout; + + private PutMappingListener(Request request, Listener listener) { + this.request = request; + this.listener = listener; + } + + @Override public void onResponse(final Response response) { + if (notified.compareAndSet(false, true)) { + if (timeout != null) { + timeout.cancel(); + } + listener.onResponse(response); + } + } + + @Override public void onFailure(Throwable t) { + if (notified.compareAndSet(false, true)) { + if (timeout != null) { + timeout.cancel(); + } + listener.onFailure(t); + } + } + } + + public static interface Listener { + + void onResponse(Response response); + + void onFailure(Throwable t); + } + + public static class Request { + + final String[] indices; + + final String mappingType; + + final String mappingSource; + + boolean ignoreConflicts = false; + + TimeValue timeout = TimeValue.timeValueSeconds(10); + + public Request(String[] indices, String mappingType, String mappingSource) { + this.indices = indices; + this.mappingType = mappingType; + this.mappingSource = mappingSource; + } + + public Request ignoreConflicts(boolean ignoreConflicts) { + this.ignoreConflicts = ignoreConflicts; + return this; + } + + public Request timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + } + + public static class Response { + private final boolean acknowledged; + + public Response(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + public boolean acknowledged() { + return acknowledged; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java deleted file mode 100644 index 816e018b4e5..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java +++ /dev/null @@ -1,525 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.metadata; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; -import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; -import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; -import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Maps; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.env.Environment; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.InvalidTypeNameException; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.MergeMappingException; -import org.elasticsearch.index.service.IndexService; -import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.InvalidIndexNameException; - -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.cluster.ClusterState.*; -import static org.elasticsearch.cluster.metadata.IndexMetaData.*; -import static org.elasticsearch.cluster.metadata.MetaData.*; -import static org.elasticsearch.common.collect.Maps.*; -import static org.elasticsearch.common.collect.Sets.*; -import static org.elasticsearch.common.settings.ImmutableSettings.*; -import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.*; - -/** - * @author kimchy (shay.banon) - */ -public class MetaDataService extends AbstractComponent { - - private final Environment environment; - - private final ClusterService clusterService; - - private final ShardsRoutingStrategy shardsRoutingStrategy; - - private final IndicesService indicesService; - - private final NodeIndexCreatedAction nodeIndexCreatedAction; - - private final NodeIndexDeletedAction nodeIndexDeletedAction; - - private final NodeMappingCreatedAction nodeMappingCreatedAction; - - private final Object mutex = new Object(); - - @Inject public MetaDataService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService, ShardsRoutingStrategy shardsRoutingStrategy, - NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction, - NodeMappingCreatedAction nodeMappingCreatedAction) { - super(settings); - this.environment = environment; - this.clusterService = clusterService; - this.indicesService = indicesService; - this.shardsRoutingStrategy = shardsRoutingStrategy; - this.nodeIndexCreatedAction = nodeIndexCreatedAction; - this.nodeIndexDeletedAction = nodeIndexDeletedAction; - this.nodeMappingCreatedAction = nodeMappingCreatedAction; - } - - // TODO should find nicer solution than sync here, since we block for timeout (same for other ops) - - public IndicesAliasesResult indicesAliases(final List aliasActions) { - synchronized (mutex) { - ClusterState clusterState = clusterService.state(); - - for (AliasAction aliasAction : aliasActions) { - if (!clusterState.metaData().hasIndex(aliasAction.index())) { - throw new IndexMissingException(new Index(aliasAction.index())); - } - } - - clusterService.submitStateUpdateTask("index-aliases", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - for (AliasAction aliasAction : aliasActions) { - IndexMetaData indexMetaData = builder.get(aliasAction.index()); - if (indexMetaData == null) { - throw new IndexMissingException(new Index(aliasAction.index())); - } - Set indexAliases = newHashSet(indexMetaData.settings().getAsArray("index.aliases")); - if (aliasAction.actionType() == AliasAction.Type.ADD) { - indexAliases.add(aliasAction.alias()); - } else if (aliasAction.actionType() == AliasAction.Type.REMOVE) { - indexAliases.remove(aliasAction.alias()); - } - - Settings settings = settingsBuilder().put(indexMetaData.settings()) - .putArray("index.aliases", indexAliases.toArray(new String[indexAliases.size()])) - .build(); - - builder.put(newIndexMetaDataBuilder(indexMetaData).settings(settings)); - } - return newClusterStateBuilder().state(currentState).metaData(builder).build(); - } - }); - - return new IndicesAliasesResult(); - } - } - - public CreateIndexResult createIndex(final String cause, final String index, final Settings indexSettings, Map mappings, TimeValue timeout) throws IndexAlreadyExistsException { - final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size()); - NodeIndexCreatedAction.Listener nodeCreatedListener = new NodeIndexCreatedAction.Listener() { - @Override public void onNodeIndexCreated(String mIndex, String nodeId) { - if (index.equals(mIndex)) { - latch.countDown(); - } - } - }; - synchronized (mutex) { - ClusterState clusterState = clusterService.state(); - - if (clusterState.routingTable().hasIndex(index)) { - throw new IndexAlreadyExistsException(new Index(index)); - } - if (clusterState.metaData().hasIndex(index)) { - throw new IndexAlreadyExistsException(new Index(index)); - } - if (index.contains(" ")) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain whitespace"); - } - if (index.contains(",")) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain ',"); - } - if (index.contains("#")) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain '#"); - } - if (index.charAt(0) == '_') { - throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'"); - } - if (!index.toLowerCase().equals(index)) { - throw new InvalidIndexNameException(new Index(index), index, "must be lowercase"); - } - if (!Strings.validFileName(index)) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS); - } - if (clusterState.metaData().aliases().contains(index)) { - throw new InvalidIndexNameException(new Index(index), index, "an alias with the same name already exists"); - } - - // add to the mappings files that exists within the config/mappings location - if (mappings == null) { - mappings = Maps.newHashMap(); - } else { - mappings = Maps.newHashMap(mappings); - } - File mappingsDir = new File(environment.configFile(), "mappings"); - if (mappingsDir.exists() && mappingsDir.isDirectory()) { - File defaultMappingsDir = new File(mappingsDir, "_default"); - if (mappingsDir.exists() && mappingsDir.isDirectory()) { - addMappings(mappings, defaultMappingsDir); - } - File indexMappingsDir = new File(mappingsDir, index); - if (mappingsDir.exists() && mappingsDir.isDirectory()) { - addMappings(mappings, indexMappingsDir); - } - } - - final Map fMappings = mappings; - - nodeIndexCreatedAction.add(nodeCreatedListener); - clusterService.submitStateUpdateTask("create-index [" + index + "], cause [" + cause + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder().put(indexSettings); - if (indexSettings.get(SETTING_NUMBER_OF_SHARDS) == null) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); - } - if (indexSettings.get(SETTING_NUMBER_OF_REPLICAS) == null) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); - } - Settings actualIndexSettings = indexSettingsBuilder.build(); - - IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings); - for (Map.Entry entry : fMappings.entrySet()) { - indexMetaData.putMapping(entry.getKey(), entry.getValue()); - } - MetaData newMetaData = newMetaDataBuilder() - .metaData(currentState.metaData()) - .put(indexMetaData) - .build(); - - logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet()); - return newClusterStateBuilder().state(currentState).metaData(newMetaData).build(); - } - }); - } - - boolean acknowledged; - try { - acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - acknowledged = false; - } finally { - nodeIndexCreatedAction.remove(nodeCreatedListener); - } - - final CountDownLatch latch2 = new CountDownLatch(1); - // do the reroute after indices have been created on all the other nodes so we can query them for some info - clusterService.submitStateUpdateTask("reroute after index [" + index + "] creation", new ProcessedClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); - for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { - routingTableBuilder.add(indexRoutingTable); - } - IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(index) - .initializeEmpty(currentState.metaData().index(index)); - routingTableBuilder.add(indexRoutingBuilder); - RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - latch2.countDown(); - } - }); - - // wait till it got processed (on the master, we are the master) - try { - latch2.await(timeout.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - } - - return new CreateIndexResult(acknowledged); - } - - private void addMappings(Map mappings, File mappingsDir) { - File[] mappingsFiles = mappingsDir.listFiles(); - for (File mappingFile : mappingsFiles) { - String fileNameNoSuffix = mappingFile.getName().substring(0, mappingFile.getName().lastIndexOf('.')); - if (mappings.containsKey(fileNameNoSuffix)) { - // if we have the mapping defined, ignore it - continue; - } - try { - mappings.put(fileNameNoSuffix, Streams.copyToString(new FileReader(mappingFile))); - } catch (IOException e) { - logger.warn("failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e); - } - } - } - - public DeleteIndexResult deleteIndex(final String index, TimeValue timeout) throws IndexMissingException { - ClusterState clusterState = clusterService.state(); - final CountDownLatch latch = new CountDownLatch(clusterState.nodes().size()); - NodeIndexDeletedAction.Listener listener = new NodeIndexDeletedAction.Listener() { - @Override public void onNodeIndexDeleted(String fIndex, String nodeId) { - if (fIndex.equals(index)) { - latch.countDown(); - } - } - }; - nodeIndexDeletedAction.add(listener); - synchronized (mutex) { - - RoutingTable routingTable = clusterState.routingTable(); - if (!routingTable.hasIndex(index)) { - throw new IndexMissingException(new Index(index)); - } - - logger.info("[{}] deleting index", index); - - clusterService.submitStateUpdateTask("delete-index [" + index + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); - for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { - if (!indexRoutingTable.index().equals(index)) { - routingTableBuilder.add(indexRoutingTable); - } - } - MetaData newMetaData = newMetaDataBuilder() - .metaData(currentState.metaData()) - .remove(index) - .build(); - - RoutingTable newRoutingTable = shardsRoutingStrategy.reroute( - newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build()); - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build(); - } - }); - } - - boolean acknowledged; - try { - acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - acknowledged = false; - } finally { - nodeIndexDeletedAction.remove(listener); - } - return new DeleteIndexResult(acknowledged); - } - - public void updateMapping(final String index, final String type, final String mappingSource) { - synchronized (mutex) { - MapperService mapperService = indicesService.indexServiceSafe(index).mapperService(); - - DocumentMapper existingMapper = mapperService.documentMapper(type); - // parse the updated one - DocumentMapper updatedMapper = mapperService.parse(type, mappingSource); - if (existingMapper == null) { - existingMapper = updatedMapper; - } else { - // merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones) - existingMapper.merge(updatedMapper, mergeFlags().simulate(false)); - } - // build the updated mapping source - final String updatedMappingSource = existingMapper.buildSource(); - if (logger.isDebugEnabled()) { - logger.debug("[{}] update mapping [{}] (dynamic) with source [{}]", index, type, updatedMappingSource); - } else if (logger.isInfoEnabled()) { - logger.info("[{}] update mapping [{}] (dynamic)", index, type); - } - // publish the new mapping - clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - IndexMetaData indexMetaData = currentState.metaData().index(index); - builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource)); - return newClusterStateBuilder().state(currentState).metaData(builder).build(); - } - }); - } - } - - public PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, boolean ignoreConflicts, TimeValue timeout) throws ElasticSearchException { - ClusterState clusterState = clusterService.state(); - final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length); - final Set indicesSet = newHashSet(indices); - final String fMappingType = mappingType; - NodeMappingCreatedAction.Listener listener = new NodeMappingCreatedAction.Listener() { - @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { - if (indicesSet.contains(response.index()) && response.type().equals(fMappingType)) { - latch.countDown(); - } - } - }; - synchronized (mutex) { - if (indices.length == 0) { - throw new IndexMissingException(new Index("_all")); - } - for (String index : indices) { - if (!clusterState.metaData().hasIndex(index)) { - throw new IndexMissingException(new Index(index)); - } - } - - Map newMappers = newHashMap(); - Map existingMappers = newHashMap(); - for (String index : indices) { - IndexService indexService = indicesService.indexService(index); - if (indexService != null) { - // try and parse it (no need to add it here) so we can bail early in case of parsing exception - DocumentMapper newMapper = indexService.mapperService().parse(mappingType, mappingSource); - newMappers.put(index, newMapper); - DocumentMapper existingMapper = indexService.mapperService().documentMapper(mappingType); - if (existingMapper != null) { - // first, simulate - DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true)); - // if we have conflicts, and we are not supposed to ignore them, throw an exception - if (!ignoreConflicts && mergeResult.hasConflicts()) { - throw new MergeMappingException(mergeResult.conflicts()); - } - existingMappers.put(index, existingMapper); - } - } else { - throw new IndexMissingException(new Index(index)); - } - } - - if (mappingType == null) { - mappingType = newMappers.values().iterator().next().type(); - } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { - throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition"); - } - if (mappingType.charAt(0) == '_') { - throw new InvalidTypeNameException("Document mapping type name can't start with '_'"); - } - - final Map> mappings = newHashMap(); - for (Map.Entry entry : newMappers.entrySet()) { - Tuple mapping; - String index = entry.getKey(); - // do the actual merge here on the master, and update the mapping source - DocumentMapper newMapper = entry.getValue(); - if (existingMappers.containsKey(entry.getKey())) { - // we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source - DocumentMapper existingMapper = existingMappers.get(entry.getKey()); - existingMapper.merge(newMapper, mergeFlags().simulate(false)); - // use the merged mapping source - mapping = new Tuple(existingMapper.type(), existingMapper.buildSource()); - } else { - mapping = new Tuple(newMapper.type(), newMapper.buildSource()); - } - mappings.put(index, mapping); - if (logger.isDebugEnabled()) { - logger.debug("[{}] put_mapping [{}] with source [{}]", index, mapping.v1(), mapping.v2()); - } else if (logger.isInfoEnabled()) { - logger.info("[{}] put_mapping [{}]", index, mapping.v1()); - } - } - - nodeMappingCreatedAction.add(listener); - - clusterService.submitStateUpdateTask("put-mapping [" + mappingType + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - for (String indexName : indices) { - IndexMetaData indexMetaData = currentState.metaData().index(indexName); - if (indexMetaData == null) { - throw new IndexMissingException(new Index(indexName)); - } - Tuple mapping = mappings.get(indexName); - builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2())); - } - return newClusterStateBuilder().state(currentState).metaData(builder).build(); - } - }); - } - - boolean acknowledged; - try { - acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - acknowledged = false; - } finally { - nodeMappingCreatedAction.remove(listener); - } - - return new PutMappingResult(acknowledged); - } - - /** - * The result of a putting mapping. - */ - public static class PutMappingResult { - - private final boolean acknowledged; - - public PutMappingResult(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean acknowledged() { - return acknowledged; - } - } - - public static class CreateIndexResult { - - private final boolean acknowledged; - - public CreateIndexResult(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean acknowledged() { - return acknowledged; - } - } - - public static class DeleteIndexResult { - - private final boolean acknowledged; - - public DeleteIndexResult(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean acknowledged() { - return acknowledged; - } - } - - public static class IndicesAliasesResult { - - public IndicesAliasesResult() { - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index ebc68ae6ab5..0722fbe9e5e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -26,7 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -63,7 +63,7 @@ public class GatewayService extends AbstractLifecycleComponent i private final DiscoveryService discoveryService; - private final MetaDataService metaDataService; + private final MetaDataCreateIndexService createIndexService; private final TimeValue initialStateTimeout; @@ -75,13 +75,13 @@ public class GatewayService extends AbstractLifecycleComponent i private final AtomicBoolean readFromGateway = new AtomicBoolean(); @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, - ThreadPool threadPool, MetaDataService metaDataService) { + ThreadPool threadPool, MetaDataCreateIndexService createIndexService) { super(settings); this.gateway = gateway; this.clusterService = clusterService; this.discoveryService = discoveryService; this.threadPool = threadPool; - this.metaDataService = metaDataService; + this.createIndexService = createIndexService; this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30)); // allow to control a delay of when indices will get created this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null); @@ -247,15 +247,13 @@ public class GatewayService extends AbstractLifecycleComponent i // go over the meta data and create indices, we don't really need to copy over // the meta data per index, since we create the index and it will be added automatically for (final IndexMetaData indexMetaData : fMetaData) { - threadPool.execute(new Runnable() { - @Override public void run() { - try { - metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueSeconds(30)); - } catch (Exception e) { - logger.error("failed to create index [" + indexMetaData.index() + "]", e); - } finally { - latch.countDown(); - } + createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappings(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() { + @Override public void onResponse(MetaDataCreateIndexService.Response response) { + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + logger.error("failed to create index [{}]", indexMetaData.index(), t); } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index f87f5c88acb..59872aaa752 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterNameModule; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleComponent; @@ -241,43 +242,67 @@ public final class InternalNode implements Node { ESLogger logger = Loggers.getLogger(Node.class, settings.get("name")); logger.info("{{}}[{}]: closing ...", Version.full(), JvmInfo.jvmInfo().pid()); + StopWatch stopWatch = new StopWatch("node_close"); + stopWatch.start("http"); if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).close(); } + stopWatch.stop().start("client"); injector.getInstance(Client.class).close(); + stopWatch.stop().start("routing"); injector.getInstance(RoutingService.class).close(); + stopWatch.stop().start("cluster"); injector.getInstance(ClusterService.class).close(); + stopWatch.stop().start("discovery"); injector.getInstance(DiscoveryService.class).close(); + stopWatch.stop().start("monitor"); injector.getInstance(MonitorService.class).close(); + stopWatch.stop().start("gateway"); injector.getInstance(GatewayService.class).close(); + stopWatch.stop().start("search"); injector.getInstance(SearchService.class).close(); + stopWatch.stop().start("indices_cluster"); injector.getInstance(IndicesClusterStateService.class).close(); + stopWatch.stop().start("indices"); injector.getInstance(IndicesService.class).close(); + stopWatch.stop().start("rest"); injector.getInstance(RestController.class).close(); + stopWatch.stop().start("transport"); injector.getInstance(TransportService.class).close(); + stopWatch.stop().start("http_client"); injector.getInstance(HttpClientService.class).close(); for (Class plugin : pluginsService.services()) { + stopWatch.stop().start("plugin(" + plugin.getName() + ")"); injector.getInstance(plugin).close(); } + stopWatch.stop().start("node_cache"); injector.getInstance(NodeCache.class).close(); + stopWatch.stop().start("timer"); injector.getInstance(TimerService.class).close(); + stopWatch.stop().start("thread_pool"); injector.getInstance(ThreadPool.class).shutdown(); try { injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { // ignore } + stopWatch.stop().start("thread_pool_force_shutdown"); try { injector.getInstance(ThreadPool.class).shutdownNow(); } catch (Exception e) { // ignore } + stopWatch.stop(); ThreadLocals.clearReferencesThreadLocals(); + if (logger.isTraceEnabled()) { + logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()); + } + logger.info("{{}}[{}]: closed", Version.full(), JvmInfo.jvmInfo().pid()); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index 3905c36896d..1bf438e07ee 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -317,7 +317,7 @@ public class SearchService extends AbstractLifecycleComponent { } context.keepAlive(keepAlive); context.accessed(timerService.estimatedTimeInMillis()); - context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), keepAlive)); + context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), keepAlive, TimerService.ExecutionType.DEFAULT)); return context; } @@ -424,7 +424,7 @@ public class SearchService extends AbstractLifecycleComponent { freeContext(context.id()); } else { // Read occurred before the timeout - set a new timeout with shorter delay. - context.keepAliveTimeout(timerService.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS)); + context.keepAliveTimeout(timerService.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS, TimerService.ExecutionType.DEFAULT)); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java index e9356a7f5c7..8ef0d191fae 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java @@ -74,17 +74,25 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit); } + @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, TimeValue interval) { + return scheduleWithFixedDelay(command, interval.millis(), interval.millis(), TimeUnit.MILLISECONDS); + } + @Override public void shutdown() { started = false; - logger.debug("Shutting down {} thread pool", getType()); + logger.debug("shutting down {} thread pool", getType()); executorService.shutdown(); scheduledExecutorService.shutdown(); } @Override public void shutdownNow() { started = false; - executorService.shutdownNow(); - scheduledExecutorService.shutdownNow(); + if (!executorService.isTerminated()) { + executorService.shutdownNow(); + } + if (!executorService.isTerminated()) { + scheduledExecutorService.shutdownNow(); + } } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { @@ -121,10 +129,6 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th return schedule(command, delay.millis(), TimeUnit.MILLISECONDS); } - @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, TimeValue interval) { - return scheduleWithFixedDelay(command, interval.millis(), interval.millis(), TimeUnit.MILLISECONDS); - } - @Override public void execute(Runnable command) { executorService.execute(command); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java index 48bc7d3d9b6..5a02371c143 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java @@ -41,6 +41,11 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*; */ public class TimerService extends AbstractComponent { + public static enum ExecutionType { + DEFAULT, + THREADED + } + private final ThreadPool threadPool; private final TimeEstimator timeEstimator; @@ -79,14 +84,41 @@ public class TimerService extends AbstractComponent { return timeEstimator.time(); } - public Timeout newTimeout(TimerTask task, TimeValue delay) { - return newTimeout(task, delay.nanos(), TimeUnit.NANOSECONDS); + public Timeout newTimeout(TimerTask task, TimeValue delay, ExecutionType executionType) { + return newTimeout(task, delay.nanos(), TimeUnit.NANOSECONDS, executionType); } - public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit, ExecutionType executionType) { + if (executionType == ExecutionType.THREADED) { + task = new ThreadedTimerTask(threadPool, task); + } return timer.newTimeout(task, delay, unit); } + private class ThreadedTimerTask implements TimerTask { + + private final ThreadPool threadPool; + + private final TimerTask task; + + private ThreadedTimerTask(ThreadPool threadPool, TimerTask task) { + this.threadPool = threadPool; + this.task = task; + } + + @Override public void run(final Timeout timeout) throws Exception { + threadPool.execute(new Runnable() { + @Override public void run() { + try { + task.run(timeout); + } catch (Exception e) { + logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", e); + } + } + }); + } + } + private static class TimeEstimator implements Runnable { private long time = System.currentTimeMillis(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index a53606b725e..72434fde42d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -177,7 +177,7 @@ public class TransportService extends AbstractLifecycleComponent(handler, node, action, timeoutX)); transport.sendRequest(node, requestId, action, message); @@ -313,13 +313,7 @@ public class TransportService extends AbstractLifecycleComponent