From 08d7125cd52d9eb33a6e8c27c2007193f83373ea Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 17 Sep 2010 00:22:42 +0200 Subject: [PATCH] more work on indexer --- .../xcontent/support/XContentMapValues.java | 7 + .../AbstractConcurrentMapFieldDataCache.java | 6 +- .../indexer/AbstractIndexerComponent.java | 25 +-- ...tingsModule.java => IndexerIndexName.java} | 29 +-- .../elasticsearch/indexer/IndexerManager.java | 9 +- .../elasticsearch/indexer/IndexerModule.java | 26 +-- .../{settings => }/IndexerSettings.java | 4 +- .../elasticsearch/indexer/IndexersModule.java | 3 + .../indexer/IndexersService.java | 53 +++--- .../indexer/cluster/IndexerClusterState.java | 33 +--- .../PublishIndexerClusterStateAction.java | 2 +- .../indexer/dummy/DummyIndexer.java | 9 +- .../indexer/metadata/IndexerMetaData.java | 140 --------------- .../indexer/metadata/IndexersMetaData.java | 169 ------------------ .../indexer/routing/IndexerRouting.java | 48 ++++- .../indexer/routing/IndexerRoutingState.java | 65 +++++++ .../indexer/routing/IndexersRouter.java | 87 ++++++++- .../indexer/routing/IndexersRouting.java | 41 ++++- 18 files changed, 328 insertions(+), 428 deletions(-) rename modules/elasticsearch/src/main/java/org/elasticsearch/indexer/{settings/IndexerSettingsModule.java => IndexerIndexName.java} (61%) rename modules/elasticsearch/src/main/java/org/elasticsearch/indexer/{settings => }/IndexerSettings.java (94%) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexerMetaData.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRoutingState.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java index 3e1feef980c..cb5c7d36e4b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java @@ -35,6 +35,13 @@ public class XContentMapValues { return node instanceof List; } + public static String nodeStringValue(Object node, String defaultValue) { + if (node == null) { + return defaultValue; + } + return node.toString(); + } + public static float nodeFloatValue(Object node) { if (node instanceof Number) { return ((Number) node).floatValue(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/support/AbstractConcurrentMapFieldDataCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/support/AbstractConcurrentMapFieldDataCache.java index b3eceef6e51..9e5738091a8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/support/AbstractConcurrentMapFieldDataCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/support/AbstractConcurrentMapFieldDataCache.java @@ -67,13 +67,17 @@ public abstract class AbstractConcurrentMapFieldDataCache extends AbstractIndexC return cache(type.fieldDataClass(), reader, fieldName); } + protected ConcurrentMap buildFilterMap() { + return ConcurrentCollections.newConcurrentMap(); + } + @Override public T cache(Class type, IndexReader reader, String fieldName) throws IOException { ConcurrentMap fieldDataCache = cache.get(reader.getFieldCacheKey()); if (fieldDataCache == null) { synchronized (creationMutex) { fieldDataCache = cache.get(reader.getFieldCacheKey()); if (fieldDataCache == null) { - fieldDataCache = ConcurrentCollections.newConcurrentMap(); + fieldDataCache = buildFilterMap(); cache.put(reader.getFieldCacheKey(), fieldDataCache); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java index e4c9b6f68ea..1789aa27917 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java @@ -22,7 +22,8 @@ package org.elasticsearch.indexer; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indexer.settings.IndexerSettings; + +import java.util.Map; /** * @author kimchy (shay.banon) @@ -33,24 +34,16 @@ public class AbstractIndexerComponent implements IndexerComponent { protected final IndexerName indexerName; - protected final Settings indexSettings; + protected final Settings settings; - protected final Settings componentSettings; + protected final Map indexerSettings; - protected AbstractIndexerComponent(IndexerName indexerName, @IndexerSettings Settings indexSettings) { + protected AbstractIndexerComponent(IndexerName indexerName, Settings settings, @IndexerSettings Map indexerSettings) { this.indexerName = indexerName; - this.indexSettings = indexSettings; - this.componentSettings = indexSettings.getComponentSettings(getClass()); + this.settings = settings; + this.indexerSettings = indexerSettings; - this.logger = Loggers.getLogger(getClass(), indexSettings, indexerName); - } - - protected AbstractIndexerComponent(IndexerName indexerName, @IndexerSettings Settings indexSettings, String prefixSettings) { - this.indexerName = indexerName; - this.indexSettings = indexSettings; - this.componentSettings = indexSettings.getComponentSettings(prefixSettings, getClass()); - - this.logger = Loggers.getLogger(getClass(), indexSettings, indexerName); + this.logger = Loggers.getLogger(getClass(), settings, indexerName); } @Override public IndexerName indexerName() { @@ -58,6 +51,6 @@ public class AbstractIndexerComponent implements IndexerComponent { } public String nodeName() { - return indexSettings.get("name", ""); + return settings.get("name", ""); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/settings/IndexerSettingsModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerIndexName.java similarity index 61% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/settings/IndexerSettingsModule.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerIndexName.java index bbb1b533701..f901ff00bd6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/settings/IndexerSettingsModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerIndexName.java @@ -17,23 +17,24 @@ * under the License. */ -package org.elasticsearch.indexer.settings; +package org.elasticsearch.indexer; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.inject.BindingAnnotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.*; +import static java.lang.annotation.RetentionPolicy.*; /** - * @author kimchy (shay.banon) + * @author kimchy (Shay Banon) */ -public class IndexerSettingsModule extends AbstractModule { - private final Settings settings; - - public IndexerSettingsModule(Settings settings) { - this.settings = settings; - } - - @Override protected void configure() { - bind(Settings.class).annotatedWith(IndexerSettings.class).toInstance(settings); - } +@BindingAnnotation +@Target({FIELD, PARAMETER}) +@Retention(RUNTIME) +@Documented +public @interface IndexerIndexName { } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java index 4b1a4b689ed..69fdacb939e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indexer.cluster.IndexerClusterService; +import org.elasticsearch.indexer.routing.IndexersRouter; /** * @author kimchy (shay.banon) @@ -34,23 +35,29 @@ public class IndexerManager extends AbstractLifecycleComponent { private final IndexerClusterService clusterService; - @Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService) { + private final IndexersRouter indexersRouter; + + @Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService, IndexersRouter indexersRouter) { super(settings); this.indexersService = indexersService; this.clusterService = clusterService; + this.indexersRouter = indexersRouter; } @Override protected void doStart() throws ElasticSearchException { + indexersRouter.start(); indexersService.start(); clusterService.start(); } @Override protected void doStop() throws ElasticSearchException { + indexersRouter.stop(); clusterService.stop(); indexersService.stop(); } @Override protected void doClose() throws ElasticSearchException { + indexersRouter.close(); clusterService.close(); indexersService.close(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java index 304d0ec4865..87dba17b4c2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java @@ -28,6 +28,8 @@ import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.settings.NoClassSettingsException; import org.elasticsearch.common.settings.Settings; +import java.util.Map; + import static org.elasticsearch.common.Strings.*; /** @@ -37,42 +39,40 @@ public class IndexerModule extends AbstractModule implements SpawnModules { private IndexerName indexerName; - private final Settings settings; + private final Settings globalSettings; - public IndexerModule(IndexerName indexerName, Settings settings) { + private final Map settings; + + public IndexerModule(IndexerName indexerName, Map settings, Settings globalSettings) { this.indexerName = indexerName; + this.globalSettings = globalSettings; this.settings = settings; } @Override public Iterable spawnModules() { - String type = settings.get("indexer.type"); - if (type == null) { - return ImmutableList.of(Modules.createModule(loadTypeModule(indexerName.type(), "org.elasticsearch.indexer.", "IndexerModule"), settings)); - } else { - return ImmutableList.of(Modules.createModule(settings.getAsClass("indexer.type", Module.class, "org.elasticsearch.indexer.", "IndexerModule"), settings)); - } + return ImmutableList.of(Modules.createModule(loadTypeModule(indexerName.type(), "org.elasticsearch.indexer.", "IndexerModule"), globalSettings)); } @Override protected void configure() { - + bind(Map.class).annotatedWith(IndexerSettings.class).toInstance(settings); } private Class loadTypeModule(String type, String prefixPackage, String suffixClassName) { String fullClassName = type; try { - return (Class) settings.getClassLoader().loadClass(fullClassName); + return (Class) globalSettings.getClassLoader().loadClass(fullClassName); } catch (ClassNotFoundException e) { fullClassName = prefixPackage + Strings.capitalize(toCamelCase(type)) + suffixClassName; try { - return (Class) settings.getClassLoader().loadClass(fullClassName); + return (Class) globalSettings.getClassLoader().loadClass(fullClassName); } catch (ClassNotFoundException e1) { fullClassName = prefixPackage + toCamelCase(type) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName; try { - return (Class) settings.getClassLoader().loadClass(fullClassName); + return (Class) globalSettings.getClassLoader().loadClass(fullClassName); } catch (ClassNotFoundException e2) { fullClassName = prefixPackage + toCamelCase(type).toLowerCase() + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName; try { - return (Class) settings.getClassLoader().loadClass(fullClassName); + return (Class) globalSettings.getClassLoader().loadClass(fullClassName); } catch (ClassNotFoundException e3) { throw new NoClassSettingsException("Failed to load class with value [" + type + "]", e); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/settings/IndexerSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java similarity index 94% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/settings/IndexerSettings.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java index 0c10474d058..0e7f685eeb3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/settings/IndexerSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer.settings; +package org.elasticsearch.indexer; import org.elasticsearch.common.inject.BindingAnnotation; @@ -29,7 +29,7 @@ import static java.lang.annotation.ElementType.*; import static java.lang.annotation.RetentionPolicy.*; /** - * @author kimchy (shay.banon) + * @author kimchy (Shay Banon) */ @BindingAnnotation diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java index 250cb139923..d651831b30a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java @@ -22,6 +22,7 @@ package org.elasticsearch.indexer; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indexer.cluster.IndexerClusterService; +import org.elasticsearch.indexer.routing.IndexersRouter; /** * @author kimchy (shay.banon) @@ -35,8 +36,10 @@ public class IndexersModule extends AbstractModule { } @Override protected void configure() { + bind(String.class).annotatedWith(IndexerIndexName.class).toInstance(settings.get("indexer.index_name", "indexer")); bind(IndexersService.class).asEagerSingleton(); bind(IndexerClusterService.class).asEagerSingleton(); + bind(IndexersRouter.class).asEagerSingleton(); bind(IndexerManager.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java index 299d12bce65..f6f8671c366 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java @@ -20,6 +20,9 @@ package org.elasticsearch.indexer; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.ImmutableMap; @@ -36,21 +39,21 @@ import org.elasticsearch.indexer.cluster.IndexerClusterChangedEvent; import org.elasticsearch.indexer.cluster.IndexerClusterService; import org.elasticsearch.indexer.cluster.IndexerClusterState; import org.elasticsearch.indexer.cluster.IndexerClusterStateListener; -import org.elasticsearch.indexer.metadata.IndexerMetaData; import org.elasticsearch.indexer.routing.IndexerRouting; -import org.elasticsearch.indexer.settings.IndexerSettingsModule; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; import java.util.concurrent.CountDownLatch; -import static org.elasticsearch.common.settings.ImmutableSettings.*; - /** * @author kimchy (shay.banon) */ public class IndexersService extends AbstractLifecycleComponent { + private final String indexerIndexName; + + private Client client; + private final ThreadPool threadPool; private final ClusterService clusterService; @@ -61,8 +64,10 @@ public class IndexersService extends AbstractLifecycleComponent private volatile ImmutableMap indexers = ImmutableMap.of(); - @Inject public IndexersService(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) { + @Inject public IndexersService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) { super(settings); + this.indexerIndexName = settings.get("indexer.index_name", "indexer"); + this.client = client; this.threadPool = threadPool; this.clusterService = clusterService; this.injector = injector; @@ -98,24 +103,16 @@ public class IndexersService extends AbstractLifecycleComponent @Override protected void doClose() throws ElasticSearchException { } - public synchronized Indexer createIndexer(IndexerName indexerName, Settings settings) throws ElasticSearchException { + public synchronized Indexer createIndexer(IndexerName indexerName, Map settings) throws ElasticSearchException { if (indexersInjectors.containsKey(indexerName)) { throw new IndexerException(indexerName, "indexer already exists"); } logger.debug("creating indexer [{}][{}]", indexerName.type(), indexerName.name()); - Settings indexerSettings = settingsBuilder() - .put(this.settings) - .put(settings) - .classLoader(settings.getClassLoader()) - .globalSettings(settings.getGlobalSettings()) - .build(); - ModulesBuilder modules = new ModulesBuilder(); modules.add(new IndexerNameModule(indexerName)); - modules.add(new IndexerSettingsModule(indexerSettings)); - modules.add(new IndexerModule(indexerName, indexerSettings)); + modules.add(new IndexerModule(indexerName, settings, this.settings)); Injector indexInjector = modules.createChildInjector(injector); indexersInjectors.put(indexerName, indexInjector); @@ -170,12 +167,6 @@ public class IndexersService extends AbstractLifecycleComponent // first, go over and delete ones that either don't exists or are not allocated for (IndexerName indexerName : indexers.keySet()) { - // if its not on the metadata, it was deleted, delete it - IndexerMetaData indexerMetaData = state.metaData().indexer(indexerName); - if (indexerMetaData == null) { - deleteIndexer(indexerName); - } - IndexerRouting routing = state.routing().routing(indexerName); if (routing == null || !localNode.equals(routing.node())) { // not routed at all, and not allocated here, clean it (we delete the relevant ones before) @@ -183,15 +174,27 @@ public class IndexersService extends AbstractLifecycleComponent } } - for (IndexerRouting routing : state.routing()) { + for (final IndexerRouting routing : state.routing()) { + // not allocated + if (routing.node() == null) { + continue; + } // only apply changes to the local node if (!routing.node().equals(localNode)) { continue; } + client.prepareGet(indexerIndexName, routing.indexerName().name(), "_meta").execute(new ActionListener() { + @Override public void onResponse(GetResponse getResponse) { + if (getResponse.exists()) { + // only create the indexer if it exists, otherwise, the indexing meta data has not been visible yet... + createIndexer(routing.indexerName(), getResponse.sourceAsMap()); + } + } - IndexerMetaData indexerMetaData = state.metaData().indexer(routing.indexerName()); - - createIndexer(indexerMetaData.indexerName(), indexerMetaData.settings()); + @Override public void onFailure(Throwable e) { + logger.warn("failed to get _meta from [{}]/[{}]", routing.indexerName().type(), routing.indexerName().name()); + } + }); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterState.java index b73fd238c60..fbe17e8543c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterState.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterState.java @@ -21,11 +21,8 @@ package org.elasticsearch.indexer.cluster; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indexer.metadata.IndexersMetaData; import org.elasticsearch.indexer.routing.IndexersRouting; -import javax.annotation.Nullable; import java.io.IOException; /** @@ -35,19 +32,15 @@ public class IndexerClusterState { private final long version; - private final IndexersMetaData metaData; - private final IndexersRouting routing; public IndexerClusterState(long version, IndexerClusterState state) { this.version = version; - this.metaData = state.metaData(); this.routing = state.routing(); } - IndexerClusterState(long version, IndexersMetaData metaData, IndexersRouting routing) { + IndexerClusterState(long version, IndexersRouting routing) { this.version = version; - this.metaData = metaData; this.routing = routing; } @@ -55,10 +48,6 @@ public class IndexerClusterState { return this.version; } - public IndexersMetaData metaData() { - return metaData; - } - public IndexersRouting routing() { return routing; } @@ -71,26 +60,14 @@ public class IndexerClusterState { private long version = 0; - private IndexersMetaData metaData; - - private IndexersRouting routing; + private IndexersRouting routing = IndexersRouting.EMPTY; public Builder state(IndexerClusterState state) { this.version = state.version(); - this.metaData = state.metaData(); this.routing = state.routing(); return this; } - public Builder metaData(IndexersMetaData.Builder builder) { - return metaData(builder.build()); - } - - public Builder metaData(IndexersMetaData metaData) { - this.metaData = metaData; - return this; - } - public Builder routing(IndexersRouting.Builder builder) { return routing(builder.build()); } @@ -101,20 +78,18 @@ public class IndexerClusterState { } public IndexerClusterState build() { - return new IndexerClusterState(version, metaData, routing); + return new IndexerClusterState(version, routing); } - public static IndexerClusterState readFrom(StreamInput in, @Nullable Settings settings) throws IOException { + public static IndexerClusterState readFrom(StreamInput in) throws IOException { Builder builder = new Builder(); builder.version = in.readVLong(); - builder.metaData = IndexersMetaData.Builder.readFrom(in, settings); builder.routing = IndexersRouting.Builder.readFrom(in); return builder.build(); } public static void writeTo(IndexerClusterState clusterState, StreamOutput out) throws IOException { out.writeVLong(clusterState.version); - IndexersMetaData.Builder.writeTo(clusterState.metaData, out); IndexersRouting.Builder.writeTo(clusterState.routing, out); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java index 51bd89c6fd2..c6bc142b144 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java @@ -100,7 +100,7 @@ public class PublishIndexerClusterStateAction extends AbstractComponent { } @Override public void readFrom(StreamInput in) throws IOException { - clusterState = IndexerClusterState.Builder.readFrom(in, settings); + clusterState = IndexerClusterState.Builder.readFrom(in); } @Override public void writeTo(StreamOutput out) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/dummy/DummyIndexer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/dummy/DummyIndexer.java index bcfd3a4d60a..087c92bafa7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/dummy/DummyIndexer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/dummy/DummyIndexer.java @@ -19,20 +19,21 @@ package org.elasticsearch.indexer.dummy; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indexer.AbstractIndexerComponent; import org.elasticsearch.indexer.Indexer; import org.elasticsearch.indexer.IndexerName; -import org.elasticsearch.indexer.settings.IndexerSettings; +import org.elasticsearch.indexer.IndexerSettings; + +import java.util.Map; /** * @author kimchy (shay.banon) */ public class DummyIndexer extends AbstractIndexerComponent implements Indexer { - @Inject public DummyIndexer(IndexerName indexerName, @IndexerSettings Settings indexSettings) { - super(indexerName, indexSettings); + public DummyIndexer(IndexerName indexerName, Settings settings, @IndexerSettings Map indexerSettings) { + super(indexerName, settings, indexerSettings); logger.info("created"); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexerMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexerMetaData.java deleted file mode 100644 index 6c8409bc1ff..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexerMetaData.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indexer.metadata; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.indexer.IndexerName; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Map; - -/** - * @author kimchy (shay.banon) - */ -public class IndexerMetaData { - - private final IndexerName indexerName; - - private final Settings settings; - - private IndexerMetaData(IndexerName indexerName, Settings settings) { - this.indexerName = indexerName; - this.settings = settings; - } - - public IndexerName indexerName() { - return indexerName; - } - - public Settings settings() { - return settings; - } - - public static class Builder { - - private IndexerName indexerName; - - private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; - - public Builder(IndexerName indexerName) { - this.indexerName = indexerName; - } - - public Builder settings(Settings.Builder settings) { - this.settings = settings.build(); - return this; - } - - public Builder settings(Settings settings) { - this.settings = settings; - return this; - } - - public IndexerMetaData build() { - return new IndexerMetaData(indexerName, settings); - } - - public static void toXContent(IndexerMetaData indexerMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject(indexerMetaData.indexerName().name()); - - builder.field("type", indexerMetaData.indexerName().type()); - - builder.startObject("settings"); - for (Map.Entry entry : indexerMetaData.settings().getAsMap().entrySet()) { - builder.field(entry.getKey(), entry.getValue()); - } - builder.endObject(); - - builder.endObject(); - } - - public static IndexerMetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException { - String name = parser.currentName(); - ImmutableSettings.Builder settingsBuilder = null; - String type = null; - - String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if ("settings".equals(currentFieldName)) { - settingsBuilder = ImmutableSettings.settingsBuilder().globalSettings(globalSettings); - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - String key = parser.currentName(); - token = parser.nextToken(); - String value = parser.text(); - settingsBuilder.put(key, value); - } - } - } else if (token.isValue()) { - if ("type".equals(currentFieldName)) { - type = parser.text(); - } - } - } - Builder builder = new Builder(new IndexerName(name, type)); - if (settingsBuilder != null) { - builder.settings(settingsBuilder); - } - return builder.build(); - } - - public static IndexerMetaData readFrom(StreamInput in, Settings globalSettings) throws IOException { - Builder builder = new Builder(new IndexerName(in.readUTF(), in.readUTF())); - builder.settings(ImmutableSettings.readSettingsFromStream(in, globalSettings)); - return builder.build(); - } - - public static void writeTo(IndexerMetaData indexerMetaData, StreamOutput out) throws IOException { - out.writeUTF(indexerMetaData.indexerName().type()); - out.writeUTF(indexerMetaData.indexerName().name()); - ImmutableSettings.writeSettingsToStream(indexerMetaData.settings(), out); - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java deleted file mode 100644 index fd21e4ee692..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indexer.metadata; - -import org.elasticsearch.common.collect.ImmutableMap; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.*; -import org.elasticsearch.indexer.IndexerName; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Iterator; - -/** - * @author kimchy (shay.banon) - */ -public class IndexersMetaData implements Iterable { - - private final ImmutableMap indexers; - - private final boolean recoveredFromGateway; - - private IndexersMetaData(ImmutableMap indexers, boolean recoveredFromGateway) { - this.indexers = indexers; - this.recoveredFromGateway = recoveredFromGateway; - } - - @Override public Iterator iterator() { - return indexers.values().iterator(); - } - - public IndexerMetaData indexer(IndexerName indexerName) { - return indexers.get(indexerName); - } - - public boolean recoveredFromGateway() { - return recoveredFromGateway; - } - - public static class Builder { - private MapBuilder indexers = MapBuilder.newMapBuilder(); - - private boolean recoveredFromGateway = false; - - public Builder put(IndexerMetaData.Builder builder) { - return put(builder.build()); - } - - public Builder put(IndexerMetaData indexerMetaData) { - indexers.put(indexerMetaData.indexerName(), indexerMetaData); - return this; - } - - public IndexerMetaData get(IndexerName indexerName) { - return indexers.get(indexerName); - } - - public Builder remove(IndexerName indexerName) { - indexers.remove(indexerName); - return this; - } - - public Builder metaData(IndexersMetaData metaData) { - this.indexers.putAll(metaData.indexers); - this.recoveredFromGateway = metaData.recoveredFromGateway; - return this; - } - - /** - * Indicates that this cluster state has been recovered from the gateawy. - */ - public Builder markAsRecoveredFromGateway() { - this.recoveredFromGateway = true; - return this; - } - - public IndexersMetaData build() { - return new IndexersMetaData(indexers.immutableMap(), recoveredFromGateway); - } - - public static String toXContent(IndexersMetaData metaData) throws IOException { - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - builder.startObject(); - toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - return builder.string(); - } - - public static void toXContent(IndexersMetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject("meta-data"); - - builder.startObject("indexers"); - for (IndexerMetaData indexMetaData : metaData) { - IndexerMetaData.Builder.toXContent(indexMetaData, builder, params); - } - builder.endObject(); - - builder.endObject(); - } - - public static IndexersMetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException { - Builder builder = new Builder(); - - XContentParser.Token token = parser.currentToken(); - String currentFieldName = parser.currentName(); - if (!"meta-data".equals(currentFieldName)) { - token = parser.nextToken(); - currentFieldName = parser.currentName(); - if (token == null) { - // no data... - return builder.build(); - } - } - - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if ("indexers".equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - builder.put(IndexerMetaData.Builder.fromXContent(parser, globalSettings)); - } - } - } - } - return builder.build(); - } - - public static IndexersMetaData readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException { - Builder builder = new Builder(); - // we only serialize it using readFrom, not in to/from XContent - builder.recoveredFromGateway = in.readBoolean(); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - builder.put(IndexerMetaData.Builder.readFrom(in, globalSettings)); - } - return builder.build(); - } - - public static void writeTo(IndexersMetaData metaData, StreamOutput out) throws IOException { - out.writeBoolean(metaData.recoveredFromGateway()); - out.writeVInt(metaData.indexers.size()); - for (IndexerMetaData indexMetaData : metaData) { - IndexerMetaData.Builder.writeTo(indexMetaData, out); - } - } - - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java index 4b454a6c877..7b648e8a06e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java @@ -20,19 +20,30 @@ package org.elasticsearch.indexer.routing; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.indexer.IndexerName; +import java.io.IOException; + /** * @author kimchy (shay.banon) */ -public class IndexerRouting { +public class IndexerRouting implements Streamable { private IndexerName indexerName; + private IndexerRoutingState state; + private DiscoveryNode node; - IndexerRouting(IndexerName indexerName, DiscoveryNode node) { + private IndexerRouting() { + } + + IndexerRouting(IndexerName indexerName, IndexerRoutingState state, DiscoveryNode node) { this.indexerName = indexerName; + this.state = state; this.node = node; } @@ -40,7 +51,40 @@ public class IndexerRouting { return indexerName; } + /** + * The node the indexer is allocated to, null if its not allocated. + */ public DiscoveryNode node() { return node; } + + public IndexerRoutingState state() { + return this.state; + } + + public static IndexerRouting readIndexerRouting(StreamInput in) throws IOException { + IndexerRouting routing = new IndexerRouting(); + routing.readFrom(in); + return routing; + } + + @Override public void readFrom(StreamInput in) throws IOException { + indexerName = new IndexerName(in.readUTF(), in.readUTF()); + state = IndexerRoutingState.fromValue(in.readByte()); + if (in.readBoolean()) { + node = DiscoveryNode.readNode(in); + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(indexerName.type()); + out.writeUTF(indexerName.name()); + out.writeByte(state.value()); + if (node == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + node.writeTo(out); + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRoutingState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRoutingState.java new file mode 100644 index 00000000000..f69f521bf4d --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRoutingState.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.routing; + +import org.elasticsearch.ElasticSearchIllegalStateException; + +/** + * The state of the indexer as defined by the cluster. + * + * @author kimchy (shay.banon) + */ +public enum IndexerRoutingState { + /** + * The indexer is not assigned to any node. + */ + UNASSIGNED((byte) 1), + /** + * The indexer is initializing. + */ + INITIALIZING((byte) 2), + /** + * The indexer is started. + */ + STARTED((byte) 3); + + private byte value; + + IndexerRoutingState(byte value) { + this.value = value; + } + + public byte value() { + return this.value; + } + + public static IndexerRoutingState fromValue(byte value) { + switch (value) { + case 1: + return UNASSIGNED; + case 2: + return INITIALIZING; + case 3: + return STARTED; + default: + throw new ElasticSearchIllegalStateException("No should routing state mapped for [" + value + "]"); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java index 6097eadcd86..19b02c34a35 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java @@ -19,34 +19,111 @@ package org.elasticsearch.indexer.routing; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.indexer.IndexerName; import org.elasticsearch.indexer.cluster.IndexerClusterService; import org.elasticsearch.indexer.cluster.IndexerClusterState; import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask; +import java.util.Map; + /** * @author kimchy (shay.banon) */ -public class IndexersRouter extends AbstractComponent implements ClusterStateListener { +public class IndexersRouter extends AbstractLifecycleComponent implements ClusterStateListener { + + private final String indexerIndexName; + + private final Client client; private final IndexerClusterService indexerClusterService; - @Inject public IndexersRouter(Settings settings, ClusterService clusterService, IndexerClusterService indexerClusterService) { + @Inject public IndexersRouter(Settings settings, Client client, ClusterService clusterService, IndexerClusterService indexerClusterService) { super(settings); + this.indexerIndexName = settings.get("indexer.index_name", "indexer"); this.indexerClusterService = indexerClusterService; + this.client = client; clusterService.add(this); } + @Override protected void doStart() throws ElasticSearchException { + } + + @Override protected void doStop() throws ElasticSearchException { + } + + @Override protected void doClose() throws ElasticSearchException { + } + @Override public void clusterChanged(final ClusterChangedEvent event) { - if (event.nodesChanged()) { + if (!event.localNodeMaster()) { + return; + } + if (event.nodesChanged() || event.metaDataChanged()) { indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() { @Override public IndexerClusterState execute(IndexerClusterState currentState) { - return null; //To change body of implemented methods use File | Settings | File Templates. + if (!event.state().metaData().hasIndex(indexerIndexName)) { + // if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state + if (!currentState.routing().isEmpty()) { + return IndexerClusterState.builder().state(currentState).routing(IndexersRouting.builder()).build(); + } + return currentState; + } + + IndexersRouting.Builder routingBuilder = IndexersRouting.builder().routing(currentState.routing()); + boolean dirty = false; + + IndexMetaData indexMetaData = event.state().metaData().index(indexerIndexName); + // go over and create new indexer routing (with no node) for new types (indexers names) + for (Map.Entry entry : indexMetaData.mappings().entrySet()) { + String mappingType = entry.getKey(); // mapping type is the name of the indexer + if (!currentState.routing().hasIndexerByName(mappingType)) { + // no indexer, we need to add it to the routing with no node allocation + try { + GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet(); + if (getResponse.exists()) { + String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null); + if (indexerType == null) { + logger.warn("no indexer type provided for [{}], ignoring...", indexerIndexName); + } else { + routingBuilder.put(new IndexerRouting(new IndexerName(mappingType, indexerType), IndexerRoutingState.UNASSIGNED, null)); + dirty = true; + } + } + } catch (Exception e) { + logger.warn("failed to get/parse _meta for [{}]", mappingType); + } + } + } + // now, remove routings that were deleted + for (IndexerRouting routing : currentState.routing()) { + if (!indexMetaData.mappings().containsKey(routing.indexerName().name())) { + routingBuilder.remove(routing); + dirty = true; + } + } + + // now, allocate indexers + + // see if we can relocate indexers (we can simply first unassign then, then publish) and then, next round, they will be assigned + // but, we need to make sure that there will *be* next round of this is the logic + + + if (dirty) { + return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build(); + } + return currentState; } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java index dc0fc8d3d74..fddda4d5e9c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java @@ -19,7 +19,6 @@ package org.elasticsearch.indexer.routing; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.stream.StreamInput; @@ -34,20 +33,39 @@ import java.util.Iterator; */ public class IndexersRouting implements Iterable { + public static final IndexersRouting EMPTY = IndexersRouting.builder().build(); + private final ImmutableMap indexers; private IndexersRouting(ImmutableMap indexers) { this.indexers = indexers; } + public boolean isEmpty() { + return indexers.isEmpty(); + } + public IndexerRouting routing(IndexerName indexerName) { return indexers.get(indexerName); } + public boolean hasIndexerByName(String name) { + for (IndexerName indexerName : indexers.keySet()) { + if (indexerName.name().equals(name)) { + return true; + } + } + return false; + } + @Override public Iterator iterator() { return indexers.values().iterator(); } + public static Builder builder() { + return new Builder(); + } + public static class Builder { private MapBuilder indexers = MapBuilder.newMapBuilder(); @@ -67,6 +85,20 @@ public class IndexersRouting implements Iterable { return this; } + public Builder remove(IndexerName indexerName) { + indexers.remove(indexerName); + return this; + } + + public Builder remote(String indexerName) { + for (IndexerName name : indexers.map().keySet()) { + if (name.name().equals(indexerName)) { + indexers.remove(name); + } + } + return this; + } + public IndexersRouting build() { return new IndexersRouting(indexers.immutableMap()); } @@ -75,7 +107,7 @@ public class IndexersRouting implements Iterable { Builder builder = new Builder(); int size = in.readVInt(); for (int i = 0; i < size; i++) { - builder.put(new IndexerRouting(new IndexerName(in.readUTF(), in.readUTF()), DiscoveryNode.readNode(in))); + builder.put(IndexerRouting.readIndexerRouting(in)); } return builder.build(); } @@ -83,10 +115,7 @@ public class IndexersRouting implements Iterable { public static void writeTo(IndexersRouting routing, StreamOutput out) throws IOException { out.writeVInt(routing.indexers.size()); for (IndexerRouting indexerRouting : routing) { - out.writeUTF(indexerRouting.indexerName().type()); - out.writeUTF(indexerRouting.indexerName().name()); - - indexerRouting.node().writeTo(out); + indexerRouting.writeTo(out); } } }