rename indexer to river

This commit is contained in:
kimchy 2010-09-21 13:35:09 +02:00
parent 69fad80b01
commit 4fe7cbccca
39 changed files with 446 additions and 488 deletions

View File

@ -8,8 +8,8 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-indexer-twitter.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-indexer-twitter.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" />

View File

@ -20,7 +20,7 @@
<orderEntry type="module" module-name="plugin-analysis-icu" /> <orderEntry type="module" module-name="plugin-analysis-icu" />
<orderEntry type="module" module-name="plugins-hadoop" /> <orderEntry type="module" module-name="plugins-hadoop" />
<orderEntry type="module" module-name="plugin-cloud-aws" /> <orderEntry type="module" module-name="plugin-cloud-aws" />
<orderEntry type="module" module-name="plugin-indexer-twitter" /> <orderEntry type="module" module-name="plugin-river-twitter" />
<orderEntry type="module" module-name="test-integration" /> <orderEntry type="module" module-name="test-integration" />
</component> </component>
</module> </module>

View File

@ -1,13 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4"> <module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false"> <component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/indexer/twitter/build/classes/main" /> <output url="file://$MODULE_DIR$/../../plugins/river/twitter/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/indexer/twitter/build/classes/test" /> <output-test url="file://$MODULE_DIR$/../../plugins/river/twitter/build/classes/test" />
<exclude-output /> <exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/indexer/twitter"> <content url="file://$MODULE_DIR$/../../plugins/river/twitter">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/indexer/twitter/src/main/java" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$/../../plugins/river/twitter/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/../../plugins/indexer/twitter/src/test/java" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/../../plugins/river/twitter/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/../../plugins/indexer/twitter/build" /> <excludeFolder url="file://$MODULE_DIR$/../../plugins/river/twitter/build" />
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />

View File

@ -48,10 +48,10 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indexer.IndexerIndexName;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.timer.TimerService; import org.elasticsearch.timer.TimerService;
import java.io.File; import java.io.File;
@ -130,7 +130,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain '#")); listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain '#"));
return currentState; return currentState;
} }
if (!request.index.equals(IndexerIndexName.Conf.DEFAULT_INDEXER_NAME) && request.index.charAt(0) == '_') { if (!request.index.equals(RiverIndexName.Conf.DEFAULT_INDEX_NAME) && request.index.charAt(0) == '_') {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not start with '_'")); listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not start with '_'"));
return currentState; return currentState;
} }

View File

@ -24,7 +24,7 @@ import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indexer.IndexerName; import org.elasticsearch.river.RiverName;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -66,11 +66,11 @@ public class Loggers {
return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0])); return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0]));
} }
public static ESLogger getLogger(Class clazz, Settings settings, IndexerName indexerName, String... prefixes) { public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) {
List<String> l = Lists.newArrayList(); List<String> l = Lists.newArrayList();
l.add(SPACE); l.add(SPACE);
l.add(indexerName.type()); l.add(riverName.type());
l.add(indexerName.name()); l.add(riverName.name());
l.addAll(Lists.newArrayList(prefixes)); l.addAll(Lists.newArrayList(prefixes));
return getLogger(clazz, settings, l.toArray(new String[l.size()])); return getLogger(clazz, settings, l.toArray(new String[l.size()]));
} }

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer;
import org.elasticsearch.ElasticSearchException;
/**
* @author kimchy (shay.banon)
*/
public interface CloseableIndexerComponent {
/**
* Closes the indexer component. A boolean indicating if its part of an actual index
* deletion or not is passed.
*
* @param delete <tt>true</tt> if the index is being deleted.
* @throws org.elasticsearch.ElasticSearchException
*
*/
void close(boolean delete) throws ElasticSearchException;
}

View File

@ -1,122 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.routing;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.indexer.IndexerName;
import java.io.IOException;
import java.util.Iterator;
/**
* @author kimchy (shay.banon)
*/
public class IndexersRouting implements Iterable<IndexerRouting> {
public static final IndexersRouting EMPTY = IndexersRouting.builder().build();
private final ImmutableMap<IndexerName, IndexerRouting> indexers;
private IndexersRouting(ImmutableMap<IndexerName, IndexerRouting> indexers) {
this.indexers = indexers;
}
public boolean isEmpty() {
return indexers.isEmpty();
}
public IndexerRouting routing(IndexerName indexerName) {
return indexers.get(indexerName);
}
public boolean hasIndexerByName(String name) {
for (IndexerName indexerName : indexers.keySet()) {
if (indexerName.name().equals(name)) {
return true;
}
}
return false;
}
@Override public Iterator<IndexerRouting> iterator() {
return indexers.values().iterator();
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private MapBuilder<IndexerName, IndexerRouting> indexers = MapBuilder.newMapBuilder();
public Builder routing(IndexersRouting routing) {
indexers.putAll(routing.indexers);
return this;
}
public Builder put(IndexerRouting routing) {
indexers.put(routing.indexerName(), routing);
return this;
}
public Builder remove(IndexerRouting routing) {
indexers.remove(routing.indexerName());
return this;
}
public Builder remove(IndexerName indexerName) {
indexers.remove(indexerName);
return this;
}
public Builder remote(String indexerName) {
for (IndexerName name : indexers.map().keySet()) {
if (name.name().equals(indexerName)) {
indexers.remove(name);
}
}
return this;
}
public IndexersRouting build() {
return new IndexersRouting(indexers.immutableMap());
}
public static IndexersRouting readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(IndexerRouting.readIndexerRouting(in));
}
return builder.build();
}
public static void writeTo(IndexersRouting routing, StreamOutput out) throws IOException {
out.writeVInt(routing.indexers.size());
for (IndexerRouting indexerRouting : routing) {
indexerRouting.writeTo(out);
}
}
}
}

View File

@ -54,8 +54,6 @@ import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.http.HttpServer; import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerModule; import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.indexer.IndexerManager;
import org.elasticsearch.indexer.IndexersModule;
import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@ -69,6 +67,8 @@ import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestModule; import org.elasticsearch.rest.RestModule;
import org.elasticsearch.river.RiversManager;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchService;
@ -134,7 +134,7 @@ public final class InternalNode implements Node {
if (settings.getAsBoolean("http.enabled", true)) { if (settings.getAsBoolean("http.enabled", true)) {
modules.add(new HttpServerModule(settings)); modules.add(new HttpServerModule(settings));
} }
modules.add(new IndexersModule(settings)); modules.add(new RiversModule(settings));
modules.add(new IndicesModule(settings)); modules.add(new IndicesModule(settings));
modules.add(new SearchModule()); modules.add(new SearchModule());
modules.add(new TransportActionModule()); modules.add(new TransportActionModule());
@ -171,7 +171,7 @@ public final class InternalNode implements Node {
injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start(); injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndexerManager.class).start(); injector.getInstance(RiversManager.class).start();
injector.getInstance(ClusterService.class).start(); injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start(); injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start(); injector.getInstance(SearchService.class).start();
@ -209,7 +209,7 @@ public final class InternalNode implements Node {
injector.getInstance(MonitorService.class).stop(); injector.getInstance(MonitorService.class).stop();
injector.getInstance(GatewayService.class).stop(); injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop(); injector.getInstance(SearchService.class).stop();
injector.getInstance(IndexerManager.class).stop(); injector.getInstance(RiversManager.class).stop();
injector.getInstance(IndicesClusterStateService.class).stop(); injector.getInstance(IndicesClusterStateService.class).stop();
injector.getInstance(IndicesService.class).stop(); injector.getInstance(IndicesService.class).stop();
injector.getInstance(RestController.class).stop(); injector.getInstance(RestController.class).stop();
@ -256,7 +256,7 @@ public final class InternalNode implements Node {
stopWatch.stop().start("search"); stopWatch.stop().start("search");
injector.getInstance(SearchService.class).close(); injector.getInstance(SearchService.class).close();
stopWatch.stop().start("indexers"); stopWatch.stop().start("indexers");
injector.getInstance(IndexerManager.class).close(); injector.getInstance(RiversManager.class).close();
stopWatch.stop().start("indices_cluster"); stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close(); injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices"); stopWatch.stop().start("indices");

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
@ -25,23 +25,23 @@ import org.elasticsearch.common.logging.Loggers;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class AbstractIndexerComponent implements IndexerComponent { public class AbstractRiverComponent implements RiverComponent {
protected final ESLogger logger; protected final ESLogger logger;
protected final IndexerName indexerName; protected final RiverName riverName;
protected final IndexerSettings settings; protected final RiverSettings settings;
protected AbstractIndexerComponent(IndexerName indexerName, IndexerSettings settings) { protected AbstractRiverComponent(RiverName riverName, RiverSettings settings) {
this.indexerName = indexerName; this.riverName = riverName;
this.settings = settings; this.settings = settings;
this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), indexerName); this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
} }
@Override public IndexerName indexerName() { @Override public RiverName riverName() {
return indexerName; return riverName;
} }
public String nodeName() { public String nodeName() {

View File

@ -17,12 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public interface Indexer extends IndexerComponent { public interface River extends RiverComponent {
void start(); void start();

View File

@ -17,12 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public interface IndexerComponent { public interface RiverComponent {
IndexerName indexerName(); RiverName riverName();
} }

View File

@ -17,31 +17,31 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerException extends ElasticSearchException { public class RiverException extends ElasticSearchException {
private final IndexerName indexer; private final RiverName river;
public IndexerException(IndexerName indexer, String msg) { public RiverException(RiverName river, String msg) {
this(indexer, msg, null); this(river, msg, null);
} }
public IndexerException(IndexerName indexer, String msg, Throwable cause) { public RiverException(RiverName river, String msg, Throwable cause) {
this(indexer, true, msg, cause); this(river, true, msg, cause);
} }
protected IndexerException(IndexerName indexer, boolean withSpace, String msg, Throwable cause) { protected RiverException(RiverName river, boolean withSpace, String msg, Throwable cause) {
super("[" + indexer.type() + "][" + indexer.name() + "]" + (withSpace ? " " : "") + msg, cause); super("[" + river.type() + "][" + river.name() + "]" + (withSpace ? " " : "") + msg, cause);
this.indexer = indexer; this.river = river;
} }
public IndexerName indexerName() { public RiverName riverName() {
return indexer; return river;
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.common.inject.BindingAnnotation; import org.elasticsearch.common.inject.BindingAnnotation;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -37,13 +37,13 @@ import static java.lang.annotation.RetentionPolicy.*;
@Target({FIELD, PARAMETER}) @Target({FIELD, PARAMETER})
@Retention(RUNTIME) @Retention(RUNTIME)
@Documented @Documented
public @interface IndexerIndexName { public @interface RiverIndexName {
static class Conf { static class Conf {
public static String DEFAULT_INDEXER_NAME = "_indexer"; public static String DEFAULT_INDEX_NAME = "_river";
public static String indexName(Settings settings) { public static String indexName(Settings settings) {
return settings.get("indexer.index_name", DEFAULT_INDEXER_NAME); return settings.get("river.index_name", DEFAULT_INDEX_NAME);
} }
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableList;
@ -35,26 +35,26 @@ import static org.elasticsearch.common.Strings.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerModule extends AbstractModule implements SpawnModules { public class RiverModule extends AbstractModule implements SpawnModules {
private IndexerName indexerName; private RiverName riverName;
private final Settings globalSettings; private final Settings globalSettings;
private final Map<String, Object> settings; private final Map<String, Object> settings;
public IndexerModule(IndexerName indexerName, Map<String, Object> settings, Settings globalSettings) { public RiverModule(RiverName riverName, Map<String, Object> settings, Settings globalSettings) {
this.indexerName = indexerName; this.riverName = riverName;
this.globalSettings = globalSettings; this.globalSettings = globalSettings;
this.settings = settings; this.settings = settings;
} }
@Override public Iterable<? extends Module> spawnModules() { @Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(loadTypeModule(indexerName.type(), "org.elasticsearch.indexer.", "IndexerModule"), globalSettings)); return ImmutableList.of(Modules.createModule(loadTypeModule(riverName.type(), "org.elasticsearch.river.", "RiverModule"), globalSettings));
} }
@Override protected void configure() { @Override protected void configure() {
bind(IndexerSettings.class).toInstance(new IndexerSettings(globalSettings, settings)); bind(RiverSettings.class).toInstance(new RiverSettings(globalSettings, settings));
} }
private Class<? extends Module> loadTypeModule(String type, String prefixPackage, String suffixClassName) { private Class<? extends Module> loadTypeModule(String type, String prefixPackage, String suffixClassName) {

View File

@ -17,20 +17,20 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import java.io.Serializable; import java.io.Serializable;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerName implements Serializable { public class RiverName implements Serializable {
private final String type; private final String type;
private final String name; private final String name;
public IndexerName(String type, String name) { public RiverName(String type, String name) {
this.type = type; this.type = type;
this.name = name; this.name = name;
} }
@ -55,7 +55,7 @@ public class IndexerName implements Serializable {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
IndexerName that = (IndexerName) o; RiverName that = (RiverName) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false; if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false; if (type != null ? !type.equals(that.type) : that.type != null) return false;

View File

@ -17,22 +17,22 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerNameModule extends AbstractModule { public class RiverNameModule extends AbstractModule {
private final IndexerName indexerName; private final RiverName riverName;
public IndexerNameModule(IndexerName indexerName) { public RiverNameModule(RiverName riverName) {
this.indexerName = indexerName; this.riverName = riverName;
} }
@Override protected void configure() { @Override protected void configure() {
bind(IndexerName.class).toInstance(indexerName); bind(RiverName.class).toInstance(riverName);
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -27,13 +27,13 @@ import java.util.Map;
* @author kimchy (shayy.banon) * @author kimchy (shayy.banon)
*/ */
public class IndexerSettings { public class RiverSettings {
private final Settings globalSettings; private final Settings globalSettings;
private final Map<String, Object> settings; private final Map<String, Object> settings;
public IndexerSettings(Settings globalSettings, Map<String, Object> settings) { public RiverSettings(Settings globalSettings, Map<String, Object> settings) {
this.globalSettings = globalSettings; this.globalSettings = globalSettings;
this.settings = settings; this.settings = settings;
} }

View File

@ -17,48 +17,48 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterService; import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.indexer.routing.IndexersRouter; import org.elasticsearch.river.routing.RiversRouter;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerManager extends AbstractLifecycleComponent<IndexerManager> { public class RiversManager extends AbstractLifecycleComponent<RiversManager> {
private final IndexersService indexersService; private final RiversService riversService;
private final IndexerClusterService clusterService; private final RiverClusterService clusterService;
private final IndexersRouter indexersRouter; private final RiversRouter riversRouter;
@Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService, IndexersRouter indexersRouter) { @Inject public RiversManager(Settings settings, RiversService riversService, RiverClusterService clusterService, RiversRouter riversRouter) {
super(settings); super(settings);
this.indexersService = indexersService; this.riversService = riversService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.indexersRouter = indexersRouter; this.riversRouter = riversRouter;
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
indexersRouter.start(); riversRouter.start();
indexersService.start(); riversService.start();
clusterService.start(); clusterService.start();
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
indexersRouter.stop(); riversRouter.stop();
clusterService.stop(); clusterService.stop();
indexersService.stop(); riversService.stop();
} }
@Override protected void doClose() throws ElasticSearchException { @Override protected void doClose() throws ElasticSearchException {
indexersRouter.close(); riversRouter.close();
clusterService.close(); clusterService.close();
indexersService.close(); riversService.close();
} }
} }

View File

@ -17,29 +17,29 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterService; import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.indexer.routing.IndexersRouter; import org.elasticsearch.river.routing.RiversRouter;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexersModule extends AbstractModule { public class RiversModule extends AbstractModule {
private final Settings settings; private final Settings settings;
public IndexersModule(Settings settings) { public RiversModule(Settings settings) {
this.settings = settings; this.settings = settings;
} }
@Override protected void configure() { @Override protected void configure() {
bind(String.class).annotatedWith(IndexerIndexName.class).toInstance(IndexerIndexName.Conf.indexName(settings)); bind(String.class).annotatedWith(RiverIndexName.class).toInstance(RiverIndexName.Conf.indexName(settings));
bind(IndexersService.class).asEagerSingleton(); bind(RiversService.class).asEagerSingleton();
bind(IndexerClusterService.class).asEagerSingleton(); bind(RiverClusterService.class).asEagerSingleton();
bind(IndexersRouter.class).asEagerSingleton(); bind(RiversRouter.class).asEagerSingleton();
bind(IndexerManager.class).asEagerSingleton(); bind(RiversManager.class).asEagerSingleton();
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer; package org.elasticsearch.river;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -35,11 +35,11 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterChangedEvent; import org.elasticsearch.river.cluster.RiverClusterChangedEvent;
import org.elasticsearch.indexer.cluster.IndexerClusterService; import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState; import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateListener; import org.elasticsearch.river.cluster.RiverClusterStateListener;
import org.elasticsearch.indexer.routing.IndexerRouting; import org.elasticsearch.river.routing.RiverRouting;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map; import java.util.Map;
@ -48,9 +48,9 @@ import java.util.concurrent.CountDownLatch;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexersService extends AbstractLifecycleComponent<IndexersService> { public class RiversService extends AbstractLifecycleComponent<RiversService> {
private final String indexerIndexName; private final String riverIndexName;
private Client client; private Client client;
@ -60,33 +60,33 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
private final Injector injector; private final Injector injector;
private final Map<IndexerName, Injector> indexersInjectors = Maps.newHashMap(); private final Map<RiverName, Injector> riversInjectors = Maps.newHashMap();
private volatile ImmutableMap<IndexerName, Indexer> indexers = ImmutableMap.of(); private volatile ImmutableMap<RiverName, River> rivers = ImmutableMap.of();
@Inject public IndexersService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) { @Inject public RiversService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, RiverClusterService riverClusterService, Injector injector) {
super(settings); super(settings);
this.indexerIndexName = IndexerIndexName.Conf.indexName(settings); this.riverIndexName = RiverIndexName.Conf.indexName(settings);
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.injector = injector; this.injector = injector;
indexerClusterService.add(new ApplyIndexers()); riverClusterService.add(new ApplyRivers());
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
ImmutableSet<IndexerName> indices = ImmutableSet.copyOf(this.indexers.keySet()); ImmutableSet<RiverName> indices = ImmutableSet.copyOf(this.rivers.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size()); final CountDownLatch latch = new CountDownLatch(indices.size());
for (final IndexerName indexerName : indices) { for (final RiverName riverName : indices) {
threadPool.cached().execute(new Runnable() { threadPool.cached().execute(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
closeIndexer(indexerName); closeRiver(riverName);
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to delete indexer on stop [{}]/[{}]", e, indexerName.type(), indexerName.name()); logger.warn("failed to delete river on stop [{}]/[{}]", e, riverName.type(), riverName.name());
} finally { } finally {
latch.countDown(); latch.countDown();
} }
@ -103,68 +103,64 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
@Override protected void doClose() throws ElasticSearchException { @Override protected void doClose() throws ElasticSearchException {
} }
public synchronized Indexer createIndexer(IndexerName indexerName, Map<String, Object> settings) throws ElasticSearchException { public synchronized River createRiver(RiverName riverName, Map<String, Object> settings) throws ElasticSearchException {
if (indexersInjectors.containsKey(indexerName)) { if (riversInjectors.containsKey(riverName)) {
throw new IndexerException(indexerName, "indexer already exists"); throw new RiverException(riverName, "river already exists");
} }
logger.debug("creating indexer [{}][{}]", indexerName.type(), indexerName.name()); logger.debug("creating river [{}][{}]", riverName.type(), riverName.name());
ModulesBuilder modules = new ModulesBuilder(); ModulesBuilder modules = new ModulesBuilder();
modules.add(new IndexerNameModule(indexerName)); modules.add(new RiverNameModule(riverName));
modules.add(new IndexerModule(indexerName, settings, this.settings)); modules.add(new RiverModule(riverName, settings, this.settings));
Injector indexInjector = modules.createChildInjector(injector); Injector indexInjector = modules.createChildInjector(injector);
indexersInjectors.put(indexerName, indexInjector); riversInjectors.put(riverName, indexInjector);
Indexer indexer = indexInjector.getInstance(Indexer.class); River river = indexInjector.getInstance(River.class);
indexers = MapBuilder.newMapBuilder(indexers).put(indexerName, indexer).immutableMap(); rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap();
// we need this start so there can be operations done (like creating an index) which can't be // we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors // done on create since Guice can't create two concurrent child injectors
indexer.start(); river.start();
return indexer; return river;
} }
public synchronized void closeIndexer(IndexerName indexerName) throws ElasticSearchException { public synchronized void closeRiver(RiverName riverName) throws ElasticSearchException {
Injector indexerInjector; Injector riverInjector;
Indexer indexer; River river;
synchronized (this) { synchronized (this) {
indexerInjector = indexersInjectors.remove(indexerName); riverInjector = riversInjectors.remove(riverName);
if (indexerInjector == null) { if (riverInjector == null) {
throw new IndexerException(indexerName, "missing"); throw new RiverException(riverName, "missing");
} }
logger.debug("closing indexer [{}][{}]", indexerName.type(), indexerName.name()); logger.debug("closing river [{}][{}]", riverName.type(), riverName.name());
Map<IndexerName, Indexer> tmpMap = Maps.newHashMap(indexers); Map<RiverName, River> tmpMap = Maps.newHashMap(rivers);
indexer = tmpMap.remove(indexerName); river = tmpMap.remove(riverName);
indexers = ImmutableMap.copyOf(tmpMap); rivers = ImmutableMap.copyOf(tmpMap);
} }
// for (Class<? extends CloseableIndexerComponent> closeable : pluginsService.indexServices()) { river.close();
// indexerInjector.getInstance(closeable).close(delete);
// }
indexer.close();
Injectors.close(injector); Injectors.close(injector);
} }
private class ApplyIndexers implements IndexerClusterStateListener { private class ApplyRivers implements RiverClusterStateListener {
@Override public void indexerClusterChanged(IndexerClusterChangedEvent event) { @Override public void riverClusterChanged(RiverClusterChangedEvent event) {
DiscoveryNode localNode = clusterService.localNode(); DiscoveryNode localNode = clusterService.localNode();
IndexerClusterState state = event.state(); RiverClusterState state = event.state();
// first, go over and delete ones that either don't exists or are not allocated // first, go over and delete ones that either don't exists or are not allocated
for (IndexerName indexerName : indexers.keySet()) { for (RiverName riverName : rivers.keySet()) {
IndexerRouting routing = state.routing().routing(indexerName); RiverRouting routing = state.routing().routing(riverName);
if (routing == null || !localNode.equals(routing.node())) { if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before) // not routed at all, and not allocated here, clean it (we delete the relevant ones before)
closeIndexer(indexerName); closeRiver(riverName);
} }
} }
for (final IndexerRouting routing : state.routing()) { for (final RiverRouting routing : state.routing()) {
// not allocated // not allocated
if (routing.node() == null) { if (routing.node() == null) {
continue; continue;
@ -174,21 +170,21 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
continue; continue;
} }
// if its already created, ignore it // if its already created, ignore it
if (indexers.containsKey(routing.indexerName())) { if (rivers.containsKey(routing.riverName())) {
continue; continue;
} }
client.prepareGet(indexerIndexName, routing.indexerName().name(), "_meta").execute(new ActionListener<GetResponse>() { client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").execute(new ActionListener<GetResponse>() {
@Override public void onResponse(GetResponse getResponse) { @Override public void onResponse(GetResponse getResponse) {
if (!indexers.containsKey(routing.indexerName())) { if (!rivers.containsKey(routing.riverName())) {
if (getResponse.exists()) { if (getResponse.exists()) {
// only create the indexer if it exists, otherwise, the indexing meta data has not been visible yet... // only create the river if it exists, otherwise, the indexing meta data has not been visible yet...
createIndexer(routing.indexerName(), getResponse.sourceAsMap()); createRiver(routing.riverName(), getResponse.sourceAsMap());
} }
} }
} }
@Override public void onFailure(Throwable e) { @Override public void onFailure(Throwable e) {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.indexerName().type(), routing.indexerName().name()); logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
} }
}); });
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.cluster; package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -35,10 +35,10 @@ import java.io.IOException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class PublishIndexerClusterStateAction extends AbstractComponent { public class PublishRiverClusterStateAction extends AbstractComponent {
public static interface NewClusterStateListener { public static interface NewClusterStateListener {
void onNewClusterState(IndexerClusterState clusterState); void onNewClusterState(RiverClusterState clusterState);
} }
private final TransportService transportService; private final TransportService transportService;
@ -47,7 +47,7 @@ public class PublishIndexerClusterStateAction extends AbstractComponent {
private final NewClusterStateListener listener; private final NewClusterStateListener listener;
public PublishIndexerClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, public PublishRiverClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService,
NewClusterStateListener listener) { NewClusterStateListener listener) {
super(settings); super(settings);
this.transportService = transportService; this.transportService = transportService;
@ -60,7 +60,7 @@ public class PublishIndexerClusterStateAction extends AbstractComponent {
transportService.removeHandler(PublishClusterStateRequestHandler.ACTION); transportService.removeHandler(PublishClusterStateRequestHandler.ACTION);
} }
public void publish(IndexerClusterState clusterState) { public void publish(RiverClusterState clusterState) {
final DiscoveryNodes discoNodes = clusterService.state().nodes(); final DiscoveryNodes discoNodes = clusterService.state().nodes();
for (final DiscoveryNode node : discoNodes) { for (final DiscoveryNode node : discoNodes) {
if (node.equals(discoNodes.localNode())) { if (node.equals(discoNodes.localNode())) {
@ -68,15 +68,15 @@ public class PublishIndexerClusterStateAction extends AbstractComponent {
continue; continue;
} }
// we only want to send nodes that are either possible master nodes or indexer nodes // we only want to send nodes that are either possible master nodes or river nodes
// master nodes because they will handle the state and the allocation of indexers // master nodes because they will handle the state and the allocation of rivers
// and indexer nodes since they will end up creating indexes // and river nodes since they will end up creating indexes
if (node.clientNode()) { if (node.clientNode()) {
continue; continue;
} }
if (!node.masterNode() && !IndexerNodeHelper.isIndexerNode(node)) { if (!node.masterNode() && !RiverNodeHelper.isRiverNode(node)) {
continue; continue;
} }
@ -90,27 +90,27 @@ public class PublishIndexerClusterStateAction extends AbstractComponent {
private class PublishClusterStateRequest implements Streamable { private class PublishClusterStateRequest implements Streamable {
private IndexerClusterState clusterState; private RiverClusterState clusterState;
private PublishClusterStateRequest() { private PublishClusterStateRequest() {
} }
private PublishClusterStateRequest(IndexerClusterState clusterState) { private PublishClusterStateRequest(RiverClusterState clusterState) {
this.clusterState = clusterState; this.clusterState = clusterState;
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
clusterState = IndexerClusterState.Builder.readFrom(in); clusterState = RiverClusterState.Builder.readFrom(in);
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
IndexerClusterState.Builder.writeTo(clusterState, out); RiverClusterState.Builder.writeTo(clusterState, out);
} }
} }
private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<PublishClusterStateRequest> { private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<PublishClusterStateRequest> {
static final String ACTION = "indexer/state/publish"; static final String ACTION = "river/state/publish";
@Override public PublishClusterStateRequest newInstance() { @Override public PublishClusterStateRequest newInstance() {
return new PublishClusterStateRequest(); return new PublishClusterStateRequest();

View File

@ -17,20 +17,20 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.cluster; package org.elasticsearch.river.cluster;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerClusterChangedEvent { public class RiverClusterChangedEvent {
private final String source; private final String source;
private final IndexerClusterState previousState; private final RiverClusterState previousState;
private final IndexerClusterState state; private final RiverClusterState state;
public IndexerClusterChangedEvent(String source, IndexerClusterState state, IndexerClusterState previousState) { public RiverClusterChangedEvent(String source, RiverClusterState state, RiverClusterState previousState) {
this.source = source; this.source = source;
this.state = state; this.state = state;
this.previousState = previousState; this.previousState = previousState;
@ -43,11 +43,11 @@ public class IndexerClusterChangedEvent {
return this.source; return this.source;
} }
public IndexerClusterState state() { public RiverClusterState state() {
return this.state; return this.state;
} }
public IndexerClusterState previousState() { public RiverClusterState previousState() {
return this.previousState; return this.previousState;
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.cluster; package org.elasticsearch.river.cluster;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -38,27 +38,27 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClusterService> { public class RiverClusterService extends AbstractLifecycleComponent<RiverClusterService> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final PublishIndexerClusterStateAction publishAction; private final PublishRiverClusterStateAction publishAction;
private final List<IndexerClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<IndexerClusterStateListener>(); private final List<RiverClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<RiverClusterStateListener>();
private volatile ExecutorService updateTasksExecutor; private volatile ExecutorService updateTasksExecutor;
private volatile IndexerClusterState clusterState = IndexerClusterState.builder().build(); private volatile RiverClusterState clusterState = RiverClusterState.builder().build();
@Inject public IndexerClusterService(Settings settings, TransportService transportService, ClusterService clusterService) { @Inject public RiverClusterService(Settings settings, TransportService transportService, ClusterService clusterService) {
super(settings); super(settings);
this.clusterService = clusterService; this.clusterService = clusterService;
this.publishAction = new PublishIndexerClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener()); this.publishAction = new PublishRiverClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener());
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "indexerClusterService#updateTask")); this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "riverClusterService#updateTask"));
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
@ -73,15 +73,15 @@ public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClu
@Override protected void doClose() throws ElasticSearchException { @Override protected void doClose() throws ElasticSearchException {
} }
public void add(IndexerClusterStateListener listener) { public void add(RiverClusterStateListener listener) {
clusterStateListeners.add(listener); clusterStateListeners.add(listener);
} }
public void remove(IndexerClusterStateListener listener) { public void remove(RiverClusterStateListener listener) {
clusterStateListeners.remove(listener); clusterStateListeners.remove(listener);
} }
public void submitStateUpdateTask(final String source, final IndexerClusterStateUpdateTask updateTask) { public void submitStateUpdateTask(final String source, final RiverClusterStateUpdateTask updateTask) {
if (!lifecycle.started()) { if (!lifecycle.started()) {
return; return;
} }
@ -93,7 +93,7 @@ public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClu
} }
logger.debug("processing [{}]: execute", source); logger.debug("processing [{}]: execute", source);
IndexerClusterState previousClusterState = clusterState; RiverClusterState previousClusterState = clusterState;
try { try {
clusterState = updateTask.execute(previousClusterState); clusterState = updateTask.execute(previousClusterState);
} catch (Exception e) { } catch (Exception e) {
@ -104,7 +104,7 @@ public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClu
if (previousClusterState != clusterState) { if (previousClusterState != clusterState) {
if (clusterService.state().nodes().localNodeMaster()) { if (clusterService.state().nodes().localNodeMaster()) {
// only the master controls the version numbers // only the master controls the version numbers
clusterState = new IndexerClusterState(clusterState.version() + 1, clusterState); clusterState = new RiverClusterState(clusterState.version() + 1, clusterState);
} else { } else {
// we got this cluster state from the master, filter out based on versions (don't call listeners) // we got this cluster state from the master, filter out based on versions (don't call listeners)
if (clusterState.version() < previousClusterState.version()) { if (clusterState.version() < previousClusterState.version()) {
@ -120,10 +120,10 @@ public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClu
logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source); logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source);
} }
IndexerClusterChangedEvent clusterChangedEvent = new IndexerClusterChangedEvent(source, clusterState, previousClusterState); RiverClusterChangedEvent clusterChangedEvent = new RiverClusterChangedEvent(source, clusterState, previousClusterState);
for (IndexerClusterStateListener listener : clusterStateListeners) { for (RiverClusterStateListener listener : clusterStateListeners) {
listener.indexerClusterChanged(clusterChangedEvent); listener.riverClusterChanged(clusterChangedEvent);
} }
// if we are the master, publish the new state to all nodes // if we are the master, publish the new state to all nodes
@ -139,16 +139,16 @@ public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClu
}); });
} }
private class UpdateClusterStateListener implements PublishIndexerClusterStateAction.NewClusterStateListener { private class UpdateClusterStateListener implements PublishRiverClusterStateAction.NewClusterStateListener {
@Override public void onNewClusterState(final IndexerClusterState clusterState) { @Override public void onNewClusterState(final RiverClusterState clusterState) {
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
if (state.nodes().localNodeMaster()) { if (state.nodes().localNodeMaster()) {
logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode()); logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode());
return; return;
} }
submitStateUpdateTask("received_state", new IndexerClusterStateUpdateTask() { submitStateUpdateTask("received_state", new RiverClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) { @Override public RiverClusterState execute(RiverClusterState currentState) {
return clusterState; return clusterState;
} }
}); });

View File

@ -17,29 +17,29 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.cluster; package org.elasticsearch.river.cluster;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.indexer.routing.IndexersRouting; import org.elasticsearch.river.routing.RiversRouting;
import java.io.IOException; import java.io.IOException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerClusterState { public class RiverClusterState {
private final long version; private final long version;
private final IndexersRouting routing; private final RiversRouting routing;
public IndexerClusterState(long version, IndexerClusterState state) { public RiverClusterState(long version, RiverClusterState state) {
this.version = version; this.version = version;
this.routing = state.routing(); this.routing = state.routing();
} }
IndexerClusterState(long version, IndexersRouting routing) { RiverClusterState(long version, RiversRouting routing) {
this.version = version; this.version = version;
this.routing = routing; this.routing = routing;
} }
@ -48,7 +48,7 @@ public class IndexerClusterState {
return this.version; return this.version;
} }
public IndexersRouting routing() { public RiversRouting routing() {
return routing; return routing;
} }
@ -60,37 +60,37 @@ public class IndexerClusterState {
private long version = 0; private long version = 0;
private IndexersRouting routing = IndexersRouting.EMPTY; private RiversRouting routing = RiversRouting.EMPTY;
public Builder state(IndexerClusterState state) { public Builder state(RiverClusterState state) {
this.version = state.version(); this.version = state.version();
this.routing = state.routing(); this.routing = state.routing();
return this; return this;
} }
public Builder routing(IndexersRouting.Builder builder) { public Builder routing(RiversRouting.Builder builder) {
return routing(builder.build()); return routing(builder.build());
} }
public Builder routing(IndexersRouting routing) { public Builder routing(RiversRouting routing) {
this.routing = routing; this.routing = routing;
return this; return this;
} }
public IndexerClusterState build() { public RiverClusterState build() {
return new IndexerClusterState(version, routing); return new RiverClusterState(version, routing);
} }
public static IndexerClusterState readFrom(StreamInput in) throws IOException { public static RiverClusterState readFrom(StreamInput in) throws IOException {
Builder builder = new Builder(); Builder builder = new Builder();
builder.version = in.readVLong(); builder.version = in.readVLong();
builder.routing = IndexersRouting.Builder.readFrom(in); builder.routing = RiversRouting.Builder.readFrom(in);
return builder.build(); return builder.build();
} }
public static void writeTo(IndexerClusterState clusterState, StreamOutput out) throws IOException { public static void writeTo(RiverClusterState clusterState, StreamOutput out) throws IOException {
out.writeVLong(clusterState.version); out.writeVLong(clusterState.version);
IndexersRouting.Builder.writeTo(clusterState.routing, out); RiversRouting.Builder.writeTo(clusterState.routing, out);
} }
} }
} }

View File

@ -17,12 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.cluster; package org.elasticsearch.river.cluster;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public interface IndexerClusterStateListener { public interface RiverClusterStateListener {
void indexerClusterChanged(IndexerClusterChangedEvent event); void riverClusterChanged(RiverClusterChangedEvent event);
} }

View File

@ -17,12 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.cluster; package org.elasticsearch.river.cluster;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public interface IndexerClusterStateUpdateTask { public interface RiverClusterStateUpdateTask {
IndexerClusterState execute(IndexerClusterState currentState); RiverClusterState execute(RiverClusterState currentState);
} }

View File

@ -17,38 +17,38 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.cluster; package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.indexer.IndexerName; import org.elasticsearch.river.RiverName;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerNodeHelper { public class RiverNodeHelper {
public static boolean isIndexerNode(DiscoveryNode node) { public static boolean isRiverNode(DiscoveryNode node) {
if (node.clientNode()) { if (node.clientNode()) {
return false; return false;
} }
String indexer = node.attributes().get("indexer"); String river = node.attributes().get("river");
// by default, if not set, its an indexer node (better OOB exp) // by default, if not set, its an river node (better OOB exp)
if (indexer == null) { if (river == null) {
return true; return true;
} }
if ("_none_".equals(indexer)) { if ("_none_".equals(river)) {
return false; return false;
} }
// there is at least one indexer settings, we need it // there is at least one river settings, we need it
return true; return true;
} }
public static boolean isIndexerNode(DiscoveryNode node, IndexerName indexerName) { public static boolean isRiverNode(DiscoveryNode node, RiverName riverName) {
if (!isIndexerNode(node)) { if (!isRiverNode(node)) {
return false; return false;
} }
String indexer = node.attributes().get("indexer"); String river = node.attributes().get("river");
// by default, if not set, its an indexer node (better OOB exp) // by default, if not set, its an river node (better OOB exp)
return indexer == null || indexer.contains(indexerName.type()) || indexer.contains(indexerName.name()); return river == null || river.contains(riverName.type()) || river.contains(riverName.name());
} }
} }

View File

@ -17,21 +17,21 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.dummy; package org.elasticsearch.river.dummy;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.indexer.AbstractIndexerComponent; import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.indexer.Indexer; import org.elasticsearch.river.River;
import org.elasticsearch.indexer.IndexerName; import org.elasticsearch.river.RiverName;
import org.elasticsearch.indexer.IndexerSettings; import org.elasticsearch.river.RiverSettings;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class DummyIndexer extends AbstractIndexerComponent implements Indexer { public class DummyRiver extends AbstractRiverComponent implements River {
@Inject public DummyIndexer(IndexerName indexerName, IndexerSettings settings) { @Inject public DummyRiver(RiverName riverName, RiverSettings settings) {
super(indexerName, settings); super(riverName, settings);
logger.info("create"); logger.info("create");
} }

View File

@ -17,17 +17,17 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.dummy; package org.elasticsearch.river.dummy;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.indexer.Indexer; import org.elasticsearch.river.River;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class DummyIndexerModule extends AbstractModule { public class DummyRiverModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(Indexer.class).to(DummyIndexer.class).asEagerSingleton(); bind(River.class).to(DummyRiver.class).asEagerSingleton();
} }
} }

View File

@ -17,39 +17,39 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.routing; package org.elasticsearch.river.routing;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.indexer.IndexerName; import org.elasticsearch.river.RiverName;
import java.io.IOException; import java.io.IOException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerRouting implements Streamable { public class RiverRouting implements Streamable {
private IndexerName indexerName; private RiverName riverName;
private DiscoveryNode node; private DiscoveryNode node;
private IndexerRouting() { private RiverRouting() {
} }
IndexerRouting(IndexerName indexerName, DiscoveryNode node) { RiverRouting(RiverName riverName, DiscoveryNode node) {
this.indexerName = indexerName; this.riverName = riverName;
this.node = node; this.node = node;
} }
public IndexerName indexerName() { public RiverName riverName() {
return indexerName; return riverName;
} }
/** /**
* The node the indexer is allocated to, <tt>null</tt> if its not allocated. * The node the river is allocated to, <tt>null</tt> if its not allocated.
*/ */
public DiscoveryNode node() { public DiscoveryNode node() {
return node; return node;
@ -59,22 +59,22 @@ public class IndexerRouting implements Streamable {
this.node = node; this.node = node;
} }
public static IndexerRouting readIndexerRouting(StreamInput in) throws IOException { public static RiverRouting readRiverRouting(StreamInput in) throws IOException {
IndexerRouting routing = new IndexerRouting(); RiverRouting routing = new RiverRouting();
routing.readFrom(in); routing.readFrom(in);
return routing; return routing;
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
indexerName = new IndexerName(in.readUTF(), in.readUTF()); riverName = new RiverName(in.readUTF(), in.readUTF());
if (in.readBoolean()) { if (in.readBoolean()) {
node = DiscoveryNode.readNode(in); node = DiscoveryNode.readNode(in);
} }
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(indexerName.type()); out.writeUTF(riverName.type());
out.writeUTF(indexerName.name()); out.writeUTF(riverName.name());
if (node == null) { if (node == null) {
out.writeBoolean(false); out.writeBoolean(false);
} else { } else {

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.routing; package org.elasticsearch.river.routing;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
@ -35,13 +35,13 @@ import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indexer.IndexerIndexName;
import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask;
import org.elasticsearch.indexer.cluster.IndexerNodeHelper;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask;
import org.elasticsearch.river.cluster.RiverNodeHelper;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -50,18 +50,18 @@ import java.util.Map;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> implements ClusterStateListener { public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> implements ClusterStateListener {
private final String indexerIndexName; private final String riverIndexName;
private final Client client; private final Client client;
private final IndexerClusterService indexerClusterService; private final RiverClusterService riverClusterService;
@Inject public IndexersRouter(Settings settings, Client client, ClusterService clusterService, IndexerClusterService indexerClusterService) { @Inject public RiversRouter(Settings settings, Client client, ClusterService clusterService, RiverClusterService riverClusterService) {
super(settings); super(settings);
this.indexerIndexName = IndexerIndexName.Conf.indexName(settings); this.riverIndexName = RiverIndexName.Conf.indexName(settings);
this.indexerClusterService = indexerClusterService; this.riverClusterService = riverClusterService;
this.client = client; this.client = client;
clusterService.add(this); clusterService.add(this);
} }
@ -80,34 +80,34 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
return; return;
} }
if (event.nodesChanged() || event.metaDataChanged() || event.blocksChanged()) { if (event.nodesChanged() || event.metaDataChanged() || event.blocksChanged()) {
indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() { riverClusterService.submitStateUpdateTask("reroute_rivers_node_changed", new RiverClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) { @Override public RiverClusterState execute(RiverClusterState currentState) {
if (!event.state().metaData().hasIndex(indexerIndexName)) { if (!event.state().metaData().hasIndex(riverIndexName)) {
// if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state // if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
if (!currentState.routing().isEmpty()) { if (!currentState.routing().isEmpty()) {
return IndexerClusterState.builder().state(currentState).routing(IndexersRouting.builder()).build(); return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build();
} }
return currentState; return currentState;
} }
IndexersRouting.Builder routingBuilder = IndexersRouting.builder().routing(currentState.routing()); RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing());
boolean dirty = false; boolean dirty = false;
IndexMetaData indexMetaData = event.state().metaData().index(indexerIndexName); IndexMetaData indexMetaData = event.state().metaData().index(riverIndexName);
// go over and create new indexer routing (with no node) for new types (indexers names) // go over and create new river routing (with no node) for new types (rivers names)
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) { for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
String mappingType = entry.getKey(); // mapping type is the name of the indexer String mappingType = entry.getKey(); // mapping type is the name of the river
if (!currentState.routing().hasIndexerByName(mappingType)) { if (!currentState.routing().hasRiverByName(mappingType)) {
// no indexer, we need to add it to the routing with no node allocation // no river, we need to add it to the routing with no node allocation
try { try {
client.admin().indices().prepareRefresh(indexerIndexName).execute().actionGet(); client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet();
GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet(); GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").execute().actionGet();
if (getResponse.exists()) { if (getResponse.exists()) {
String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null); String riverType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null);
if (indexerType == null) { if (riverType == null) {
logger.warn("no indexer type provided for [{}], ignoring...", indexerIndexName); logger.warn("no river type provided for [{}], ignoring...", riverIndexName);
} else { } else {
routingBuilder.put(new IndexerRouting(new IndexerName(indexerType, mappingType), null)); routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null));
dirty = true; dirty = true;
} }
} }
@ -121,41 +121,41 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
} }
} }
// now, remove routings that were deleted // now, remove routings that were deleted
for (IndexerRouting routing : currentState.routing()) { for (RiverRouting routing : currentState.routing()) {
if (!indexMetaData.mappings().containsKey(routing.indexerName().name())) { if (!indexMetaData.mappings().containsKey(routing.riverName().name())) {
routingBuilder.remove(routing); routingBuilder.remove(routing);
dirty = true; dirty = true;
} }
} }
// build a list from nodes to indexers // build a list from nodes to rivers
Map<DiscoveryNode, List<IndexerRouting>> nodesToIndexers = Maps.newHashMap(); Map<DiscoveryNode, List<RiverRouting>> nodesToRivers = Maps.newHashMap();
for (DiscoveryNode node : event.state().nodes()) { for (DiscoveryNode node : event.state().nodes()) {
if (IndexerNodeHelper.isIndexerNode(node)) { if (RiverNodeHelper.isRiverNode(node)) {
nodesToIndexers.put(node, Lists.<IndexerRouting>newArrayList()); nodesToRivers.put(node, Lists.<RiverRouting>newArrayList());
} }
} }
List<IndexerRouting> unassigned = Lists.newArrayList(); List<RiverRouting> unassigned = Lists.newArrayList();
for (IndexerRouting routing : routingBuilder.build()) { for (RiverRouting routing : routingBuilder.build()) {
if (routing.node() == null) { if (routing.node() == null) {
unassigned.add(routing); unassigned.add(routing);
} else { } else {
List<IndexerRouting> l = nodesToIndexers.get(routing.node()); List<RiverRouting> l = nodesToRivers.get(routing.node());
if (l == null) { if (l == null) {
l = Lists.newArrayList(); l = Lists.newArrayList();
nodesToIndexers.put(routing.node(), l); nodesToRivers.put(routing.node(), l);
} }
l.add(routing); l.add(routing);
} }
} }
for (Iterator<IndexerRouting> it = unassigned.iterator(); it.hasNext();) { for (Iterator<RiverRouting> it = unassigned.iterator(); it.hasNext();) {
IndexerRouting routing = it.next(); RiverRouting routing = it.next();
DiscoveryNode smallest = null; DiscoveryNode smallest = null;
int smallestSize = Integer.MAX_VALUE; int smallestSize = Integer.MAX_VALUE;
for (Map.Entry<DiscoveryNode, List<IndexerRouting>> entry : nodesToIndexers.entrySet()) { for (Map.Entry<DiscoveryNode, List<RiverRouting>> entry : nodesToRivers.entrySet()) {
if (IndexerNodeHelper.isIndexerNode(entry.getKey(), routing.indexerName())) { if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) {
if (entry.getValue().size() < smallestSize) { if (entry.getValue().size() < smallestSize) {
smallestSize = entry.getValue().size(); smallestSize = entry.getValue().size();
smallest = entry.getKey(); smallest = entry.getKey();
@ -166,7 +166,7 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
dirty = true; dirty = true;
it.remove(); it.remove();
routing.node(smallest); routing.node(smallest);
nodesToIndexers.get(smallest).add(routing); nodesToRivers.get(smallest).add(routing);
} }
} }
@ -174,7 +174,7 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
// add relocation logic... // add relocation logic...
if (dirty) { if (dirty) {
return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build(); return RiverClusterState.builder().state(currentState).routing(routingBuilder).build();
} }
return currentState; return currentState;
} }

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.routing;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.river.RiverName;
import java.io.IOException;
import java.util.Iterator;
/**
* @author kimchy (shay.banon)
*/
public class RiversRouting implements Iterable<RiverRouting> {
public static final RiversRouting EMPTY = RiversRouting.builder().build();
private final ImmutableMap<RiverName, RiverRouting> rivers;
private RiversRouting(ImmutableMap<RiverName, RiverRouting> rivers) {
this.rivers = rivers;
}
public boolean isEmpty() {
return rivers.isEmpty();
}
public RiverRouting routing(RiverName riverName) {
return rivers.get(riverName);
}
public boolean hasRiverByName(String name) {
for (RiverName riverName : rivers.keySet()) {
if (riverName.name().equals(name)) {
return true;
}
}
return false;
}
@Override public Iterator<RiverRouting> iterator() {
return rivers.values().iterator();
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private MapBuilder<RiverName, RiverRouting> rivers = MapBuilder.newMapBuilder();
public Builder routing(RiversRouting routing) {
rivers.putAll(routing.rivers);
return this;
}
public Builder put(RiverRouting routing) {
rivers.put(routing.riverName(), routing);
return this;
}
public Builder remove(RiverRouting routing) {
rivers.remove(routing.riverName());
return this;
}
public Builder remove(RiverName riverName) {
rivers.remove(riverName);
return this;
}
public Builder remote(String riverName) {
for (RiverName name : rivers.map().keySet()) {
if (name.name().equals(riverName)) {
rivers.remove(name);
}
}
return this;
}
public RiversRouting build() {
return new RiversRouting(rivers.immutableMap());
}
public static RiversRouting readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(RiverRouting.readRiverRouting(in));
}
return builder.build();
}
public static void writeTo(RiversRouting routing, StreamOutput out) throws IOException {
out.writeVInt(routing.rivers.size());
for (RiverRouting riverRouting : routing) {
riverRouting.writeTo(out);
}
}
}
}

View File

@ -1,2 +0,0 @@
plugin=org.elasticsearch.plugin.indexer.twitter.IndexerTwitterPlugin

View File

@ -3,11 +3,11 @@ dependsOn(':elasticsearch')
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'maven' apply plugin: 'maven'
archivesBaseName = "elasticsearch-indexer-twitter" archivesBaseName = "elasticsearch-river-twitter"
explodedDistDir = new File(distsDir, 'exploded') explodedDistDir = new File(distsDir, 'exploded')
manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::Indexer::Twitter", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr) manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::River::Twitter", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
configurations.compile.transitive = true configurations.compile.transitive = true
configurations.testCompile.transitive = true configurations.testCompile.transitive = true
@ -117,7 +117,7 @@ uploadArchives {
pom.project { pom.project {
inceptionYear '2009' inceptionYear '2009'
name 'elasticsearch-plugins-indexer-twitter' name 'elasticsearch-plugins-river-twitter'
description 'Attachments Plugin for ElasticSearch' description 'Attachments Plugin for ElasticSearch'
licenses { licenses {
license { license {

View File

@ -0,0 +1,2 @@
plugin=org.elasticsearch.plugin.river.twitter.RiverTwitterPlugin

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.plugin.indexer.twitter; package org.elasticsearch.plugin.river.twitter;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugins.AbstractPlugin; import org.elasticsearch.plugins.AbstractPlugin;
@ -25,16 +25,16 @@ import org.elasticsearch.plugins.AbstractPlugin;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class IndexerTwitterPlugin extends AbstractPlugin { public class RiverTwitterPlugin extends AbstractPlugin {
@Inject public IndexerTwitterPlugin() { @Inject public RiverTwitterPlugin() {
} }
@Override public String name() { @Override public String name() {
return "indexer-twitter"; return "river-twitter";
} }
@Override public String description() { @Override public String description() {
return "Indexer Twitter Plugin"; return "River Twitter Plugin";
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.twitter; package org.elasticsearch.river.twitter;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -30,11 +30,11 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indexer.AbstractIndexerComponent;
import org.elasticsearch.indexer.Indexer;
import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.IndexerSettings;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import twitter4j.*; import twitter4j.*;
import java.net.URL; import java.net.URL;
@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class TwitterIndexer extends AbstractIndexerComponent implements Indexer { public class TwitterRiver extends AbstractRiverComponent implements River {
private final Client client; private final Client client;
@ -63,8 +63,8 @@ public class TwitterIndexer extends AbstractIndexerComponent implements Indexer
private volatile BulkRequestBuilder currentRequest; private volatile BulkRequestBuilder currentRequest;
@Inject public TwitterIndexer(IndexerName indexerName, IndexerSettings settings, Client client) { @Inject public TwitterRiver(RiverName riverName, RiverSettings settings, Client client) {
super(indexerName, settings); super(riverName, settings);
this.client = client; this.client = client;
String user = XContentMapValues.nodeStringValue(settings.settings().get("user"), null); String user = XContentMapValues.nodeStringValue(settings.settings().get("user"), null);
@ -85,10 +85,10 @@ public class TwitterIndexer extends AbstractIndexerComponent implements Indexer
if (settings.settings().containsKey("index")) { if (settings.settings().containsKey("index")) {
Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index"); Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), indexerName.name()); indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "status"); typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "status");
} else { } else {
indexName = indexerName.name(); indexName = riverName.name();
typeName = "status"; typeName = "status";
} }

View File

@ -17,17 +17,17 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.indexer.twitter; package org.elasticsearch.river.twitter;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.indexer.Indexer; import org.elasticsearch.river.River;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class TwitterIndexerModule extends AbstractModule { public class TwitterRiverModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(Indexer.class).to(TwitterIndexer.class).asEagerSingleton(); bind(River.class).to(TwitterRiver.class).asEagerSingleton();
} }
} }

View File

@ -15,7 +15,7 @@ include 'plugins-mapper-attachments'
include 'plugins-client-groovy' include 'plugins-client-groovy'
include 'plugins-transport-memcached' include 'plugins-transport-memcached'
include 'plugins-transport-thrift' include 'plugins-transport-thrift'
include 'plugins-indexer-twitter' include 'plugins-river-twitter'
rootProject.name = 'elasticsearch-root' rootProject.name = 'elasticsearch-root'
rootProject.children.each {project -> rootProject.children.each {project ->