more indexer work

This commit is contained in:
kimchy 2010-09-16 13:22:09 +02:00
parent 0a6600818a
commit aef4704fed
4 changed files with 107 additions and 1 deletions

View File

@ -20,6 +20,8 @@
package org.elasticsearch.indexer;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.MapBuilder;
@ -30,6 +32,12 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterChangedEvent;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateListener;
import org.elasticsearch.indexer.metadata.IndexerMetaData;
import org.elasticsearch.indexer.routing.IndexerRouting;
import org.elasticsearch.indexer.settings.IndexerSettingsModule;
import org.elasticsearch.threadpool.ThreadPool;
@ -45,16 +53,20 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Injector injector;
private final Map<IndexerName, Injector> indexersInjectors = Maps.newHashMap();
private volatile ImmutableMap<IndexerName, Indexer> indexers = ImmutableMap.of();
@Inject public IndexersService(Settings settings, ThreadPool threadPool, Injector injector) {
@Inject public IndexersService(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.injector = injector;
indexerClusterService.add(new ApplyIndexers());
}
@Override protected void doStart() throws ElasticSearchException {
@ -149,6 +161,38 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
indexerInjector.getInstance(Indexer.class).close(delete);
Injectors.close(injector);
}
private class ApplyIndexers implements IndexerClusterStateListener {
@Override public void indexerClusterChanged(IndexerClusterChangedEvent event) {
DiscoveryNode localNode = clusterService.localNode();
IndexerClusterState state = event.state();
// first, go over and delete ones that either don't exists or are not allocated
for (IndexerName indexerName : indexers.keySet()) {
// if its not on the metadata, it was deleted, delete it
IndexerMetaData indexerMetaData = state.metaData().indexer(indexerName);
if (indexerMetaData == null) {
deleteIndexer(indexerName);
}
IndexerRouting routing = state.routing().routing(indexerName);
if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before)
cleanIndexer(indexerName);
}
}
for (IndexerRouting routing : state.routing()) {
// only apply changes to the local node
if (!routing.node().equals(localNode)) {
continue;
}
IndexerMetaData indexerMetaData = state.metaData().indexer(routing.indexerName());
createIndexer(indexerMetaData.indexerName(), indexerMetaData.settings());
}
}
}
}

View File

@ -49,6 +49,10 @@ public class IndexersMetaData implements Iterable<IndexerMetaData> {
return indexers.values().iterator();
}
public IndexerMetaData indexer(IndexerName indexerName) {
return indexers.get(indexerName);
}
public boolean recoveredFromGateway() {
return recoveredFromGateway;
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.routing;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask;
/**
* @author kimchy (shay.banon)
*/
public class IndexersRouter extends AbstractComponent implements ClusterStateListener {
private final IndexerClusterService indexerClusterService;
@Inject public IndexersRouter(Settings settings, ClusterService clusterService, IndexerClusterService indexerClusterService) {
super(settings);
this.indexerClusterService = indexerClusterService;
clusterService.add(this);
}
@Override public void clusterChanged(final ClusterChangedEvent event) {
if (event.nodesChanged()) {
indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
});
}
}
}

View File

@ -40,6 +40,10 @@ public class IndexersRouting implements Iterable<IndexerRouting> {
this.indexers = indexers;
}
public IndexerRouting routing(IndexerName indexerName) {
return indexers.get(indexerName);
}
@Override public Iterator<IndexerRouting> iterator() {
return indexers.values().iterator();
}