diff --git a/.idea/modules.xml b/.idea/modules.xml index beb0b286dda..a916825c075 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -8,8 +8,8 @@ - + diff --git a/.idea/modules/elasticsearch-root.iml b/.idea/modules/elasticsearch-root.iml index 0a0cf74a725..310489f465a 100644 --- a/.idea/modules/elasticsearch-root.iml +++ b/.idea/modules/elasticsearch-root.iml @@ -20,7 +20,7 @@ - + diff --git a/.idea/modules/plugin-indexer-twitter.iml b/.idea/modules/plugin-river-twitter.iml similarity index 61% rename from .idea/modules/plugin-indexer-twitter.iml rename to .idea/modules/plugin-river-twitter.iml index 83898a6783c..51120027adb 100644 --- a/.idea/modules/plugin-indexer-twitter.iml +++ b/.idea/modules/plugin-river-twitter.iml @@ -1,13 +1,13 @@ - - + + - - - - + + + + diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index e7f4fe97506..fbca3c6f16c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -48,10 +48,10 @@ import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.service.IndexService; -import org.elasticsearch.indexer.IndexerIndexName; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.timer.TimerService; import java.io.File; @@ -130,7 +130,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain '#")); return currentState; } - if (!request.index.equals(IndexerIndexName.Conf.DEFAULT_INDEXER_NAME) && request.index.charAt(0) == '_') { + if (!request.index.equals(RiverIndexName.Conf.DEFAULT_INDEX_NAME) && request.index.charAt(0) == '_') { listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not start with '_'")); return currentState; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/Loggers.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/Loggers.java index a19cac5adf1..2d1345ca14a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/Loggers.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/Loggers.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indexer.IndexerName; +import org.elasticsearch.river.RiverName; import java.net.InetAddress; import java.net.UnknownHostException; @@ -66,11 +66,11 @@ public class Loggers { return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0])); } - public static ESLogger getLogger(Class clazz, Settings settings, IndexerName indexerName, String... prefixes) { + public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) { List l = Lists.newArrayList(); l.add(SPACE); - l.add(indexerName.type()); - l.add(indexerName.name()); + l.add(riverName.type()); + l.add(riverName.name()); l.addAll(Lists.newArrayList(prefixes)); return getLogger(clazz, settings, l.toArray(new String[l.size()])); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/CloseableIndexerComponent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/CloseableIndexerComponent.java deleted file mode 100644 index e6cddc908b7..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/CloseableIndexerComponent.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indexer; - -import org.elasticsearch.ElasticSearchException; - -/** - * @author kimchy (shay.banon) - */ -public interface CloseableIndexerComponent { - - /** - * Closes the indexer component. A boolean indicating if its part of an actual index - * deletion or not is passed. - * - * @param delete true if the index is being deleted. - * @throws org.elasticsearch.ElasticSearchException - * - */ - void close(boolean delete) throws ElasticSearchException; -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java deleted file mode 100644 index fddda4d5e9c..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indexer.routing; - -import org.elasticsearch.common.collect.ImmutableMap; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.indexer.IndexerName; - -import java.io.IOException; -import java.util.Iterator; - -/** - * @author kimchy (shay.banon) - */ -public class IndexersRouting implements Iterable { - - public static final IndexersRouting EMPTY = IndexersRouting.builder().build(); - - private final ImmutableMap indexers; - - private IndexersRouting(ImmutableMap indexers) { - this.indexers = indexers; - } - - public boolean isEmpty() { - return indexers.isEmpty(); - } - - public IndexerRouting routing(IndexerName indexerName) { - return indexers.get(indexerName); - } - - public boolean hasIndexerByName(String name) { - for (IndexerName indexerName : indexers.keySet()) { - if (indexerName.name().equals(name)) { - return true; - } - } - return false; - } - - @Override public Iterator iterator() { - return indexers.values().iterator(); - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private MapBuilder indexers = MapBuilder.newMapBuilder(); - - public Builder routing(IndexersRouting routing) { - indexers.putAll(routing.indexers); - return this; - } - - public Builder put(IndexerRouting routing) { - indexers.put(routing.indexerName(), routing); - return this; - } - - public Builder remove(IndexerRouting routing) { - indexers.remove(routing.indexerName()); - return this; - } - - public Builder remove(IndexerName indexerName) { - indexers.remove(indexerName); - return this; - } - - public Builder remote(String indexerName) { - for (IndexerName name : indexers.map().keySet()) { - if (name.name().equals(indexerName)) { - indexers.remove(name); - } - } - return this; - } - - public IndexersRouting build() { - return new IndexersRouting(indexers.immutableMap()); - } - - public static IndexersRouting readFrom(StreamInput in) throws IOException { - Builder builder = new Builder(); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - builder.put(IndexerRouting.readIndexerRouting(in)); - } - return builder.build(); - } - - public static void writeTo(IndexersRouting routing, StreamOutput out) throws IOException { - out.writeVInt(routing.indexers.size()); - for (IndexerRouting indexerRouting : routing) { - indexerRouting.writeTo(out); - } - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 91d82d754ed..51f57815db5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -54,8 +54,6 @@ import org.elasticsearch.gateway.GatewayModule; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.http.HttpServer; import org.elasticsearch.http.HttpServerModule; -import org.elasticsearch.indexer.IndexerManager; -import org.elasticsearch.indexer.IndexersModule; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -69,6 +67,8 @@ import org.elasticsearch.plugins.PluginsModule; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestModule; +import org.elasticsearch.river.RiversManager; +import org.elasticsearch.river.RiversModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchService; @@ -134,7 +134,7 @@ public final class InternalNode implements Node { if (settings.getAsBoolean("http.enabled", true)) { modules.add(new HttpServerModule(settings)); } - modules.add(new IndexersModule(settings)); + modules.add(new RiversModule(settings)); modules.add(new IndicesModule(settings)); modules.add(new SearchModule()); modules.add(new TransportActionModule()); @@ -171,7 +171,7 @@ public final class InternalNode implements Node { injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); - injector.getInstance(IndexerManager.class).start(); + injector.getInstance(RiversManager.class).start(); injector.getInstance(ClusterService.class).start(); injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); @@ -209,7 +209,7 @@ public final class InternalNode implements Node { injector.getInstance(MonitorService.class).stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); - injector.getInstance(IndexerManager.class).stop(); + injector.getInstance(RiversManager.class).stop(); injector.getInstance(IndicesClusterStateService.class).stop(); injector.getInstance(IndicesService.class).stop(); injector.getInstance(RestController.class).stop(); @@ -256,7 +256,7 @@ public final class InternalNode implements Node { stopWatch.stop().start("search"); injector.getInstance(SearchService.class).close(); stopWatch.stop().start("indexers"); - injector.getInstance(IndexerManager.class).close(); + injector.getInstance(RiversManager.class).close(); stopWatch.stop().start("indices_cluster"); injector.getInstance(IndicesClusterStateService.class).close(); stopWatch.stop().start("indices"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java similarity index 73% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java index c92c0fa58bb..f898e57404c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/AbstractIndexerComponent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -25,23 +25,23 @@ import org.elasticsearch.common.logging.Loggers; /** * @author kimchy (shay.banon) */ -public class AbstractIndexerComponent implements IndexerComponent { +public class AbstractRiverComponent implements RiverComponent { protected final ESLogger logger; - protected final IndexerName indexerName; + protected final RiverName riverName; - protected final IndexerSettings settings; + protected final RiverSettings settings; - protected AbstractIndexerComponent(IndexerName indexerName, IndexerSettings settings) { - this.indexerName = indexerName; + protected AbstractRiverComponent(RiverName riverName, RiverSettings settings) { + this.riverName = riverName; this.settings = settings; - this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), indexerName); + this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName); } - @Override public IndexerName indexerName() { - return indexerName; + @Override public RiverName riverName() { + return riverName; } public String nodeName() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/Indexer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/River.java similarity index 91% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/Indexer.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/River.java index d5960346ae7..47ef816a91d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/Indexer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/River.java @@ -17,12 +17,12 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; /** * @author kimchy (shay.banon) */ -public interface Indexer extends IndexerComponent { +public interface River extends RiverComponent { void start(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerComponent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverComponent.java similarity index 89% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerComponent.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverComponent.java index 0cded877cbb..6ddec5c9042 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerComponent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverComponent.java @@ -17,12 +17,12 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; /** * @author kimchy (shay.banon) */ -public interface IndexerComponent { +public interface RiverComponent { - IndexerName indexerName(); + RiverName riverName(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverException.java similarity index 58% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerException.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverException.java index 689025e6e3e..9e81de328b7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverException.java @@ -17,31 +17,31 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.ElasticSearchException; /** * @author kimchy (shay.banon) */ -public class IndexerException extends ElasticSearchException { +public class RiverException extends ElasticSearchException { - private final IndexerName indexer; + private final RiverName river; - public IndexerException(IndexerName indexer, String msg) { - this(indexer, msg, null); + public RiverException(RiverName river, String msg) { + this(river, msg, null); } - public IndexerException(IndexerName indexer, String msg, Throwable cause) { - this(indexer, true, msg, cause); + public RiverException(RiverName river, String msg, Throwable cause) { + this(river, true, msg, cause); } - protected IndexerException(IndexerName indexer, boolean withSpace, String msg, Throwable cause) { - super("[" + indexer.type() + "][" + indexer.name() + "]" + (withSpace ? " " : "") + msg, cause); - this.indexer = indexer; + protected RiverException(RiverName river, boolean withSpace, String msg, Throwable cause) { + super("[" + river.type() + "][" + river.name() + "]" + (withSpace ? " " : "") + msg, cause); + this.river = river; } - public IndexerName indexerName() { - return indexer; + public RiverName riverName() { + return river; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerIndexName.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverIndexName.java similarity index 86% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerIndexName.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverIndexName.java index 2122d5d0ec2..d4d2dfed2e8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerIndexName.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverIndexName.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.common.inject.BindingAnnotation; import org.elasticsearch.common.settings.Settings; @@ -37,13 +37,13 @@ import static java.lang.annotation.RetentionPolicy.*; @Target({FIELD, PARAMETER}) @Retention(RUNTIME) @Documented -public @interface IndexerIndexName { +public @interface RiverIndexName { static class Conf { - public static String DEFAULT_INDEXER_NAME = "_indexer"; + public static String DEFAULT_INDEX_NAME = "_river"; public static String indexName(Settings settings) { - return settings.get("indexer.index_name", DEFAULT_INDEXER_NAME); + return settings.get("river.index_name", DEFAULT_INDEX_NAME); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverModule.java similarity index 86% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverModule.java index 54c6053a763..07a43d8a800 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverModule.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableList; @@ -35,26 +35,26 @@ import static org.elasticsearch.common.Strings.*; /** * @author kimchy (shay.banon) */ -public class IndexerModule extends AbstractModule implements SpawnModules { +public class RiverModule extends AbstractModule implements SpawnModules { - private IndexerName indexerName; + private RiverName riverName; private final Settings globalSettings; private final Map settings; - public IndexerModule(IndexerName indexerName, Map settings, Settings globalSettings) { - this.indexerName = indexerName; + public RiverModule(RiverName riverName, Map settings, Settings globalSettings) { + this.riverName = riverName; this.globalSettings = globalSettings; this.settings = settings; } @Override public Iterable spawnModules() { - return ImmutableList.of(Modules.createModule(loadTypeModule(indexerName.type(), "org.elasticsearch.indexer.", "IndexerModule"), globalSettings)); + return ImmutableList.of(Modules.createModule(loadTypeModule(riverName.type(), "org.elasticsearch.river.", "RiverModule"), globalSettings)); } @Override protected void configure() { - bind(IndexerSettings.class).toInstance(new IndexerSettings(globalSettings, settings)); + bind(RiverSettings.class).toInstance(new RiverSettings(globalSettings, settings)); } private Class loadTypeModule(String type, String prefixPackage, String suffixClassName) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerName.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverName.java similarity index 90% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerName.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverName.java index 5db043bb829..74b1ef67745 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerName.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverName.java @@ -17,20 +17,20 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import java.io.Serializable; /** * @author kimchy (shay.banon) */ -public class IndexerName implements Serializable { +public class RiverName implements Serializable { private final String type; private final String name; - public IndexerName(String type, String name) { + public RiverName(String type, String name) { this.type = type; this.name = name; } @@ -55,7 +55,7 @@ public class IndexerName implements Serializable { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - IndexerName that = (IndexerName) o; + RiverName that = (RiverName) o; if (name != null ? !name.equals(that.name) : that.name != null) return false; if (type != null ? !type.equals(that.type) : that.type != null) return false; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerNameModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverNameModule.java similarity index 77% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerNameModule.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverNameModule.java index e5b4ec62f18..cdb20086c6b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerNameModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverNameModule.java @@ -17,22 +17,22 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.common.inject.AbstractModule; /** * @author kimchy (shay.banon) */ -public class IndexerNameModule extends AbstractModule { +public class RiverNameModule extends AbstractModule { - private final IndexerName indexerName; + private final RiverName riverName; - public IndexerNameModule(IndexerName indexerName) { - this.indexerName = indexerName; + public RiverNameModule(RiverName riverName) { + this.riverName = riverName; } @Override protected void configure() { - bind(IndexerName.class).toInstance(indexerName); + bind(RiverName.class).toInstance(riverName); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverSettings.java similarity index 89% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverSettings.java index 1fdd460e089..0fed09556e0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiverSettings.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.common.settings.Settings; @@ -27,13 +27,13 @@ import java.util.Map; * @author kimchy (shayy.banon) */ -public class IndexerSettings { +public class RiverSettings { private final Settings globalSettings; private final Map settings; - public IndexerSettings(Settings globalSettings, Map settings) { + public RiverSettings(Settings globalSettings, Map settings) { this.globalSettings = globalSettings; this.settings = settings; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversManager.java similarity index 63% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversManager.java index 69fdacb939e..253947664f8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversManager.java @@ -17,48 +17,48 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indexer.cluster.IndexerClusterService; -import org.elasticsearch.indexer.routing.IndexersRouter; +import org.elasticsearch.river.cluster.RiverClusterService; +import org.elasticsearch.river.routing.RiversRouter; /** * @author kimchy (shay.banon) */ -public class IndexerManager extends AbstractLifecycleComponent { +public class RiversManager extends AbstractLifecycleComponent { - private final IndexersService indexersService; + private final RiversService riversService; - private final IndexerClusterService clusterService; + private final RiverClusterService clusterService; - private final IndexersRouter indexersRouter; + private final RiversRouter riversRouter; - @Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService, IndexersRouter indexersRouter) { + @Inject public RiversManager(Settings settings, RiversService riversService, RiverClusterService clusterService, RiversRouter riversRouter) { super(settings); - this.indexersService = indexersService; + this.riversService = riversService; this.clusterService = clusterService; - this.indexersRouter = indexersRouter; + this.riversRouter = riversRouter; } @Override protected void doStart() throws ElasticSearchException { - indexersRouter.start(); - indexersService.start(); + riversRouter.start(); + riversService.start(); clusterService.start(); } @Override protected void doStop() throws ElasticSearchException { - indexersRouter.stop(); + riversRouter.stop(); clusterService.stop(); - indexersService.stop(); + riversService.stop(); } @Override protected void doClose() throws ElasticSearchException { - indexersRouter.close(); + riversRouter.close(); clusterService.close(); - indexersService.close(); + riversService.close(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversModule.java similarity index 64% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversModule.java index b47a2f861bf..056fb95392c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversModule.java @@ -17,29 +17,29 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indexer.cluster.IndexerClusterService; -import org.elasticsearch.indexer.routing.IndexersRouter; +import org.elasticsearch.river.cluster.RiverClusterService; +import org.elasticsearch.river.routing.RiversRouter; /** * @author kimchy (shay.banon) */ -public class IndexersModule extends AbstractModule { +public class RiversModule extends AbstractModule { private final Settings settings; - public IndexersModule(Settings settings) { + public RiversModule(Settings settings) { this.settings = settings; } @Override protected void configure() { - bind(String.class).annotatedWith(IndexerIndexName.class).toInstance(IndexerIndexName.Conf.indexName(settings)); - bind(IndexersService.class).asEagerSingleton(); - bind(IndexerClusterService.class).asEagerSingleton(); - bind(IndexersRouter.class).asEagerSingleton(); - bind(IndexerManager.class).asEagerSingleton(); + bind(String.class).annotatedWith(RiverIndexName.class).toInstance(RiverIndexName.Conf.indexName(settings)); + bind(RiversService.class).asEagerSingleton(); + bind(RiverClusterService.class).asEagerSingleton(); + bind(RiversRouter.class).asEagerSingleton(); + bind(RiversManager.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java similarity index 53% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java index a7282628b95..1956ee39171 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer; +package org.elasticsearch.river; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionListener; @@ -35,11 +35,11 @@ import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indexer.cluster.IndexerClusterChangedEvent; -import org.elasticsearch.indexer.cluster.IndexerClusterService; -import org.elasticsearch.indexer.cluster.IndexerClusterState; -import org.elasticsearch.indexer.cluster.IndexerClusterStateListener; -import org.elasticsearch.indexer.routing.IndexerRouting; +import org.elasticsearch.river.cluster.RiverClusterChangedEvent; +import org.elasticsearch.river.cluster.RiverClusterService; +import org.elasticsearch.river.cluster.RiverClusterState; +import org.elasticsearch.river.cluster.RiverClusterStateListener; +import org.elasticsearch.river.routing.RiverRouting; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; @@ -48,9 +48,9 @@ import java.util.concurrent.CountDownLatch; /** * @author kimchy (shay.banon) */ -public class IndexersService extends AbstractLifecycleComponent { +public class RiversService extends AbstractLifecycleComponent { - private final String indexerIndexName; + private final String riverIndexName; private Client client; @@ -60,33 +60,33 @@ public class IndexersService extends AbstractLifecycleComponent private final Injector injector; - private final Map indexersInjectors = Maps.newHashMap(); + private final Map riversInjectors = Maps.newHashMap(); - private volatile ImmutableMap indexers = ImmutableMap.of(); + private volatile ImmutableMap rivers = ImmutableMap.of(); - @Inject public IndexersService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) { + @Inject public RiversService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, RiverClusterService riverClusterService, Injector injector) { super(settings); - this.indexerIndexName = IndexerIndexName.Conf.indexName(settings); + this.riverIndexName = RiverIndexName.Conf.indexName(settings); this.client = client; this.threadPool = threadPool; this.clusterService = clusterService; this.injector = injector; - indexerClusterService.add(new ApplyIndexers()); + riverClusterService.add(new ApplyRivers()); } @Override protected void doStart() throws ElasticSearchException { } @Override protected void doStop() throws ElasticSearchException { - ImmutableSet indices = ImmutableSet.copyOf(this.indexers.keySet()); + ImmutableSet indices = ImmutableSet.copyOf(this.rivers.keySet()); final CountDownLatch latch = new CountDownLatch(indices.size()); - for (final IndexerName indexerName : indices) { + for (final RiverName riverName : indices) { threadPool.cached().execute(new Runnable() { @Override public void run() { try { - closeIndexer(indexerName); + closeRiver(riverName); } catch (Exception e) { - logger.warn("failed to delete indexer on stop [{}]/[{}]", e, indexerName.type(), indexerName.name()); + logger.warn("failed to delete river on stop [{}]/[{}]", e, riverName.type(), riverName.name()); } finally { latch.countDown(); } @@ -103,68 +103,64 @@ public class IndexersService extends AbstractLifecycleComponent @Override protected void doClose() throws ElasticSearchException { } - public synchronized Indexer createIndexer(IndexerName indexerName, Map settings) throws ElasticSearchException { - if (indexersInjectors.containsKey(indexerName)) { - throw new IndexerException(indexerName, "indexer already exists"); + public synchronized River createRiver(RiverName riverName, Map settings) throws ElasticSearchException { + if (riversInjectors.containsKey(riverName)) { + throw new RiverException(riverName, "river already exists"); } - logger.debug("creating indexer [{}][{}]", indexerName.type(), indexerName.name()); + logger.debug("creating river [{}][{}]", riverName.type(), riverName.name()); ModulesBuilder modules = new ModulesBuilder(); - modules.add(new IndexerNameModule(indexerName)); - modules.add(new IndexerModule(indexerName, settings, this.settings)); + modules.add(new RiverNameModule(riverName)); + modules.add(new RiverModule(riverName, settings, this.settings)); Injector indexInjector = modules.createChildInjector(injector); - indexersInjectors.put(indexerName, indexInjector); - Indexer indexer = indexInjector.getInstance(Indexer.class); - indexers = MapBuilder.newMapBuilder(indexers).put(indexerName, indexer).immutableMap(); + riversInjectors.put(riverName, indexInjector); + River river = indexInjector.getInstance(River.class); + rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap(); // we need this start so there can be operations done (like creating an index) which can't be // done on create since Guice can't create two concurrent child injectors - indexer.start(); - return indexer; + river.start(); + return river; } - public synchronized void closeIndexer(IndexerName indexerName) throws ElasticSearchException { - Injector indexerInjector; - Indexer indexer; + public synchronized void closeRiver(RiverName riverName) throws ElasticSearchException { + Injector riverInjector; + River river; synchronized (this) { - indexerInjector = indexersInjectors.remove(indexerName); - if (indexerInjector == null) { - throw new IndexerException(indexerName, "missing"); + riverInjector = riversInjectors.remove(riverName); + if (riverInjector == null) { + throw new RiverException(riverName, "missing"); } - logger.debug("closing indexer [{}][{}]", indexerName.type(), indexerName.name()); + logger.debug("closing river [{}][{}]", riverName.type(), riverName.name()); - Map tmpMap = Maps.newHashMap(indexers); - indexer = tmpMap.remove(indexerName); - indexers = ImmutableMap.copyOf(tmpMap); + Map tmpMap = Maps.newHashMap(rivers); + river = tmpMap.remove(riverName); + rivers = ImmutableMap.copyOf(tmpMap); } -// for (Class closeable : pluginsService.indexServices()) { -// indexerInjector.getInstance(closeable).close(delete); -// } - - indexer.close(); + river.close(); Injectors.close(injector); } - private class ApplyIndexers implements IndexerClusterStateListener { - @Override public void indexerClusterChanged(IndexerClusterChangedEvent event) { + private class ApplyRivers implements RiverClusterStateListener { + @Override public void riverClusterChanged(RiverClusterChangedEvent event) { DiscoveryNode localNode = clusterService.localNode(); - IndexerClusterState state = event.state(); + RiverClusterState state = event.state(); // first, go over and delete ones that either don't exists or are not allocated - for (IndexerName indexerName : indexers.keySet()) { - IndexerRouting routing = state.routing().routing(indexerName); + for (RiverName riverName : rivers.keySet()) { + RiverRouting routing = state.routing().routing(riverName); if (routing == null || !localNode.equals(routing.node())) { // not routed at all, and not allocated here, clean it (we delete the relevant ones before) - closeIndexer(indexerName); + closeRiver(riverName); } } - for (final IndexerRouting routing : state.routing()) { + for (final RiverRouting routing : state.routing()) { // not allocated if (routing.node() == null) { continue; @@ -174,21 +170,21 @@ public class IndexersService extends AbstractLifecycleComponent continue; } // if its already created, ignore it - if (indexers.containsKey(routing.indexerName())) { + if (rivers.containsKey(routing.riverName())) { continue; } - client.prepareGet(indexerIndexName, routing.indexerName().name(), "_meta").execute(new ActionListener() { + client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").execute(new ActionListener() { @Override public void onResponse(GetResponse getResponse) { - if (!indexers.containsKey(routing.indexerName())) { + if (!rivers.containsKey(routing.riverName())) { if (getResponse.exists()) { - // only create the indexer if it exists, otherwise, the indexing meta data has not been visible yet... - createIndexer(routing.indexerName(), getResponse.sourceAsMap()); + // only create the river if it exists, otherwise, the indexing meta data has not been visible yet... + createRiver(routing.riverName(), getResponse.sourceAsMap()); } } } @Override public void onFailure(Throwable e) { - logger.warn("failed to get _meta from [{}]/[{}]", e, routing.indexerName().type(), routing.indexerName().name()); + logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name()); } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java similarity index 80% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java index c6bc142b144..9f3ebbdee50 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer.cluster; +package org.elasticsearch.river.cluster; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -35,10 +35,10 @@ import java.io.IOException; /** * @author kimchy (shay.banon) */ -public class PublishIndexerClusterStateAction extends AbstractComponent { +public class PublishRiverClusterStateAction extends AbstractComponent { public static interface NewClusterStateListener { - void onNewClusterState(IndexerClusterState clusterState); + void onNewClusterState(RiverClusterState clusterState); } private final TransportService transportService; @@ -47,8 +47,8 @@ public class PublishIndexerClusterStateAction extends AbstractComponent { private final NewClusterStateListener listener; - public PublishIndexerClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, - NewClusterStateListener listener) { + public PublishRiverClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, + NewClusterStateListener listener) { super(settings); this.transportService = transportService; this.clusterService = clusterService; @@ -60,7 +60,7 @@ public class PublishIndexerClusterStateAction extends AbstractComponent { transportService.removeHandler(PublishClusterStateRequestHandler.ACTION); } - public void publish(IndexerClusterState clusterState) { + public void publish(RiverClusterState clusterState) { final DiscoveryNodes discoNodes = clusterService.state().nodes(); for (final DiscoveryNode node : discoNodes) { if (node.equals(discoNodes.localNode())) { @@ -68,15 +68,15 @@ public class PublishIndexerClusterStateAction extends AbstractComponent { continue; } - // we only want to send nodes that are either possible master nodes or indexer nodes - // master nodes because they will handle the state and the allocation of indexers - // and indexer nodes since they will end up creating indexes + // we only want to send nodes that are either possible master nodes or river nodes + // master nodes because they will handle the state and the allocation of rivers + // and river nodes since they will end up creating indexes if (node.clientNode()) { continue; } - if (!node.masterNode() && !IndexerNodeHelper.isIndexerNode(node)) { + if (!node.masterNode() && !RiverNodeHelper.isRiverNode(node)) { continue; } @@ -90,27 +90,27 @@ public class PublishIndexerClusterStateAction extends AbstractComponent { private class PublishClusterStateRequest implements Streamable { - private IndexerClusterState clusterState; + private RiverClusterState clusterState; private PublishClusterStateRequest() { } - private PublishClusterStateRequest(IndexerClusterState clusterState) { + private PublishClusterStateRequest(RiverClusterState clusterState) { this.clusterState = clusterState; } @Override public void readFrom(StreamInput in) throws IOException { - clusterState = IndexerClusterState.Builder.readFrom(in); + clusterState = RiverClusterState.Builder.readFrom(in); } @Override public void writeTo(StreamOutput out) throws IOException { - IndexerClusterState.Builder.writeTo(clusterState, out); + RiverClusterState.Builder.writeTo(clusterState, out); } } private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler { - static final String ACTION = "indexer/state/publish"; + static final String ACTION = "river/state/publish"; @Override public PublishClusterStateRequest newInstance() { return new PublishClusterStateRequest(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterChangedEvent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java similarity index 75% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterChangedEvent.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java index 1bcaacfd313..d31bed2091d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterChangedEvent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java @@ -17,20 +17,20 @@ * under the License. */ -package org.elasticsearch.indexer.cluster; +package org.elasticsearch.river.cluster; /** * @author kimchy (shay.banon) */ -public class IndexerClusterChangedEvent { +public class RiverClusterChangedEvent { private final String source; - private final IndexerClusterState previousState; + private final RiverClusterState previousState; - private final IndexerClusterState state; + private final RiverClusterState state; - public IndexerClusterChangedEvent(String source, IndexerClusterState state, IndexerClusterState previousState) { + public RiverClusterChangedEvent(String source, RiverClusterState state, RiverClusterState previousState) { this.source = source; this.state = state; this.previousState = previousState; @@ -43,11 +43,11 @@ public class IndexerClusterChangedEvent { return this.source; } - public IndexerClusterState state() { + public RiverClusterState state() { return this.state; } - public IndexerClusterState previousState() { + public RiverClusterState previousState() { return this.previousState; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java similarity index 74% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java index fccfeb2e063..cef5cf55c5f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer.cluster; +package org.elasticsearch.river.cluster; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterService; @@ -38,27 +38,27 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*; /** * @author kimchy (shay.banon) */ -public class IndexerClusterService extends AbstractLifecycleComponent { +public class RiverClusterService extends AbstractLifecycleComponent { private final ClusterService clusterService; - private final PublishIndexerClusterStateAction publishAction; + private final PublishRiverClusterStateAction publishAction; - private final List clusterStateListeners = new CopyOnWriteArrayList(); + private final List clusterStateListeners = new CopyOnWriteArrayList(); private volatile ExecutorService updateTasksExecutor; - private volatile IndexerClusterState clusterState = IndexerClusterState.builder().build(); + private volatile RiverClusterState clusterState = RiverClusterState.builder().build(); - @Inject public IndexerClusterService(Settings settings, TransportService transportService, ClusterService clusterService) { + @Inject public RiverClusterService(Settings settings, TransportService transportService, ClusterService clusterService) { super(settings); this.clusterService = clusterService; - this.publishAction = new PublishIndexerClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener()); + this.publishAction = new PublishRiverClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener()); } @Override protected void doStart() throws ElasticSearchException { - this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "indexerClusterService#updateTask")); + this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "riverClusterService#updateTask")); } @Override protected void doStop() throws ElasticSearchException { @@ -73,15 +73,15 @@ public class IndexerClusterService extends AbstractLifecycleComponentnull if its not allocated. + * The node the river is allocated to, null if its not allocated. */ public DiscoveryNode node() { return node; @@ -59,22 +59,22 @@ public class IndexerRouting implements Streamable { this.node = node; } - public static IndexerRouting readIndexerRouting(StreamInput in) throws IOException { - IndexerRouting routing = new IndexerRouting(); + public static RiverRouting readRiverRouting(StreamInput in) throws IOException { + RiverRouting routing = new RiverRouting(); routing.readFrom(in); return routing; } @Override public void readFrom(StreamInput in) throws IOException { - indexerName = new IndexerName(in.readUTF(), in.readUTF()); + riverName = new RiverName(in.readUTF(), in.readUTF()); if (in.readBoolean()) { node = DiscoveryNode.readNode(in); } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(indexerName.type()); - out.writeUTF(indexerName.name()); + out.writeUTF(riverName.type()); + out.writeUTF(riverName.name()); if (node == null) { out.writeBoolean(false); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/routing/RiversRouter.java similarity index 59% rename from modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/river/routing/RiversRouter.java index 9841e58f476..edaf5bedbf8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/routing/RiversRouter.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer.routing; +package org.elasticsearch.river.routing; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.get.GetResponse; @@ -35,13 +35,13 @@ import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.indexer.IndexerIndexName; -import org.elasticsearch.indexer.IndexerName; -import org.elasticsearch.indexer.cluster.IndexerClusterService; -import org.elasticsearch.indexer.cluster.IndexerClusterState; -import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask; -import org.elasticsearch.indexer.cluster.IndexerNodeHelper; import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.river.RiverIndexName; +import org.elasticsearch.river.RiverName; +import org.elasticsearch.river.cluster.RiverClusterService; +import org.elasticsearch.river.cluster.RiverClusterState; +import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask; +import org.elasticsearch.river.cluster.RiverNodeHelper; import java.util.Iterator; import java.util.List; @@ -50,18 +50,18 @@ import java.util.Map; /** * @author kimchy (shay.banon) */ -public class IndexersRouter extends AbstractLifecycleComponent implements ClusterStateListener { +public class RiversRouter extends AbstractLifecycleComponent implements ClusterStateListener { - private final String indexerIndexName; + private final String riverIndexName; private final Client client; - private final IndexerClusterService indexerClusterService; + private final RiverClusterService riverClusterService; - @Inject public IndexersRouter(Settings settings, Client client, ClusterService clusterService, IndexerClusterService indexerClusterService) { + @Inject public RiversRouter(Settings settings, Client client, ClusterService clusterService, RiverClusterService riverClusterService) { super(settings); - this.indexerIndexName = IndexerIndexName.Conf.indexName(settings); - this.indexerClusterService = indexerClusterService; + this.riverIndexName = RiverIndexName.Conf.indexName(settings); + this.riverClusterService = riverClusterService; this.client = client; clusterService.add(this); } @@ -80,34 +80,34 @@ public class IndexersRouter extends AbstractLifecycleComponent i return; } if (event.nodesChanged() || event.metaDataChanged() || event.blocksChanged()) { - indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() { - @Override public IndexerClusterState execute(IndexerClusterState currentState) { - if (!event.state().metaData().hasIndex(indexerIndexName)) { + riverClusterService.submitStateUpdateTask("reroute_rivers_node_changed", new RiverClusterStateUpdateTask() { + @Override public RiverClusterState execute(RiverClusterState currentState) { + if (!event.state().metaData().hasIndex(riverIndexName)) { // if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state if (!currentState.routing().isEmpty()) { - return IndexerClusterState.builder().state(currentState).routing(IndexersRouting.builder()).build(); + return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build(); } return currentState; } - IndexersRouting.Builder routingBuilder = IndexersRouting.builder().routing(currentState.routing()); + RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing()); boolean dirty = false; - IndexMetaData indexMetaData = event.state().metaData().index(indexerIndexName); - // go over and create new indexer routing (with no node) for new types (indexers names) + IndexMetaData indexMetaData = event.state().metaData().index(riverIndexName); + // go over and create new river routing (with no node) for new types (rivers names) for (Map.Entry entry : indexMetaData.mappings().entrySet()) { - String mappingType = entry.getKey(); // mapping type is the name of the indexer - if (!currentState.routing().hasIndexerByName(mappingType)) { - // no indexer, we need to add it to the routing with no node allocation + String mappingType = entry.getKey(); // mapping type is the name of the river + if (!currentState.routing().hasRiverByName(mappingType)) { + // no river, we need to add it to the routing with no node allocation try { - client.admin().indices().prepareRefresh(indexerIndexName).execute().actionGet(); - GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet(); + client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet(); + GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").execute().actionGet(); if (getResponse.exists()) { - String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null); - if (indexerType == null) { - logger.warn("no indexer type provided for [{}], ignoring...", indexerIndexName); + String riverType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null); + if (riverType == null) { + logger.warn("no river type provided for [{}], ignoring...", riverIndexName); } else { - routingBuilder.put(new IndexerRouting(new IndexerName(indexerType, mappingType), null)); + routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null)); dirty = true; } } @@ -121,41 +121,41 @@ public class IndexersRouter extends AbstractLifecycleComponent i } } // now, remove routings that were deleted - for (IndexerRouting routing : currentState.routing()) { - if (!indexMetaData.mappings().containsKey(routing.indexerName().name())) { + for (RiverRouting routing : currentState.routing()) { + if (!indexMetaData.mappings().containsKey(routing.riverName().name())) { routingBuilder.remove(routing); dirty = true; } } - // build a list from nodes to indexers - Map> nodesToIndexers = Maps.newHashMap(); + // build a list from nodes to rivers + Map> nodesToRivers = Maps.newHashMap(); for (DiscoveryNode node : event.state().nodes()) { - if (IndexerNodeHelper.isIndexerNode(node)) { - nodesToIndexers.put(node, Lists.newArrayList()); + if (RiverNodeHelper.isRiverNode(node)) { + nodesToRivers.put(node, Lists.newArrayList()); } } - List unassigned = Lists.newArrayList(); - for (IndexerRouting routing : routingBuilder.build()) { + List unassigned = Lists.newArrayList(); + for (RiverRouting routing : routingBuilder.build()) { if (routing.node() == null) { unassigned.add(routing); } else { - List l = nodesToIndexers.get(routing.node()); + List l = nodesToRivers.get(routing.node()); if (l == null) { l = Lists.newArrayList(); - nodesToIndexers.put(routing.node(), l); + nodesToRivers.put(routing.node(), l); } l.add(routing); } } - for (Iterator it = unassigned.iterator(); it.hasNext();) { - IndexerRouting routing = it.next(); + for (Iterator it = unassigned.iterator(); it.hasNext();) { + RiverRouting routing = it.next(); DiscoveryNode smallest = null; int smallestSize = Integer.MAX_VALUE; - for (Map.Entry> entry : nodesToIndexers.entrySet()) { - if (IndexerNodeHelper.isIndexerNode(entry.getKey(), routing.indexerName())) { + for (Map.Entry> entry : nodesToRivers.entrySet()) { + if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) { if (entry.getValue().size() < smallestSize) { smallestSize = entry.getValue().size(); smallest = entry.getKey(); @@ -166,7 +166,7 @@ public class IndexersRouter extends AbstractLifecycleComponent i dirty = true; it.remove(); routing.node(smallest); - nodesToIndexers.get(smallest).add(routing); + nodesToRivers.get(smallest).add(routing); } } @@ -174,7 +174,7 @@ public class IndexersRouter extends AbstractLifecycleComponent i // add relocation logic... if (dirty) { - return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build(); + return RiverClusterState.builder().state(currentState).routing(routingBuilder).build(); } return currentState; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/river/routing/RiversRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/routing/RiversRouting.java new file mode 100644 index 00000000000..5ffa919b811 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/routing/RiversRouting.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.river.routing; + +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.river.RiverName; + +import java.io.IOException; +import java.util.Iterator; + +/** + * @author kimchy (shay.banon) + */ +public class RiversRouting implements Iterable { + + public static final RiversRouting EMPTY = RiversRouting.builder().build(); + + private final ImmutableMap rivers; + + private RiversRouting(ImmutableMap rivers) { + this.rivers = rivers; + } + + public boolean isEmpty() { + return rivers.isEmpty(); + } + + public RiverRouting routing(RiverName riverName) { + return rivers.get(riverName); + } + + public boolean hasRiverByName(String name) { + for (RiverName riverName : rivers.keySet()) { + if (riverName.name().equals(name)) { + return true; + } + } + return false; + } + + @Override public Iterator iterator() { + return rivers.values().iterator(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private MapBuilder rivers = MapBuilder.newMapBuilder(); + + public Builder routing(RiversRouting routing) { + rivers.putAll(routing.rivers); + return this; + } + + public Builder put(RiverRouting routing) { + rivers.put(routing.riverName(), routing); + return this; + } + + public Builder remove(RiverRouting routing) { + rivers.remove(routing.riverName()); + return this; + } + + public Builder remove(RiverName riverName) { + rivers.remove(riverName); + return this; + } + + public Builder remote(String riverName) { + for (RiverName name : rivers.map().keySet()) { + if (name.name().equals(riverName)) { + rivers.remove(name); + } + } + return this; + } + + public RiversRouting build() { + return new RiversRouting(rivers.immutableMap()); + } + + public static RiversRouting readFrom(StreamInput in) throws IOException { + Builder builder = new Builder(); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + builder.put(RiverRouting.readRiverRouting(in)); + } + return builder.build(); + } + + public static void writeTo(RiversRouting routing, StreamOutput out) throws IOException { + out.writeVInt(routing.rivers.size()); + for (RiverRouting riverRouting : routing) { + riverRouting.writeTo(out); + } + } + } +} diff --git a/plugins/indexer/twitter/src/main/java/es-plugin.properties b/plugins/indexer/twitter/src/main/java/es-plugin.properties deleted file mode 100644 index d0ef70b6592..00000000000 --- a/plugins/indexer/twitter/src/main/java/es-plugin.properties +++ /dev/null @@ -1,2 +0,0 @@ -plugin=org.elasticsearch.plugin.indexer.twitter.IndexerTwitterPlugin - diff --git a/plugins/indexer/twitter/build.gradle b/plugins/river/twitter/build.gradle similarity index 94% rename from plugins/indexer/twitter/build.gradle rename to plugins/river/twitter/build.gradle index 93a2fd4066e..4d39cacb8e2 100644 --- a/plugins/indexer/twitter/build.gradle +++ b/plugins/river/twitter/build.gradle @@ -3,11 +3,11 @@ dependsOn(':elasticsearch') apply plugin: 'java' apply plugin: 'maven' -archivesBaseName = "elasticsearch-indexer-twitter" +archivesBaseName = "elasticsearch-river-twitter" explodedDistDir = new File(distsDir, 'exploded') -manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::Indexer::Twitter", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr) +manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::River::Twitter", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr) configurations.compile.transitive = true configurations.testCompile.transitive = true @@ -117,7 +117,7 @@ uploadArchives { pom.project { inceptionYear '2009' - name 'elasticsearch-plugins-indexer-twitter' + name 'elasticsearch-plugins-river-twitter' description 'Attachments Plugin for ElasticSearch' licenses { license { diff --git a/plugins/river/twitter/src/main/java/es-plugin.properties b/plugins/river/twitter/src/main/java/es-plugin.properties new file mode 100644 index 00000000000..1a150f7e641 --- /dev/null +++ b/plugins/river/twitter/src/main/java/es-plugin.properties @@ -0,0 +1,2 @@ +plugin=org.elasticsearch.plugin.river.twitter.RiverTwitterPlugin + diff --git a/plugins/indexer/twitter/src/main/java/org/elasticsearch/plugin/indexer/twitter/IndexerTwitterPlugin.java b/plugins/river/twitter/src/main/java/org/elasticsearch/plugin/river/twitter/RiverTwitterPlugin.java similarity index 82% rename from plugins/indexer/twitter/src/main/java/org/elasticsearch/plugin/indexer/twitter/IndexerTwitterPlugin.java rename to plugins/river/twitter/src/main/java/org/elasticsearch/plugin/river/twitter/RiverTwitterPlugin.java index 49b63729c94..7a310ccaefa 100644 --- a/plugins/indexer/twitter/src/main/java/org/elasticsearch/plugin/indexer/twitter/IndexerTwitterPlugin.java +++ b/plugins/river/twitter/src/main/java/org/elasticsearch/plugin/river/twitter/RiverTwitterPlugin.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.plugin.indexer.twitter; +package org.elasticsearch.plugin.river.twitter; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.plugins.AbstractPlugin; @@ -25,16 +25,16 @@ import org.elasticsearch.plugins.AbstractPlugin; /** * @author kimchy (shay.banon) */ -public class IndexerTwitterPlugin extends AbstractPlugin { +public class RiverTwitterPlugin extends AbstractPlugin { - @Inject public IndexerTwitterPlugin() { + @Inject public RiverTwitterPlugin() { } @Override public String name() { - return "indexer-twitter"; + return "river-twitter"; } @Override public String description() { - return "Indexer Twitter Plugin"; + return "River Twitter Plugin"; } } diff --git a/plugins/indexer/twitter/src/main/java/org/elasticsearch/indexer/twitter/TwitterIndexer.java b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java similarity index 95% rename from plugins/indexer/twitter/src/main/java/org/elasticsearch/indexer/twitter/TwitterIndexer.java rename to plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java index f05c3245104..5bcf7ff8484 100644 --- a/plugins/indexer/twitter/src/main/java/org/elasticsearch/indexer/twitter/TwitterIndexer.java +++ b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indexer.twitter; +package org.elasticsearch.river.twitter; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -30,11 +30,11 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.indexer.AbstractIndexerComponent; -import org.elasticsearch.indexer.Indexer; -import org.elasticsearch.indexer.IndexerName; -import org.elasticsearch.indexer.IndexerSettings; import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.river.AbstractRiverComponent; +import org.elasticsearch.river.River; +import org.elasticsearch.river.RiverName; +import org.elasticsearch.river.RiverSettings; import twitter4j.*; import java.net.URL; @@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * @author kimchy (shay.banon) */ -public class TwitterIndexer extends AbstractIndexerComponent implements Indexer { +public class TwitterRiver extends AbstractRiverComponent implements River { private final Client client; @@ -63,8 +63,8 @@ public class TwitterIndexer extends AbstractIndexerComponent implements Indexer private volatile BulkRequestBuilder currentRequest; - @Inject public TwitterIndexer(IndexerName indexerName, IndexerSettings settings, Client client) { - super(indexerName, settings); + @Inject public TwitterRiver(RiverName riverName, RiverSettings settings, Client client) { + super(riverName, settings); this.client = client; String user = XContentMapValues.nodeStringValue(settings.settings().get("user"), null); @@ -85,10 +85,10 @@ public class TwitterIndexer extends AbstractIndexerComponent implements Indexer if (settings.settings().containsKey("index")) { Map indexSettings = (Map) settings.settings().get("index"); - indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), indexerName.name()); + indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name()); typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "status"); } else { - indexName = indexerName.name(); + indexName = riverName.name(); typeName = "status"; } diff --git a/plugins/indexer/twitter/src/main/java/org/elasticsearch/indexer/twitter/TwitterIndexerModule.java b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiverModule.java similarity index 81% rename from plugins/indexer/twitter/src/main/java/org/elasticsearch/indexer/twitter/TwitterIndexerModule.java rename to plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiverModule.java index 9d80afbb9f1..5d276848c05 100644 --- a/plugins/indexer/twitter/src/main/java/org/elasticsearch/indexer/twitter/TwitterIndexerModule.java +++ b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiverModule.java @@ -17,17 +17,17 @@ * under the License. */ -package org.elasticsearch.indexer.twitter; +package org.elasticsearch.river.twitter; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.indexer.Indexer; +import org.elasticsearch.river.River; /** * @author kimchy (shay.banon) */ -public class TwitterIndexerModule extends AbstractModule { +public class TwitterRiverModule extends AbstractModule { @Override protected void configure() { - bind(Indexer.class).to(TwitterIndexer.class).asEagerSingleton(); + bind(River.class).to(TwitterRiver.class).asEagerSingleton(); } } diff --git a/settings.gradle b/settings.gradle index 582e3f3ae7c..7c1405b4770 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,7 +15,7 @@ include 'plugins-mapper-attachments' include 'plugins-client-groovy' include 'plugins-transport-memcached' include 'plugins-transport-thrift' -include 'plugins-indexer-twitter' +include 'plugins-river-twitter' rootProject.name = 'elasticsearch-root' rootProject.children.each {project ->