diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index 95036541225..aac5e7f4dbe 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -64,6 +64,10 @@ public class ClusterChangedEvent { return state.metaData() != previousState.metaData(); } + public boolean blocksChanged() { + return state.blocks() != previousState.blocks(); + } + public boolean localNodeMaster() { return state.nodes().localNodeMaster(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 18e39a83493..96ac707401e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -120,6 +120,7 @@ public class MetaDataMappingService extends AbstractComponent { throw new IndexMissingException(new Index("_all")); } + logger.info("[{}] remove_mapping [{}]", request.indices, request.mappingType); MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); for (String indexName : request.indices) { if (currentState.metaData().hasIndex(indexName)) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java index 1789aa27917..c92c0fa58bb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java @@ -21,9 +21,6 @@ package org.elasticsearch.indexer; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Settings; - -import java.util.Map; /** * @author kimchy (shay.banon) @@ -34,16 +31,13 @@ public class AbstractIndexerComponent implements IndexerComponent { protected final IndexerName indexerName; - protected final Settings settings; + protected final IndexerSettings settings; - protected final Map indexerSettings; - - protected AbstractIndexerComponent(IndexerName indexerName, Settings settings, @IndexerSettings Map indexerSettings) { + protected AbstractIndexerComponent(IndexerName indexerName, IndexerSettings settings) { this.indexerName = indexerName; this.settings = settings; - this.indexerSettings = indexerSettings; - this.logger = Loggers.getLogger(getClass(), settings, indexerName); + this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), indexerName); } @Override public IndexerName indexerName() { @@ -51,6 +45,6 @@ public class AbstractIndexerComponent implements IndexerComponent { } public String nodeName() { - return settings.get("name", ""); + return settings.globalSettings().get("name", ""); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/Indexer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/Indexer.java index 5e283b2084c..0934932d706 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/Indexer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/Indexer.java @@ -24,5 +24,5 @@ package org.elasticsearch.indexer; */ public interface Indexer extends IndexerComponent { - void close(boolean delete); + void close(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java index 87dba17b4c2..54c6053a763 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java @@ -54,7 +54,7 @@ public class IndexerModule extends AbstractModule implements SpawnModules { } @Override protected void configure() { - bind(Map.class).annotatedWith(IndexerSettings.class).toInstance(settings); + bind(IndexerSettings.class).toInstance(new IndexerSettings(globalSettings, settings)); } private Class loadTypeModule(String type, String prefixPackage, String suffixClassName) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java index 0e7f685eeb3..1fdd460e089 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java @@ -19,22 +19,30 @@ package org.elasticsearch.indexer; -import org.elasticsearch.common.inject.BindingAnnotation; +import org.elasticsearch.common.settings.Settings; -import java.lang.annotation.Documented; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; - -import static java.lang.annotation.ElementType.*; -import static java.lang.annotation.RetentionPolicy.*; +import java.util.Map; /** - * @author kimchy (Shay Banon) + * @author kimchy (shayy.banon) */ -@BindingAnnotation -@Target({FIELD, PARAMETER}) -@Retention(RUNTIME) -@Documented -public @interface IndexerSettings { +public class IndexerSettings { + + private final Settings globalSettings; + + private final Map settings; + + public IndexerSettings(Settings globalSettings, Map settings) { + this.globalSettings = globalSettings; + this.settings = settings; + } + + public Settings globalSettings() { + return globalSettings; + } + + public Map settings() { + return settings; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java index f6f8671c366..52c4ae21c07 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java @@ -84,7 +84,7 @@ public class IndexersService extends AbstractLifecycleComponent threadPool.cached().execute(new Runnable() { @Override public void run() { try { - deleteIndexer(indexerName, false); + closeIndexer(indexerName); } catch (Exception e) { logger.warn("failed to delete indexer on stop [{}]/[{}]", e, indexerName.type(), indexerName.name()); } finally { @@ -121,28 +121,15 @@ public class IndexersService extends AbstractLifecycleComponent return indexer; } - public synchronized void cleanIndexer(IndexerName indexerName) throws ElasticSearchException { - deleteIndexer(indexerName, false); - } - - public synchronized void deleteIndexer(IndexerName indexerName) throws ElasticSearchException { - deleteIndexer(indexerName, true); - } - - private void deleteIndexer(IndexerName indexerName, boolean delete) { + public synchronized void closeIndexer(IndexerName indexerName) throws ElasticSearchException { Injector indexerInjector; Indexer indexer; synchronized (this) { indexerInjector = indexersInjectors.remove(indexerName); if (indexerInjector == null) { - if (!delete) { - return; - } throw new IndexerException(indexerName, "missing"); } - if (delete) { - logger.debug("deleting indexer [{}][{}]", indexerName.type(), indexerName.name()); - } + logger.debug("closing indexer [{}][{}]", indexerName.type(), indexerName.name()); Map tmpMap = Maps.newHashMap(indexers); indexer = tmpMap.remove(indexerName); @@ -153,9 +140,7 @@ public class IndexersService extends AbstractLifecycleComponent // indexerInjector.getInstance(closeable).close(delete); // } - indexer.close(delete); - - indexerInjector.getInstance(Indexer.class).close(delete); + indexer.close(); Injectors.close(injector); } @@ -170,7 +155,7 @@ public class IndexersService extends AbstractLifecycleComponent IndexerRouting routing = state.routing().routing(indexerName); if (routing == null || !localNode.equals(routing.node())) { // not routed at all, and not allocated here, clean it (we delete the relevant ones before) - cleanIndexer(indexerName); + closeIndexer(indexerName); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java index 20988e54ec9..d96e9d9021d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java @@ -40,8 +40,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*; */ public class IndexerClusterService extends AbstractLifecycleComponent { - private final TransportService transportService; - private final ClusterService clusterService; private final PublishIndexerClusterStateAction publishAction; @@ -54,7 +52,6 @@ public class IndexerClusterService extends AbstractLifecycleComponent indexerSettings) { - super(indexerName, settings, indexerSettings); - logger.info("created"); + @Inject public DummyIndexer(IndexerName indexerName, IndexerSettings settings) { + super(indexerName, settings); + logger.info("create"); } - @Override public void close(boolean delete) { - logger.info("delete, actual_delete [{}]", delete); + @Override public void close() { + logger.info("close"); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java index 7b648e8a06e..ec9cb28795e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java @@ -34,16 +34,13 @@ public class IndexerRouting implements Streamable { private IndexerName indexerName; - private IndexerRoutingState state; - private DiscoveryNode node; private IndexerRouting() { } - IndexerRouting(IndexerName indexerName, IndexerRoutingState state, DiscoveryNode node) { + IndexerRouting(IndexerName indexerName, DiscoveryNode node) { this.indexerName = indexerName; - this.state = state; this.node = node; } @@ -58,8 +55,8 @@ public class IndexerRouting implements Streamable { return node; } - public IndexerRoutingState state() { - return this.state; + void node(DiscoveryNode node) { + this.node = node; } public static IndexerRouting readIndexerRouting(StreamInput in) throws IOException { @@ -70,7 +67,6 @@ public class IndexerRouting implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { indexerName = new IndexerName(in.readUTF(), in.readUTF()); - state = IndexerRoutingState.fromValue(in.readByte()); if (in.readBoolean()) { node = DiscoveryNode.readNode(in); } @@ -79,7 +75,6 @@ public class IndexerRouting implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeUTF(indexerName.type()); out.writeUTF(indexerName.name()); - out.writeByte(state.value()); if (node == null) { out.writeBoolean(false); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRoutingState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRoutingState.java deleted file mode 100644 index f69f521bf4d..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRoutingState.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indexer.routing; - -import org.elasticsearch.ElasticSearchIllegalStateException; - -/** - * The state of the indexer as defined by the cluster. - * - * @author kimchy (shay.banon) - */ -public enum IndexerRoutingState { - /** - * The indexer is not assigned to any node. - */ - UNASSIGNED((byte) 1), - /** - * The indexer is initializing. - */ - INITIALIZING((byte) 2), - /** - * The indexer is started. - */ - STARTED((byte) 3); - - private byte value; - - IndexerRoutingState(byte value) { - this.value = value; - } - - public byte value() { - return this.value; - } - - public static IndexerRoutingState fromValue(byte value) { - switch (value) { - case 1: - return UNASSIGNED; - case 2: - return INITIALIZING; - case 3: - return STARTED; - default: - throw new ElasticSearchIllegalStateException("No should routing state mapped for [" + value + "]"); - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java index 19b02c34a35..ae9093fc5de 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java @@ -25,7 +25,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -35,7 +39,10 @@ import org.elasticsearch.indexer.IndexerName; import org.elasticsearch.indexer.cluster.IndexerClusterService; import org.elasticsearch.indexer.cluster.IndexerClusterState; import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask; +import org.elasticsearch.indexer.cluster.IndexerNodeHelper; +import java.util.Iterator; +import java.util.List; import java.util.Map; /** @@ -70,7 +77,7 @@ public class IndexersRouter extends AbstractLifecycleComponent i if (!event.localNodeMaster()) { return; } - if (event.nodesChanged() || event.metaDataChanged()) { + if (event.nodesChanged() || event.metaDataChanged() || event.blocksChanged()) { indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() { @Override public IndexerClusterState execute(IndexerClusterState currentState) { if (!event.state().metaData().hasIndex(indexerIndexName)) { @@ -91,18 +98,21 @@ public class IndexersRouter extends AbstractLifecycleComponent i if (!currentState.routing().hasIndexerByName(mappingType)) { // no indexer, we need to add it to the routing with no node allocation try { + client.admin().indices().prepareRefresh(indexerIndexName).execute().actionGet(); GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet(); if (getResponse.exists()) { String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null); if (indexerType == null) { logger.warn("no indexer type provided for [{}], ignoring...", indexerIndexName); } else { - routingBuilder.put(new IndexerRouting(new IndexerName(mappingType, indexerType), IndexerRoutingState.UNASSIGNED, null)); + routingBuilder.put(new IndexerRouting(new IndexerName(indexerType, mappingType), null)); dirty = true; } } + } catch (ClusterBlockException e) { + // ignore, we will get it next time } catch (Exception e) { - logger.warn("failed to get/parse _meta for [{}]", mappingType); + logger.warn("failed to get/parse _meta for [{}]", e, mappingType); } } } @@ -114,11 +124,50 @@ public class IndexersRouter extends AbstractLifecycleComponent i } } - // now, allocate indexers + // build a list from nodes to indexers + Map> nodesToIndexers = Maps.newHashMap(); - // see if we can relocate indexers (we can simply first unassign then, then publish) and then, next round, they will be assigned - // but, we need to make sure that there will *be* next round of this is the logic + for (DiscoveryNode node : event.state().nodes()) { + if (IndexerNodeHelper.isIndexerNode(node)) { + nodesToIndexers.put(node, Lists.newArrayList()); + } + } + List unassigned = Lists.newArrayList(); + for (IndexerRouting routing : routingBuilder.build()) { + if (routing.node() == null) { + unassigned.add(routing); + } else { + List l = nodesToIndexers.get(routing.node()); + if (l == null) { + l = Lists.newArrayList(); + nodesToIndexers.put(routing.node(), l); + } + l.add(routing); + } + } + for (Iterator it = unassigned.iterator(); it.hasNext();) { + IndexerRouting routing = it.next(); + DiscoveryNode smallest = null; + int smallestSize = Integer.MAX_VALUE; + for (Map.Entry> entry : nodesToIndexers.entrySet()) { + if (IndexerNodeHelper.isIndexerNode(entry.getKey(), routing.indexerName())) { + if (entry.getValue().size() < smallestSize) { + smallestSize = entry.getValue().size(); + smallest = entry.getKey(); + } + } + } + if (smallest != null) { + dirty = true; + it.remove(); + routing.node(smallest); + nodesToIndexers.get(smallest).add(routing); + } + } + + + // add relocation logic... if (dirty) { return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 998adb150df..19469240740 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -236,6 +236,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(index, documentMapper.type())) && !mappings.containsKey(documentMapper.type())) { // we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it mapperService.remove(documentMapper.type()); + seenMappings.remove(new Tuple(index, documentMapper.type())); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 15bad8ee2f4..91d82d754ed 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -54,6 +54,8 @@ import org.elasticsearch.gateway.GatewayModule; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.http.HttpServer; import org.elasticsearch.http.HttpServerModule; +import org.elasticsearch.indexer.IndexerManager; +import org.elasticsearch.indexer.IndexersModule; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -132,7 +134,7 @@ public final class InternalNode implements Node { if (settings.getAsBoolean("http.enabled", true)) { modules.add(new HttpServerModule(settings)); } -// modules.add(new IndexersModule(settings)); + modules.add(new IndexersModule(settings)); modules.add(new IndicesModule(settings)); modules.add(new SearchModule()); modules.add(new TransportActionModule()); @@ -169,7 +171,7 @@ public final class InternalNode implements Node { injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); -// injector.getInstance(IndexerManager.class).start(); + injector.getInstance(IndexerManager.class).start(); injector.getInstance(ClusterService.class).start(); injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); @@ -207,7 +209,7 @@ public final class InternalNode implements Node { injector.getInstance(MonitorService.class).stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); -// injector.getInstance(IndexerManager.class).stop(); + injector.getInstance(IndexerManager.class).stop(); injector.getInstance(IndicesClusterStateService.class).stop(); injector.getInstance(IndicesService.class).stop(); injector.getInstance(RestController.class).stop(); @@ -254,7 +256,7 @@ public final class InternalNode implements Node { stopWatch.stop().start("search"); injector.getInstance(SearchService.class).close(); stopWatch.stop().start("indexers"); -// injector.getInstance(IndexerManager.class).close(); + injector.getInstance(IndexerManager.class).close(); stopWatch.stop().start("indices_cluster"); injector.getInstance(IndicesClusterStateService.class).close(); stopWatch.stop().start("indices"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java index 1412ed66950..432760b093f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java @@ -48,7 +48,7 @@ public class RestDeleteMappingAction extends BaseRestHandler { @Inject public RestDeleteMappingAction(Settings settings, Client client, RestController controller) { super(settings, client); controller.registerHandler(DELETE, "/{index}/{type}/_mapping", this); - controller.registerHandler(DELETE, "/{index}/{type}/", this); + controller.registerHandler(DELETE, "/{index}/{type}", this); } @Override public void handleRequest(final RestRequest request, final RestChannel channel) { diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/common/util/concurrent/BlockingThreadPoolTest.java b/modules/elasticsearch/src/test/java/org/elasticsearch/common/util/concurrent/BlockingThreadPoolTest.java index 885db54bfe3..ec55790bcef 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/common/util/concurrent/BlockingThreadPoolTest.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/common/util/concurrent/BlockingThreadPoolTest.java @@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.*; /** * @author kimchy (shay.banon) */ -@Test +@Test(enabled = false) public class BlockingThreadPoolTest { @Test public void testBlocking() throws Exception {