diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java index 99c5b0ab2b6..299d12bce65 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java @@ -20,6 +20,8 @@ package org.elasticsearch.indexer; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.MapBuilder; @@ -30,6 +32,12 @@ import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indexer.cluster.IndexerClusterChangedEvent; +import org.elasticsearch.indexer.cluster.IndexerClusterService; +import org.elasticsearch.indexer.cluster.IndexerClusterState; +import org.elasticsearch.indexer.cluster.IndexerClusterStateListener; +import org.elasticsearch.indexer.metadata.IndexerMetaData; +import org.elasticsearch.indexer.routing.IndexerRouting; import org.elasticsearch.indexer.settings.IndexerSettingsModule; import org.elasticsearch.threadpool.ThreadPool; @@ -45,16 +53,20 @@ public class IndexersService extends AbstractLifecycleComponent private final ThreadPool threadPool; + private final ClusterService clusterService; + private final Injector injector; private final Map indexersInjectors = Maps.newHashMap(); private volatile ImmutableMap indexers = ImmutableMap.of(); - @Inject public IndexersService(Settings settings, ThreadPool threadPool, Injector injector) { + @Inject public IndexersService(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) { super(settings); this.threadPool = threadPool; + this.clusterService = clusterService; this.injector = injector; + indexerClusterService.add(new ApplyIndexers()); } @Override protected void doStart() throws ElasticSearchException { @@ -149,6 +161,38 @@ public class IndexersService extends AbstractLifecycleComponent indexerInjector.getInstance(Indexer.class).close(delete); Injectors.close(injector); + } + private class ApplyIndexers implements IndexerClusterStateListener { + @Override public void indexerClusterChanged(IndexerClusterChangedEvent event) { + DiscoveryNode localNode = clusterService.localNode(); + IndexerClusterState state = event.state(); + + // first, go over and delete ones that either don't exists or are not allocated + for (IndexerName indexerName : indexers.keySet()) { + // if its not on the metadata, it was deleted, delete it + IndexerMetaData indexerMetaData = state.metaData().indexer(indexerName); + if (indexerMetaData == null) { + deleteIndexer(indexerName); + } + + IndexerRouting routing = state.routing().routing(indexerName); + if (routing == null || !localNode.equals(routing.node())) { + // not routed at all, and not allocated here, clean it (we delete the relevant ones before) + cleanIndexer(indexerName); + } + } + + for (IndexerRouting routing : state.routing()) { + // only apply changes to the local node + if (!routing.node().equals(localNode)) { + continue; + } + + IndexerMetaData indexerMetaData = state.metaData().indexer(routing.indexerName()); + + createIndexer(indexerMetaData.indexerName(), indexerMetaData.settings()); + } + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java index 8036607407a..fd21e4ee692 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java @@ -49,6 +49,10 @@ public class IndexersMetaData implements Iterable { return indexers.values().iterator(); } + public IndexerMetaData indexer(IndexerName indexerName) { + return indexers.get(indexerName); + } + public boolean recoveredFromGateway() { return recoveredFromGateway; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java new file mode 100644 index 00000000000..6097eadcd86 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.routing; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indexer.cluster.IndexerClusterService; +import org.elasticsearch.indexer.cluster.IndexerClusterState; +import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask; + +/** + * @author kimchy (shay.banon) + */ +public class IndexersRouter extends AbstractComponent implements ClusterStateListener { + + private final IndexerClusterService indexerClusterService; + + @Inject public IndexersRouter(Settings settings, ClusterService clusterService, IndexerClusterService indexerClusterService) { + super(settings); + this.indexerClusterService = indexerClusterService; + clusterService.add(this); + } + + @Override public void clusterChanged(final ClusterChangedEvent event) { + if (event.nodesChanged()) { + indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() { + @Override public IndexerClusterState execute(IndexerClusterState currentState) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + }); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java index cfd27343fd1..dc0fc8d3d74 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java @@ -40,6 +40,10 @@ public class IndexersRouting implements Iterable { this.indexers = indexers; } + public IndexerRouting routing(IndexerName indexerName) { + return indexers.get(indexerName); + } + @Override public Iterator iterator() { return indexers.values().iterator(); }