more work on inexers

This commit is contained in:
kimchy 2010-09-17 16:36:12 +02:00
parent ff081240eb
commit 3c95d6a215
17 changed files with 120 additions and 141 deletions

View File

@ -64,6 +64,10 @@ public class ClusterChangedEvent {
return state.metaData() != previousState.metaData();
}
public boolean blocksChanged() {
return state.blocks() != previousState.blocks();
}
public boolean localNodeMaster() {
return state.nodes().localNodeMaster();
}

View File

@ -120,6 +120,7 @@ public class MetaDataMappingService extends AbstractComponent {
throw new IndexMissingException(new Index("_all"));
}
logger.info("[{}] remove_mapping [{}]", request.indices, request.mappingType);
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
for (String indexName : request.indices) {
if (currentState.metaData().hasIndex(indexName)) {

View File

@ -21,9 +21,6 @@ package org.elasticsearch.indexer;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
/**
* @author kimchy (shay.banon)
@ -34,16 +31,13 @@ public class AbstractIndexerComponent implements IndexerComponent {
protected final IndexerName indexerName;
protected final Settings settings;
protected final IndexerSettings settings;
protected final Map<String, Object> indexerSettings;
protected AbstractIndexerComponent(IndexerName indexerName, Settings settings, @IndexerSettings Map<String, Object> indexerSettings) {
protected AbstractIndexerComponent(IndexerName indexerName, IndexerSettings settings) {
this.indexerName = indexerName;
this.settings = settings;
this.indexerSettings = indexerSettings;
this.logger = Loggers.getLogger(getClass(), settings, indexerName);
this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), indexerName);
}
@Override public IndexerName indexerName() {
@ -51,6 +45,6 @@ public class AbstractIndexerComponent implements IndexerComponent {
}
public String nodeName() {
return settings.get("name", "");
return settings.globalSettings().get("name", "");
}
}

View File

@ -24,5 +24,5 @@ package org.elasticsearch.indexer;
*/
public interface Indexer extends IndexerComponent {
void close(boolean delete);
void close();
}

View File

@ -54,7 +54,7 @@ public class IndexerModule extends AbstractModule implements SpawnModules {
}
@Override protected void configure() {
bind(Map.class).annotatedWith(IndexerSettings.class).toInstance(settings);
bind(IndexerSettings.class).toInstance(new IndexerSettings(globalSettings, settings));
}
private Class<? extends Module> loadTypeModule(String type, String prefixPackage, String suffixClassName) {

View File

@ -19,22 +19,30 @@
package org.elasticsearch.indexer;
import org.elasticsearch.common.inject.BindingAnnotation;
import org.elasticsearch.common.settings.Settings;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.*;
import java.util.Map;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shayy.banon)
*/
@BindingAnnotation
@Target({FIELD, PARAMETER})
@Retention(RUNTIME)
@Documented
public @interface IndexerSettings {
public class IndexerSettings {
private final Settings globalSettings;
private final Map<String, Object> settings;
public IndexerSettings(Settings globalSettings, Map<String, Object> settings) {
this.globalSettings = globalSettings;
this.settings = settings;
}
public Settings globalSettings() {
return globalSettings;
}
public Map<String, Object> settings() {
return settings;
}
}

View File

@ -84,7 +84,7 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
deleteIndexer(indexerName, false);
closeIndexer(indexerName);
} catch (Exception e) {
logger.warn("failed to delete indexer on stop [{}]/[{}]", e, indexerName.type(), indexerName.name());
} finally {
@ -121,28 +121,15 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
return indexer;
}
public synchronized void cleanIndexer(IndexerName indexerName) throws ElasticSearchException {
deleteIndexer(indexerName, false);
}
public synchronized void deleteIndexer(IndexerName indexerName) throws ElasticSearchException {
deleteIndexer(indexerName, true);
}
private void deleteIndexer(IndexerName indexerName, boolean delete) {
public synchronized void closeIndexer(IndexerName indexerName) throws ElasticSearchException {
Injector indexerInjector;
Indexer indexer;
synchronized (this) {
indexerInjector = indexersInjectors.remove(indexerName);
if (indexerInjector == null) {
if (!delete) {
return;
}
throw new IndexerException(indexerName, "missing");
}
if (delete) {
logger.debug("deleting indexer [{}][{}]", indexerName.type(), indexerName.name());
}
logger.debug("closing indexer [{}][{}]", indexerName.type(), indexerName.name());
Map<IndexerName, Indexer> tmpMap = Maps.newHashMap(indexers);
indexer = tmpMap.remove(indexerName);
@ -153,9 +140,7 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
// indexerInjector.getInstance(closeable).close(delete);
// }
indexer.close(delete);
indexerInjector.getInstance(Indexer.class).close(delete);
indexer.close();
Injectors.close(injector);
}
@ -170,7 +155,7 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
IndexerRouting routing = state.routing().routing(indexerName);
if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before)
cleanIndexer(indexerName);
closeIndexer(indexerName);
}
}

View File

@ -40,8 +40,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
*/
public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClusterService> {
private final TransportService transportService;
private final ClusterService clusterService;
private final PublishIndexerClusterStateAction publishAction;
@ -54,7 +52,6 @@ public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClu
@Inject public IndexerClusterService(Settings settings, TransportService transportService, ClusterService clusterService) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.publishAction = new PublishIndexerClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener());

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indexer.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.indexer.IndexerName;
/**
* @author kimchy (shay.banon)
@ -41,4 +42,13 @@ public class IndexerNodeHelper {
// there is at least one indexer settings, we need it
return true;
}
public static boolean isIndexerNode(DiscoveryNode node, IndexerName indexerName) {
if (!isIndexerNode(node)) {
return false;
}
String indexer = node.attributes().get("indexer");
// by default, if not set, its an indexer node (better OOB exp)
return indexer == null || indexer.contains(indexerName.type()) || indexer.contains(indexerName.name());
}
}

View File

@ -19,25 +19,23 @@
package org.elasticsearch.indexer.dummy;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.indexer.AbstractIndexerComponent;
import org.elasticsearch.indexer.Indexer;
import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.IndexerSettings;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class DummyIndexer extends AbstractIndexerComponent implements Indexer {
public DummyIndexer(IndexerName indexerName, Settings settings, @IndexerSettings Map<String, Object> indexerSettings) {
super(indexerName, settings, indexerSettings);
logger.info("created");
@Inject public DummyIndexer(IndexerName indexerName, IndexerSettings settings) {
super(indexerName, settings);
logger.info("create");
}
@Override public void close(boolean delete) {
logger.info("delete, actual_delete [{}]", delete);
@Override public void close() {
logger.info("close");
}
}

View File

@ -34,16 +34,13 @@ public class IndexerRouting implements Streamable {
private IndexerName indexerName;
private IndexerRoutingState state;
private DiscoveryNode node;
private IndexerRouting() {
}
IndexerRouting(IndexerName indexerName, IndexerRoutingState state, DiscoveryNode node) {
IndexerRouting(IndexerName indexerName, DiscoveryNode node) {
this.indexerName = indexerName;
this.state = state;
this.node = node;
}
@ -58,8 +55,8 @@ public class IndexerRouting implements Streamable {
return node;
}
public IndexerRoutingState state() {
return this.state;
void node(DiscoveryNode node) {
this.node = node;
}
public static IndexerRouting readIndexerRouting(StreamInput in) throws IOException {
@ -70,7 +67,6 @@ public class IndexerRouting implements Streamable {
@Override public void readFrom(StreamInput in) throws IOException {
indexerName = new IndexerName(in.readUTF(), in.readUTF());
state = IndexerRoutingState.fromValue(in.readByte());
if (in.readBoolean()) {
node = DiscoveryNode.readNode(in);
}
@ -79,7 +75,6 @@ public class IndexerRouting implements Streamable {
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(indexerName.type());
out.writeUTF(indexerName.name());
out.writeByte(state.value());
if (node == null) {
out.writeBoolean(false);
} else {

View File

@ -1,65 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.routing;
import org.elasticsearch.ElasticSearchIllegalStateException;
/**
* The state of the indexer as defined by the cluster.
*
* @author kimchy (shay.banon)
*/
public enum IndexerRoutingState {
/**
* The indexer is not assigned to any node.
*/
UNASSIGNED((byte) 1),
/**
* The indexer is initializing.
*/
INITIALIZING((byte) 2),
/**
* The indexer is started.
*/
STARTED((byte) 3);
private byte value;
IndexerRoutingState(byte value) {
this.value = value;
}
public byte value() {
return this.value;
}
public static IndexerRoutingState fromValue(byte value) {
switch (value) {
case 1:
return UNASSIGNED;
case 2:
return INITIALIZING;
case 3:
return STARTED;
default:
throw new ElasticSearchIllegalStateException("No should routing state mapped for [" + value + "]");
}
}
}

View File

@ -25,7 +25,11 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
@ -35,7 +39,10 @@ import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask;
import org.elasticsearch.indexer.cluster.IndexerNodeHelper;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
@ -70,7 +77,7 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
if (!event.localNodeMaster()) {
return;
}
if (event.nodesChanged() || event.metaDataChanged()) {
if (event.nodesChanged() || event.metaDataChanged() || event.blocksChanged()) {
indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) {
if (!event.state().metaData().hasIndex(indexerIndexName)) {
@ -91,18 +98,21 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
if (!currentState.routing().hasIndexerByName(mappingType)) {
// no indexer, we need to add it to the routing with no node allocation
try {
client.admin().indices().prepareRefresh(indexerIndexName).execute().actionGet();
GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet();
if (getResponse.exists()) {
String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null);
if (indexerType == null) {
logger.warn("no indexer type provided for [{}], ignoring...", indexerIndexName);
} else {
routingBuilder.put(new IndexerRouting(new IndexerName(mappingType, indexerType), IndexerRoutingState.UNASSIGNED, null));
routingBuilder.put(new IndexerRouting(new IndexerName(indexerType, mappingType), null));
dirty = true;
}
}
} catch (ClusterBlockException e) {
// ignore, we will get it next time
} catch (Exception e) {
logger.warn("failed to get/parse _meta for [{}]", mappingType);
logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
}
}
}
@ -114,11 +124,50 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
}
}
// now, allocate indexers
// build a list from nodes to indexers
Map<DiscoveryNode, List<IndexerRouting>> nodesToIndexers = Maps.newHashMap();
// see if we can relocate indexers (we can simply first unassign then, then publish) and then, next round, they will be assigned
// but, we need to make sure that there will *be* next round of this is the logic
for (DiscoveryNode node : event.state().nodes()) {
if (IndexerNodeHelper.isIndexerNode(node)) {
nodesToIndexers.put(node, Lists.<IndexerRouting>newArrayList());
}
}
List<IndexerRouting> unassigned = Lists.newArrayList();
for (IndexerRouting routing : routingBuilder.build()) {
if (routing.node() == null) {
unassigned.add(routing);
} else {
List<IndexerRouting> l = nodesToIndexers.get(routing.node());
if (l == null) {
l = Lists.newArrayList();
nodesToIndexers.put(routing.node(), l);
}
l.add(routing);
}
}
for (Iterator<IndexerRouting> it = unassigned.iterator(); it.hasNext();) {
IndexerRouting routing = it.next();
DiscoveryNode smallest = null;
int smallestSize = Integer.MAX_VALUE;
for (Map.Entry<DiscoveryNode, List<IndexerRouting>> entry : nodesToIndexers.entrySet()) {
if (IndexerNodeHelper.isIndexerNode(entry.getKey(), routing.indexerName())) {
if (entry.getValue().size() < smallestSize) {
smallestSize = entry.getValue().size();
smallest = entry.getKey();
}
}
}
if (smallest != null) {
dirty = true;
it.remove();
routing.node(smallest);
nodesToIndexers.get(smallest).add(routing);
}
}
// add relocation logic...
if (dirty) {
return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build();

View File

@ -236,6 +236,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (seenMappings.containsKey(new Tuple<String, String>(index, documentMapper.type())) && !mappings.containsKey(documentMapper.type())) {
// we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it
mapperService.remove(documentMapper.type());
seenMappings.remove(new Tuple<String, String>(index, documentMapper.type()));
}
}
}

View File

@ -54,6 +54,8 @@ import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.indexer.IndexerManager;
import org.elasticsearch.indexer.IndexersModule;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@ -132,7 +134,7 @@ public final class InternalNode implements Node {
if (settings.getAsBoolean("http.enabled", true)) {
modules.add(new HttpServerModule(settings));
}
// modules.add(new IndexersModule(settings));
modules.add(new IndexersModule(settings));
modules.add(new IndicesModule(settings));
modules.add(new SearchModule());
modules.add(new TransportActionModule());
@ -169,7 +171,7 @@ public final class InternalNode implements Node {
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
// injector.getInstance(IndexerManager.class).start();
injector.getInstance(IndexerManager.class).start();
injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
@ -207,7 +209,7 @@ public final class InternalNode implements Node {
injector.getInstance(MonitorService.class).stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
// injector.getInstance(IndexerManager.class).stop();
injector.getInstance(IndexerManager.class).stop();
injector.getInstance(IndicesClusterStateService.class).stop();
injector.getInstance(IndicesService.class).stop();
injector.getInstance(RestController.class).stop();
@ -254,7 +256,7 @@ public final class InternalNode implements Node {
stopWatch.stop().start("search");
injector.getInstance(SearchService.class).close();
stopWatch.stop().start("indexers");
// injector.getInstance(IndexerManager.class).close();
injector.getInstance(IndexerManager.class).close();
stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices");

View File

@ -48,7 +48,7 @@ public class RestDeleteMappingAction extends BaseRestHandler {
@Inject public RestDeleteMappingAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(DELETE, "/{index}/{type}/_mapping", this);
controller.registerHandler(DELETE, "/{index}/{type}/", this);
controller.registerHandler(DELETE, "/{index}/{type}", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test
@Test(enabled = false)
public class BlockingThreadPoolTest {
@Test public void testBlocking() throws Exception {