diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d3f7a5b9356..77386088709 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -54,7 +54,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; @@ -64,7 +63,6 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -1068,34 +1066,18 @@ public abstract class TransportReplicationAction executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable { Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - final boolean created; final ShardId shardId = indexShard.shardId(); if (update != null) { final String indexName = shardId.getIndex(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - // With rivers, we have a chicken and egg problem if indexing - // the _meta document triggers a mapping update. Because we would - // like to validate the mapping update first, but on the other - // hand putting the mapping would start the river, which expects - // to find a _meta document - // So we have no choice but to index first and send mappings afterwards - MapperService mapperService = indexShard.indexService().mapperService(); - mapperService.merge(request.type(), new CompressedXContent(update.toBytes()), true); - created = operation.execute(indexShard); - mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); - operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard); - update = operation.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnPrimaryException(shardId, - "Dynamics mappings are not available on the node that holds the primary yet"); - } - created = operation.execute(indexShard); + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); + operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard); + update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnPrimaryException(shardId, + "Dynamics mappings are not available on the node that holds the primary yet"); } - } else { - created = operation.execute(indexShard); } + final boolean created = operation.execute(indexShard); // update the version on request so it will happen on the replicas final long version = operation.version(); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index b4ca88f046b..f8e809e9a55 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -53,7 +53,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; @@ -62,7 +61,6 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.IndexService; import org.elasticsearch.indices.*; -import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -93,7 +91,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final AllocationService allocationService; private final MetaDataService metaDataService; private final Version version; - private final String riverIndexName; private final AliasValidator aliasValidator; private final IndexTemplateFilter indexTemplateFilter; private final NodeEnvironment nodeEnv; @@ -101,7 +98,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { @Inject public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService, - Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator, + Version version, AliasValidator aliasValidator, Set indexTemplateFilters, NodeEnvironment nodeEnv) { super(settings); this.threadPool = threadPool; @@ -110,7 +107,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { this.allocationService = allocationService; this.metaDataService = metaDataService; this.version = version; - this.riverIndexName = riverIndexName; this.aliasValidator = aliasValidator; this.nodeEnv = nodeEnv; @@ -163,7 +159,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { if (index.contains("#")) { throw new InvalidIndexNameException(new Index(index), index, "must not contain '#'"); } - if (!index.equals(riverIndexName) && index.charAt(0) == '_') { + if (index.charAt(0) == '_') { throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'"); } if (!index.toLowerCase(Locale.ROOT).equals(index)) { @@ -306,11 +302,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); } else { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) { - if (request.index().equals(riverIndexName)) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); - } else { - indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); - } + indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); } } if (request.index().equals(ScriptService.SCRIPT_INDEX)) { @@ -318,11 +310,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all"); } else { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) { - if (request.index().equals(riverIndexName)) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); - } else { - indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); - } + indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); } } diff --git a/core/src/main/java/org/elasticsearch/common/logging/Loggers.java b/core/src/main/java/org/elasticsearch/common/logging/Loggers.java index 512b8a40fe6..8f546c93e89 100644 --- a/core/src/main/java/org/elasticsearch/common/logging/Loggers.java +++ b/core/src/main/java/org/elasticsearch/common/logging/Loggers.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.Classes; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.river.RiverName; import java.net.InetAddress; import java.net.UnknownHostException; @@ -71,15 +70,6 @@ public class Loggers { return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0])); } - public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) { - List l = Lists.newArrayList(); - l.add(SPACE); - l.add(riverName.type()); - l.add(riverName.name()); - l.addAll(Lists.newArrayList(prefixes)); - return getLogger(clazz, settings, l.toArray(new String[l.size()])); - } - public static ESLogger getLogger(Class clazz, Settings settings, String... prefixes) { return getLogger(buildClassLoggerName(clazz), settings, prefixes); } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 355bea50643..e562bff50e5 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -81,8 +81,6 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestModule; -import org.elasticsearch.river.RiversManager; -import org.elasticsearch.river.RiversModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; @@ -182,7 +180,6 @@ public class Node implements Releasable { if (settings.getAsBoolean(HTTP_ENABLED, true)) { modules.add(new HttpServerModule(settings)); } - modules.add(new RiversModule(settings)); modules.add(new IndicesModule(settings)); modules.add(new SearchModule(settings)); modules.add(new ActionModule(false)); @@ -247,7 +244,6 @@ public class Node implements Releasable { injector.getInstance(IndexingMemoryController.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); injector.getInstance(IndicesTTLService.class).start(); - injector.getInstance(RiversManager.class).start(); injector.getInstance(SnapshotsService.class).start(); injector.getInstance(TransportService.class).start(); injector.getInstance(ClusterService.class).start(); @@ -289,8 +285,6 @@ public class Node implements Releasable { injector.getInstance(HttpServer.class).stop(); } - injector.getInstance(RiversManager.class).stop(); - injector.getInstance(SnapshotsService.class).stop(); // stop any changes happening as a result of cluster state changes injector.getInstance(IndicesClusterStateService.class).stop(); @@ -339,10 +333,6 @@ public class Node implements Releasable { if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).close(); } - - stopWatch.stop().start("rivers"); - injector.getInstance(RiversManager.class).close(); - stopWatch.stop().start("snapshot_service"); injector.getInstance(SnapshotsService.class).close(); stopWatch.stop().start("client"); diff --git a/core/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java b/core/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java deleted file mode 100644 index b8ce3985d5f..00000000000 --- a/core/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; - -/** - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public class AbstractRiverComponent implements RiverComponent { - - protected final ESLogger logger; - - protected final RiverName riverName; - - protected final RiverSettings settings; - - protected AbstractRiverComponent(RiverName riverName, RiverSettings settings) { - this.riverName = riverName; - this.settings = settings; - - this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName); - } - - @Override - public RiverName riverName() { - return riverName; - } - - public String nodeName() { - return settings.globalSettings().get("name", ""); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/River.java b/core/src/main/java/org/elasticsearch/river/River.java deleted file mode 100644 index 574b5fd4f0b..00000000000 --- a/core/src/main/java/org/elasticsearch/river/River.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -/** - * Allows to import data into elasticsearch via plugin - * Gets allocated on a node and eventually automatically re-allocated if needed - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public interface River extends RiverComponent { - - /** - * Called whenever the river is registered on a node, which can happen when: - * 1) the river _meta document gets indexed - * 2) an already registered river gets started on a node - */ - void start(); - - /** - * Called when the river is closed on a node, which can happen when: - * 1) the river is deleted by deleting its type through the delete mapping api - * 2) the node where the river is allocated is shut down or the river gets rerouted to another node - */ - void close(); -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverComponent.java b/core/src/main/java/org/elasticsearch/river/RiverComponent.java deleted file mode 100644 index a8cfb898798..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverComponent.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -/** - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public interface RiverComponent { - - RiverName riverName(); -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverException.java b/core/src/main/java/org/elasticsearch/river/RiverException.java deleted file mode 100644 index 4f9243949f6..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.ElasticsearchException; - -/** - * - */ -public class RiverException extends ElasticsearchException { - - private final RiverName river; - - public RiverException(RiverName river, String msg) { - this(river, msg, null); - } - - public RiverException(RiverName river, String msg, Throwable cause) { - this(river, true, msg, cause); - } - - protected RiverException(RiverName river, boolean withSpace, String msg, Throwable cause) { - super("[" + river.type() + "][" + river.name() + "]" + (withSpace ? " " : "") + msg, cause); - this.river = river; - } - - public RiverName riverName() { - return river; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverIndexName.java b/core/src/main/java/org/elasticsearch/river/RiverIndexName.java deleted file mode 100644 index 66d2d03bab8..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverIndexName.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.inject.BindingAnnotation; -import org.elasticsearch.common.settings.Settings; - -import java.lang.annotation.Documented; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -/** - * - */ - -@BindingAnnotation -@Target({FIELD, PARAMETER}) -@Retention(RUNTIME) -@Documented -public @interface RiverIndexName { - - static class Conf { - public static final String DEFAULT_INDEX_NAME = "_river"; - - public static String indexName(Settings settings) { - return settings.get("river.index_name", DEFAULT_INDEX_NAME); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverModule.java b/core/src/main/java/org/elasticsearch/river/RiverModule.java deleted file mode 100644 index 83a686903b2..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverModule.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableList; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.Modules; -import org.elasticsearch.common.inject.SpawnModules; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.NoClassSettingsException; - -import java.util.Locale; -import java.util.Map; - -import static org.elasticsearch.common.Strings.toCamelCase; - -/** - * - */ -public class RiverModule extends AbstractModule implements SpawnModules { - - private RiverName riverName; - - private final Settings globalSettings; - - private final Map settings; - - private final RiversTypesRegistry typesRegistry; - - public RiverModule(RiverName riverName, Map settings, Settings globalSettings, RiversTypesRegistry typesRegistry) { - this.riverName = riverName; - this.globalSettings = globalSettings; - this.settings = settings; - this.typesRegistry = typesRegistry; - } - - @Override - public Iterable spawnModules() { - return ImmutableList.of(Modules.createModule(loadTypeModule(riverName.type(), "org.elasticsearch.river.", "RiverModule"), globalSettings)); - } - - @Override - protected void configure() { - bind(RiverSettings.class).toInstance(new RiverSettings(globalSettings, settings)); - } - - private Class loadTypeModule(String type, String prefixPackage, String suffixClassName) { - Class registered = typesRegistry.type(type); - if (registered != null) { - return registered; - } - String fullClassName = type; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e) { - fullClassName = prefixPackage + Strings.capitalize(toCamelCase(type)) + suffixClassName; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e1) { - fullClassName = prefixPackage + toCamelCase(type) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e2) { - fullClassName = prefixPackage + toCamelCase(type).toLowerCase(Locale.ROOT) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e3) { - throw new NoClassSettingsException("Failed to load class with value [" + type + "]", e); - } - } - } - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverName.java b/core/src/main/java/org/elasticsearch/river/RiverName.java deleted file mode 100644 index 7078574c1b4..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverName.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import java.io.Serializable; - -/** - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public class RiverName implements Serializable { - - private final String type; - - private final String name; - - public RiverName(String type, String name) { - this.type = type; - this.name = name; - } - - public String type() { - return this.type; - } - - public String getType() { - return type(); - } - - public String name() { - return this.name; - } - - public String getName() { - return name(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - RiverName that = (RiverName) o; - - if (name != null ? !name.equals(that.name) : that.name != null) return false; - if (type != null ? !type.equals(that.type) : that.type != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = type != null ? type.hashCode() : 0; - result = 31 * result + (name != null ? name.hashCode() : 0); - return result; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverNameModule.java b/core/src/main/java/org/elasticsearch/river/RiverNameModule.java deleted file mode 100644 index cc908c0fd6e..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverNameModule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.inject.AbstractModule; - -/** - * - */ -public class RiverNameModule extends AbstractModule { - - private final RiverName riverName; - - public RiverNameModule(RiverName riverName) { - this.riverName = riverName; - } - - @Override - protected void configure() { - bind(RiverName.class).toInstance(riverName); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverSettings.java b/core/src/main/java/org/elasticsearch/river/RiverSettings.java deleted file mode 100644 index a06214861a7..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverSettings.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.settings.Settings; - -import java.util.Map; - -/** - * (shayy.banon) - */ - -public class RiverSettings { - - private final Settings globalSettings; - - private final Map settings; - - public RiverSettings(Settings globalSettings, Map settings) { - this.globalSettings = globalSettings; - this.settings = settings; - } - - public Settings globalSettings() { - return globalSettings; - } - - public Map settings() { - return settings; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversManager.java b/core/src/main/java/org/elasticsearch/river/RiversManager.java deleted file mode 100644 index c1fc6da82b2..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversManager.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.routing.RiversRouter; - -/** - * - */ -public class RiversManager extends AbstractLifecycleComponent { - - private final RiversService riversService; - - private final RiverClusterService clusterService; - - private final RiversRouter riversRouter; - - @Inject - public RiversManager(Settings settings, RiversService riversService, RiverClusterService clusterService, RiversRouter riversRouter) { - super(settings); - this.riversService = riversService; - this.clusterService = clusterService; - this.riversRouter = riversRouter; - } - - @Override - protected void doStart() { - riversRouter.start(); - riversService.start(); - clusterService.start(); - } - - @Override - protected void doStop() { - riversRouter.stop(); - clusterService.stop(); - riversService.stop(); - } - - @Override - protected void doClose() { - riversRouter.close(); - clusterService.close(); - riversService.close(); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversModule.java b/core/src/main/java/org/elasticsearch/river/RiversModule.java deleted file mode 100644 index 912874fce72..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversModule.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.routing.RiversRouter; - -import java.util.Map; - -/** - * - */ -public class RiversModule extends AbstractModule { - - private final Settings settings; - - private Map> riverTypes = Maps.newHashMap(); - - public RiversModule(Settings settings) { - this.settings = settings; - } - - /** - * Registers a custom river type name against a module. - * - * @param type The type - * @param module The module - */ - public void registerRiver(String type, Class module) { - riverTypes.put(type, module); - } - - @Override - protected void configure() { - bind(String.class).annotatedWith(RiverIndexName.class).toInstance(RiverIndexName.Conf.indexName(settings)); - bind(RiversService.class).asEagerSingleton(); - bind(RiverClusterService.class).asEagerSingleton(); - bind(RiversRouter.class).asEagerSingleton(); - bind(RiversManager.class).asEagerSingleton(); - bind(RiversTypesRegistry.class).toInstance(new RiversTypesRegistry(ImmutableMap.copyOf(riverTypes))); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversPluginsModule.java b/core/src/main/java/org/elasticsearch/river/RiversPluginsModule.java deleted file mode 100644 index f2059fd3c42..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversPluginsModule.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.PreProcessModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.PluginsService; - -/** - * A module that simply calls the {@link PluginsService#processModule(org.elasticsearch.common.inject.Module)} - * in order to allow plugins to pre process specific river modules. - */ -public class RiversPluginsModule extends AbstractModule implements PreProcessModule { - - private final Settings settings; - - private final PluginsService pluginsService; - - public RiversPluginsModule(Settings settings, PluginsService pluginsService) { - this.settings = settings; - this.pluginsService = pluginsService; - } - - @Override - public void processModule(Module module) { - pluginsService.processModule(module); - } - - @Override - protected void configure() { - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/river/RiversService.java b/core/src/main/java/org/elasticsearch/river/RiversService.java deleted file mode 100644 index 268956eecf8..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversService.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.get.GetRequestBuilder; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.support.ThreadedActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.common.inject.ModulesBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.river.cluster.RiverClusterChangedEvent; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.cluster.RiverClusterState; -import org.elasticsearch.river.cluster.RiverClusterStateListener; -import org.elasticsearch.river.routing.RiverRouting; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; - -/** - * - */ -public class RiversService extends AbstractLifecycleComponent { - - private final String riverIndexName; - - private Client client; - - private final ThreadPool threadPool; - - private final ClusterService clusterService; - - private final RiversTypesRegistry typesRegistry; - - private final Injector injector; - - private final Map riversInjectors = Maps.newHashMap(); - - private volatile ImmutableMap rivers = ImmutableMap.of(); - - @Inject - public RiversService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, RiversTypesRegistry typesRegistry, RiverClusterService riverClusterService, Injector injector) { - super(settings); - this.riverIndexName = RiverIndexName.Conf.indexName(settings); - this.client = client; - this.threadPool = threadPool; - this.clusterService = clusterService; - this.typesRegistry = typesRegistry; - this.injector = injector; - riverClusterService.add(new ApplyRivers()); - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - ImmutableSet indices = ImmutableSet.copyOf(this.rivers.keySet()); - final CountDownLatch latch = new CountDownLatch(indices.size()); - for (final RiverName riverName : indices) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - try { - closeRiver(riverName); - } catch (Exception e) { - logger.warn("failed to delete river on stop [{}]/[{}]", e, riverName.type(), riverName.name()); - } finally { - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - // ignore - } - } - - @Override - protected void doClose() { - } - - public synchronized void createRiver(RiverName riverName, Map settings) { - if (riversInjectors.containsKey(riverName)) { - logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name()); - return; - } - - logger.info("rivers have been deprecated. Read https://www.elastic.co/blog/deprecating_rivers"); - logger.debug("creating river [{}][{}]", riverName.type(), riverName.name()); - - try { - ModulesBuilder modules = new ModulesBuilder(); - modules.add(new RiverNameModule(riverName)); - modules.add(new RiverModule(riverName, settings, this.settings, typesRegistry)); - modules.add(new RiversPluginsModule(this.settings, injector.getInstance(PluginsService.class))); - - Injector indexInjector = modules.createChildInjector(injector); - riversInjectors.put(riverName, indexInjector); - River river = indexInjector.getInstance(River.class); - rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap(); - - - // we need this start so there can be operations done (like creating an index) which can't be - // done on create since Guice can't create two concurrent child injectors - river.start(); - - XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - - builder.startObject("node"); - builder.field("id", clusterService.localNode().id()); - builder.field("name", clusterService.localNode().name()); - builder.field("transport_address", clusterService.localNode().address().toString()); - builder.endObject(); - - builder.endObject(); - - - client.prepareIndex(riverIndexName, riverName.name(), "_status") - .setConsistencyLevel(WriteConsistencyLevel.ONE) - .setSource(builder).execute().actionGet(); - } catch (Exception e) { - logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name()); - - try { - XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - builder.field("error", ExceptionsHelper.detailedMessage(e)); - - builder.startObject("node"); - builder.field("id", clusterService.localNode().id()); - builder.field("name", clusterService.localNode().name()); - builder.field("transport_address", clusterService.localNode().address().toString()); - builder.endObject(); - builder.endObject(); - - client.prepareIndex(riverIndexName, riverName.name(), "_status") - .setConsistencyLevel(WriteConsistencyLevel.ONE) - .setSource(builder).execute().actionGet(); - } catch (Exception e1) { - logger.warn("failed to write failed status for river creation", e); - } - } - } - - public synchronized void closeRiver(RiverName riverName) { - Injector riverInjector; - River river; - synchronized (this) { - riverInjector = riversInjectors.remove(riverName); - if (riverInjector == null) { - throw new RiverException(riverName, "missing"); - } - logger.debug("closing river [{}][{}]", riverName.type(), riverName.name()); - - Map tmpMap = Maps.newHashMap(rivers); - river = tmpMap.remove(riverName); - rivers = ImmutableMap.copyOf(tmpMap); - } - - river.close(); - } - - private class ApplyRivers implements RiverClusterStateListener { - @Override - public void riverClusterChanged(RiverClusterChangedEvent event) { - DiscoveryNode localNode = clusterService.localNode(); - RiverClusterState state = event.state(); - - // first, go over and delete ones that either don't exists or are not allocated - for (final RiverName riverName : rivers.keySet()) { - RiverRouting routing = state.routing().routing(riverName); - if (routing == null || !localNode.equals(routing.node())) { - // not routed at all, and not allocated here, clean it (we delete the relevant ones before) - closeRiver(riverName); - } - } - - for (final RiverRouting routing : state.routing()) { - // not allocated - if (routing.node() == null) { - logger.trace("river {} has no routing node", routing.riverName().getName()); - continue; - } - // only apply changes to the local node - if (!routing.node().equals(localNode)) { - logger.trace("river {} belongs to node {}", routing.riverName().getName(), routing.node()); - continue; - } - // if its already created, ignore it - if (rivers.containsKey(routing.riverName())) { - logger.trace("river {} is already allocated", routing.riverName().getName()); - continue; - } - prepareGetMetaDocument(routing.riverName().name()).execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, new ActionListener() { - @Override - public void onResponse(GetResponse getResponse) { - if (!rivers.containsKey(routing.riverName())) { - if (getResponse.isExists()) { - // only create the river if it exists, otherwise, the indexing meta data has not been visible yet... - createRiver(routing.riverName(), getResponse.getSourceAsMap()); - } else { - //this should never happen as we've just found the _meta document in RiversRouter - logger.warn("{}/{}/_meta document not found", riverIndexName, routing.riverName().getName()); - } - } - } - - @Override - public void onFailure(Throwable e) { - // if its this is a failure that need to be retried, then do it - // this might happen if the state of the river index has not been propagated yet to this node, which - // should happen pretty fast since we managed to get the _meta in the RiversRouter - Throwable failure = ExceptionsHelper.unwrapCause(e); - if (isShardNotAvailableException(failure)) { - logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name()); - final ActionListener listener = this; - try { - threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.LISTENER, new Runnable() { - @Override - public void run() { - prepareGetMetaDocument(routing.riverName().name()).execute(listener); - } - }); - } catch (EsRejectedExecutionException ex) { - logger.debug("Couldn't schedule river start retry, node might be shutting down", ex); - } - } else { - logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name()); - } - } - })); - } - } - - private GetRequestBuilder prepareGetMetaDocument(String riverName) { - return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary"); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversTypesRegistry.java b/core/src/main/java/org/elasticsearch/river/RiversTypesRegistry.java deleted file mode 100644 index 3400dc40ec7..00000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversTypesRegistry.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableMap; -import org.elasticsearch.common.inject.Module; - -/** - * A type registry for rivers - */ -public class RiversTypesRegistry { - - private final ImmutableMap> riverTypes; - - public RiversTypesRegistry(ImmutableMap> riverTypes) { - this.riverTypes = riverTypes; - } - - public Class type(String type) { - return riverTypes.get(type); - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java b/core/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java deleted file mode 100644 index d783adee752..00000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.io.IOException; - -/** - * - */ -public class PublishRiverClusterStateAction extends AbstractComponent { - - public static final String ACTION_NAME = "internal:river/state/publish"; - - public interface NewClusterStateListener { - void onNewClusterState(RiverClusterState clusterState); - } - - private final TransportService transportService; - - private final ClusterService clusterService; - - private final NewClusterStateListener listener; - - public PublishRiverClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, - NewClusterStateListener listener) { - super(settings); - this.transportService = transportService; - this.clusterService = clusterService; - this.listener = listener; - transportService.registerRequestHandler(ACTION_NAME, PublishClusterStateRequest.class, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler()); - } - - public void close() { - transportService.removeHandler(ACTION_NAME); - } - - public void publish(RiverClusterState clusterState) { - final DiscoveryNodes discoNodes = clusterService.state().nodes(); - final DiscoveryNode localNode = discoNodes.localNode(); - for (final DiscoveryNode node : discoNodes) { - if (node.equals(localNode)) { - // no need to send to our self - continue; - } - - // we only want to send nodes that are either possible master nodes or river nodes - // master nodes because they will handle the state and the allocation of rivers - // and river nodes since they will end up creating indexes - - if (!node.masterNode() && !RiverNodeHelper.isRiverNode(node)) { - continue; - } - - transportService.sendRequest(node, ACTION_NAME, new PublishClusterStateRequest(clusterState), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); - } - }); - } - } - - static class PublishClusterStateRequest extends TransportRequest { - - private RiverClusterState clusterState; - - PublishClusterStateRequest() { - } - - private PublishClusterStateRequest(RiverClusterState clusterState) { - this.clusterState = clusterState; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - clusterState = RiverClusterState.Builder.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - RiverClusterState.Builder.writeTo(clusterState, out); - } - } - - private class PublishClusterStateRequestHandler implements TransportRequestHandler { - @Override - public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { - listener.onNewClusterState(request.clusterState); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java deleted file mode 100644 index 2863a401bf6..00000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -/** - * - */ -public class RiverClusterChangedEvent { - - private final String source; - - private final RiverClusterState previousState; - - private final RiverClusterState state; - - public RiverClusterChangedEvent(String source, RiverClusterState state, RiverClusterState previousState) { - this.source = source; - this.state = state; - this.previousState = previousState; - } - - /** - * The source that caused this cluster event to be raised. - */ - public String source() { - return this.source; - } - - public RiverClusterState state() { - return this.state; - } - - public RiverClusterState previousState() { - return this.previousState; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java deleted file mode 100644 index 3a5208da457..00000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.TransportService; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static java.util.concurrent.Executors.newSingleThreadExecutor; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; - -/** - * - */ -public class RiverClusterService extends AbstractLifecycleComponent { - - private final ClusterService clusterService; - - private final PublishRiverClusterStateAction publishAction; - - private final List clusterStateListeners = new CopyOnWriteArrayList<>(); - - private volatile ExecutorService updateTasksExecutor; - - private volatile RiverClusterState clusterState = RiverClusterState.builder().build(); - - @Inject - public RiverClusterService(Settings settings, TransportService transportService, ClusterService clusterService) { - super(settings); - this.clusterService = clusterService; - - this.publishAction = new PublishRiverClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener()); - } - - @Override - protected void doStart() { - this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "riverClusterService#updateTask")); - } - - @Override - protected void doStop() { - updateTasksExecutor.shutdown(); - try { - updateTasksExecutor.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - } - } - - @Override - protected void doClose() { - } - - public void add(RiverClusterStateListener listener) { - clusterStateListeners.add(listener); - } - - public void remove(RiverClusterStateListener listener) { - clusterStateListeners.remove(listener); - } - - /** - * The current state. - */ - public ClusterState state() { - return clusterService.state(); - } - - public void submitStateUpdateTask(final String source, final RiverClusterStateUpdateTask updateTask) { - if (!lifecycle.started()) { - return; - } - updateTasksExecutor.execute(new Runnable() { - @Override - public void run() { - if (!lifecycle.started()) { - logger.debug("processing [{}]: ignoring, cluster_service not started", source); - return; - } - logger.debug("processing [{}]: execute", source); - - RiverClusterState previousClusterState = clusterState; - try { - clusterState = updateTask.execute(previousClusterState); - } catch (Exception e) { - StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); - logger.warn(sb.toString(), e); - return; - } - if (previousClusterState != clusterState) { - if (clusterService.state().nodes().localNodeMaster()) { - // only the master controls the version numbers - clusterState = new RiverClusterState(clusterState.version() + 1, clusterState); - } else { - // we got this cluster state from the master, filter out based on versions (don't call listeners) - if (clusterState.version() < previousClusterState.version()) { - logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring"); - return; - } - } - - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); - logger.trace(sb.toString()); - } else if (logger.isDebugEnabled()) { - logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source); - } - - RiverClusterChangedEvent clusterChangedEvent = new RiverClusterChangedEvent(source, clusterState, previousClusterState); - - for (RiverClusterStateListener listener : clusterStateListeners) { - listener.riverClusterChanged(clusterChangedEvent); - } - - // if we are the master, publish the new state to all nodes - if (clusterService.state().nodes().localNodeMaster()) { - publishAction.publish(clusterState); - } - - logger.debug("processing [{}]: done applying updated cluster_state", source); - } else { - logger.debug("processing [{}]: no change in cluster_state", source); - } - } - }); - } - - private class UpdateClusterStateListener implements PublishRiverClusterStateAction.NewClusterStateListener { - @Override - public void onNewClusterState(final RiverClusterState clusterState) { - ClusterState state = clusterService.state(); - if (state.nodes().localNodeMaster()) { - logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode()); - return; - } - - submitStateUpdateTask("received_state", new RiverClusterStateUpdateTask() { - @Override - public RiverClusterState execute(RiverClusterState currentState) { - return clusterState; - } - }); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterState.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterState.java deleted file mode 100644 index 7be85a6d51b..00000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterState.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.river.routing.RiversRouting; - -import java.io.IOException; - -/** - * - */ -public class RiverClusterState { - - private final long version; - - private final RiversRouting routing; - - public RiverClusterState(long version, RiverClusterState state) { - this.version = version; - this.routing = state.routing(); - } - - RiverClusterState(long version, RiversRouting routing) { - this.version = version; - this.routing = routing; - } - - public long version() { - return this.version; - } - - public RiversRouting routing() { - return routing; - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private long version = 0; - - private RiversRouting routing = RiversRouting.EMPTY; - - public Builder state(RiverClusterState state) { - this.version = state.version(); - this.routing = state.routing(); - return this; - } - - public Builder routing(RiversRouting.Builder builder) { - return routing(builder.build()); - } - - public Builder routing(RiversRouting routing) { - this.routing = routing; - return this; - } - - public RiverClusterState build() { - return new RiverClusterState(version, routing); - } - - public static RiverClusterState readFrom(StreamInput in) throws IOException { - Builder builder = new Builder(); - builder.version = in.readVLong(); - builder.routing = RiversRouting.Builder.readFrom(in); - return builder.build(); - } - - public static void writeTo(RiverClusterState clusterState, StreamOutput out) throws IOException { - out.writeVLong(clusterState.version); - RiversRouting.Builder.writeTo(clusterState.routing, out); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateListener.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateListener.java deleted file mode 100644 index c8cac757155..00000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -/** - * - */ -public interface RiverClusterStateListener { - - void riverClusterChanged(RiverClusterChangedEvent event); -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateUpdateTask.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateUpdateTask.java deleted file mode 100644 index eb7d3fe275b..00000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateUpdateTask.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -/** - * - */ -public interface RiverClusterStateUpdateTask { - - RiverClusterState execute(RiverClusterState currentState); -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverNodeHelper.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverNodeHelper.java deleted file mode 100644 index 63007514470..00000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverNodeHelper.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.river.RiverName; - -/** - * - */ -public class RiverNodeHelper { - - public static boolean isRiverNode(DiscoveryNode node) { - // we don't allocate rivers on client nodes - if (node.clientNode()) { - return false; - } - String river = node.attributes().get("river"); - // by default, if not set, it's a river node (better OOB exp) - if (river == null) { - return true; - } - if ("_none_".equals(river)) { - return false; - } - // there is at least one river settings, we need it - return true; - } - - public static boolean isRiverNode(DiscoveryNode node, RiverName riverName) { - if (!isRiverNode(node)) { - return false; - } - String river = node.attributes().get("river"); - // by default, if not set, its an river node (better OOB exp) - return river == null || river.contains(riverName.type()) || river.contains(riverName.name()); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiver.java b/core/src/main/java/org/elasticsearch/river/dummy/DummyRiver.java deleted file mode 100644 index f90a928bc5e..00000000000 --- a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.dummy; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.river.AbstractRiverComponent; -import org.elasticsearch.river.River; -import org.elasticsearch.river.RiverName; -import org.elasticsearch.river.RiverSettings; - -/** - * - */ -public class DummyRiver extends AbstractRiverComponent implements River { - - @Inject - public DummyRiver(RiverName riverName, RiverSettings settings) { - super(riverName, settings); - logger.info("create"); - } - - @Override - public void start() { - logger.info("start"); - } - - @Override - public void close() { - logger.info("close"); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiverModule.java b/core/src/main/java/org/elasticsearch/river/dummy/DummyRiverModule.java deleted file mode 100644 index a0e3057955e..00000000000 --- a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiverModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.dummy; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.river.River; - -/** - * - */ -public class DummyRiverModule extends AbstractModule { - - @Override - protected void configure() { - bind(River.class).to(DummyRiver.class).asEagerSingleton(); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/routing/RiverRouting.java b/core/src/main/java/org/elasticsearch/river/routing/RiverRouting.java deleted file mode 100644 index 0fe41d6f24f..00000000000 --- a/core/src/main/java/org/elasticsearch/river/routing/RiverRouting.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.routing; - -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.river.RiverName; - -import java.io.IOException; - -/** - * - */ -public class RiverRouting implements Streamable { - - private RiverName riverName; - - private DiscoveryNode node; - - private RiverRouting() { - } - - RiverRouting(RiverName riverName, DiscoveryNode node) { - this.riverName = riverName; - this.node = node; - } - - public RiverName riverName() { - return riverName; - } - - /** - * The node the river is allocated to, null if its not allocated. - */ - public DiscoveryNode node() { - return node; - } - - void node(DiscoveryNode node) { - this.node = node; - } - - public static RiverRouting readRiverRouting(StreamInput in) throws IOException { - RiverRouting routing = new RiverRouting(); - routing.readFrom(in); - return routing; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - riverName = new RiverName(in.readString(), in.readString()); - if (in.readBoolean()) { - node = DiscoveryNode.readNode(in); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(riverName.type()); - out.writeString(riverName.name()); - if (node == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - node.writeTo(out); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/routing/RiversRouter.java b/core/src/main/java/org/elasticsearch/river/routing/RiversRouter.java deleted file mode 100644 index 065d71d077c..00000000000 --- a/core/src/main/java/org/elasticsearch/river/routing/RiversRouter.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.routing; - -import com.carrotsearch.hppc.cursors.ObjectCursor; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.shard.IllegalIndexShardStateException; -import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.river.RiverIndexName; -import org.elasticsearch.river.RiverName; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.cluster.RiverClusterState; -import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask; -import org.elasticsearch.river.cluster.RiverNodeHelper; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * - */ -public class RiversRouter extends AbstractLifecycleComponent implements ClusterStateListener { - - private static final TimeValue RIVER_START_RETRY_INTERVAL = TimeValue.timeValueMillis(1000); - private static final int RIVER_START_MAX_RETRIES = 5; - - private final String riverIndexName; - - private final Client client; - - private final RiverClusterService riverClusterService; - - private final ThreadPool threadPool; - - @Inject - public RiversRouter(Settings settings, Client client, ClusterService clusterService, RiverClusterService riverClusterService, ThreadPool threadPool) { - super(settings); - this.riverIndexName = RiverIndexName.Conf.indexName(settings); - this.riverClusterService = riverClusterService; - this.client = client; - this.threadPool = threadPool; - clusterService.add(this); - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - - @Override - public void clusterChanged(final ClusterChangedEvent event) { - if (!event.localNodeMaster()) { - return; - } - final String source = "reroute_rivers_node_changed"; - //we'll try again a few times if we don't find the river _meta document while the type is there - final CountDown countDown = new CountDown(RIVER_START_MAX_RETRIES); - riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() { - @Override - public RiverClusterState execute(RiverClusterState currentState) { - return updateRiverClusterState(source, currentState, event.state(), countDown); - } - }); - } - - protected RiverClusterState updateRiverClusterState(final String source, final RiverClusterState currentState, - ClusterState newClusterState, final CountDown countDown) { - if (!newClusterState.metaData().hasIndex(riverIndexName)) { - // if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state - if (!currentState.routing().isEmpty()) { - return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build(); - } - return currentState; - } - - RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing()); - boolean dirty = false; - IndexMetaData indexMetaData = newClusterState.metaData().index(riverIndexName); - - boolean metaFound = true; - // go over and create new river routing (with no node) for new types (rivers names) - for (ObjectCursor cursor : indexMetaData.mappings().values()) { - String mappingType = cursor.value.type(); // mapping type is the name of the river - if (MapperService.DEFAULT_MAPPING.equals(mappingType)) { - continue; - } - if (!currentState.routing().hasRiverByName(mappingType)) { - // no river, we need to add it to the routing with no node allocation - try { - GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").setPreference("_primary").get(); - if (getResponse.isExists()) { - - logger.debug("{}/{}/_meta document found.", riverIndexName, mappingType); - - String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null); - if (riverType == null) { - logger.warn("no river type provided for [{}], ignoring...", riverIndexName); - } else { - routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null)); - dirty = true; - } - } else { - // At least one type does not have _meta - metaFound = false; - } - } catch (NoShardAvailableActionException e) { - // ignore, we will get it next time... - } catch (ClusterBlockException e) { - // ignore, we will get it next time - } catch (IndexMissingException e) { - // ignore, we will get it next time - } catch (IllegalIndexShardStateException e) { - // ignore, we will get it next time - } catch (Exception e) { - logger.warn("failed to get/parse _meta for [{}]", e, mappingType); - } - } - } - - // At least one type does not have _meta, so we are - // going to reschedule some checks - if (!metaFound) { - if (countDown.countDown()) { - logger.warn("no river _meta document found after {} attempts", RIVER_START_MAX_RETRIES); - } else { - logger.debug("no river _meta document found retrying in {} ms", RIVER_START_RETRY_INTERVAL.millis()); - try { - threadPool.schedule(RIVER_START_RETRY_INTERVAL, ThreadPool.Names.GENERIC, new Runnable() { - @Override - public void run() { - riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() { - @Override - public RiverClusterState execute(RiverClusterState currentState) { - return updateRiverClusterState(source, currentState, riverClusterService.state(), countDown); - } - }); - } - }); - } catch (EsRejectedExecutionException ex) { - logger.debug("Couldn't schedule river start retry, node might be shutting down", ex); - } - } - } - - // now, remove routings that were deleted - // also, apply nodes that were removed and rivers were running on - for (RiverRouting routing : currentState.routing()) { - if (!indexMetaData.mappings().containsKey(routing.riverName().name())) { - routingBuilder.remove(routing); - dirty = true; - } else if (routing.node() != null && !newClusterState.nodes().nodeExists(routing.node().id())) { - routingBuilder.remove(routing); - routingBuilder.put(new RiverRouting(routing.riverName(), null)); - dirty = true; - } - } - - // build a list from nodes to rivers - Map> nodesToRivers = Maps.newHashMap(); - - for (DiscoveryNode node : newClusterState.nodes()) { - if (RiverNodeHelper.isRiverNode(node)) { - nodesToRivers.put(node, Lists.newArrayList()); - } - } - - List unassigned = Lists.newArrayList(); - for (RiverRouting routing : routingBuilder.build()) { - if (routing.node() == null) { - unassigned.add(routing); - } else { - List l = nodesToRivers.get(routing.node()); - if (l == null) { - l = Lists.newArrayList(); - nodesToRivers.put(routing.node(), l); - } - l.add(routing); - } - } - for (Iterator it = unassigned.iterator(); it.hasNext(); ) { - RiverRouting routing = it.next(); - DiscoveryNode smallest = null; - int smallestSize = Integer.MAX_VALUE; - for (Map.Entry> entry : nodesToRivers.entrySet()) { - if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) { - if (entry.getValue().size() < smallestSize) { - smallestSize = entry.getValue().size(); - smallest = entry.getKey(); - } - } - } - if (smallest != null) { - dirty = true; - it.remove(); - routing.node(smallest); - nodesToRivers.get(smallest).add(routing); - logger.debug("going to allocate river [{}] on node {}", routing.riverName().getName(), smallest); - } - } - - - // add relocation logic... - - if (dirty) { - return RiverClusterState.builder().state(currentState).routing(routingBuilder).build(); - } - return currentState; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/routing/RiversRouting.java b/core/src/main/java/org/elasticsearch/river/routing/RiversRouting.java deleted file mode 100644 index 837e3dd709e..00000000000 --- a/core/src/main/java/org/elasticsearch/river/routing/RiversRouting.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.routing; - -import com.google.common.collect.ImmutableMap; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.river.RiverName; - -import java.io.IOException; -import java.util.Iterator; - -/** - * - */ -public class RiversRouting implements Iterable { - - public static final RiversRouting EMPTY = RiversRouting.builder().build(); - - private final ImmutableMap rivers; - - private RiversRouting(ImmutableMap rivers) { - this.rivers = rivers; - } - - public boolean isEmpty() { - return rivers.isEmpty(); - } - - public RiverRouting routing(RiverName riverName) { - return rivers.get(riverName); - } - - public boolean hasRiverByName(String name) { - for (RiverName riverName : rivers.keySet()) { - if (riverName.name().equals(name)) { - return true; - } - } - return false; - } - - @Override - public Iterator iterator() { - return rivers.values().iterator(); - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private MapBuilder rivers = MapBuilder.newMapBuilder(); - - public Builder routing(RiversRouting routing) { - rivers.putAll(routing.rivers); - return this; - } - - public Builder put(RiverRouting routing) { - rivers.put(routing.riverName(), routing); - return this; - } - - public Builder remove(RiverRouting routing) { - rivers.remove(routing.riverName()); - return this; - } - - public Builder remove(RiverName riverName) { - rivers.remove(riverName); - return this; - } - - public Builder remote(String riverName) { - for (RiverName name : rivers.map().keySet()) { - if (name.name().equals(riverName)) { - rivers.remove(name); - } - } - return this; - } - - public RiversRouting build() { - return new RiversRouting(rivers.immutableMap()); - } - - public static RiversRouting readFrom(StreamInput in) throws IOException { - Builder builder = new Builder(); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - builder.put(RiverRouting.readRiverRouting(in)); - } - return builder.build(); - } - - public static void writeTo(RiversRouting routing, StreamOutput out) throws IOException { - out.writeVInt(routing.rivers.size()); - for (RiverRouting riverRouting : routing) { - riverRouting.writeTo(out); - } - } - } -} diff --git a/core/src/test/java/org/elasticsearch/river/RiverTests.java b/core/src/test/java/org/elasticsearch/river/RiverTests.java deleted file mode 100644 index 1a3e0e70168..00000000000 --- a/core/src/test/java/org/elasticsearch/river/RiverTests.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.base.Predicate; - -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequestBuilder; -import org.elasticsearch.action.get.MultiGetResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.river.dummy.DummyRiverModule; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.Matchers.equalTo; - -@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE) -@AwaitsFix(bugUrl="occasionally fails apparently due to synchronous mappings updates") -public class RiverTests extends ElasticsearchIntegrationTest { - - @Override - protected void beforeIndexDeletion() { - } - - @Test - public void testRiverStart() throws Exception { - startAndCheckRiverIsStarted("dummy-river-test"); - } - - @Test - public void testMultipleRiversStart() throws Exception { - int nbRivers = between(2,10); - logger.info("--> testing with {} rivers...", nbRivers); - Thread[] riverCreators = new Thread[nbRivers]; - final CountDownLatch latch = new CountDownLatch(nbRivers); - final MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); - for (int i = 0; i < nbRivers; i++) { - final String riverName = "dummy-river-test-" + i; - riverCreators[i] = new Thread() { - @Override - public void run() { - try { - startRiver(riverName); - } catch (Throwable t) { - logger.warn("failed to register river {}", t, riverName); - } finally { - latch.countDown(); - } - } - }; - riverCreators[i].start(); - multiGetRequestBuilder.add(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status"); - } - - latch.await(); - - logger.info("--> checking that all rivers were created"); - assertThat(awaitBusy(new Predicate() { - @Override - public boolean apply(Object obj) { - MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.get(); - for (MultiGetItemResponse getItemResponse : multiGetItemResponse) { - if (getItemResponse.isFailed() || !getItemResponse.getResponse().isExists()) { - return false; - } - } - return true; - } - }, 5, TimeUnit.SECONDS), equalTo(true)); - } - - /** - * Test case for https://github.com/elasticsearch/elasticsearch/issues/4577 - * River does not start when using config/templates files - */ - @Test - public void startDummyRiverWithDefaultTemplate() throws Exception { - logger.info("--> create empty template"); - client().admin().indices().preparePutTemplate("template_1") - .setTemplate("*") - .setOrder(0) - .addMapping(MapperService.DEFAULT_MAPPING, - JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING) - .endObject().endObject()) - .get(); - - startAndCheckRiverIsStarted("dummy-river-default-template-test"); - } - - /** - * Test case for https://github.com/elasticsearch/elasticsearch/issues/4577 - * River does not start when using config/templates files - */ - @Test - public void startDummyRiverWithSomeTemplates() throws Exception { - logger.info("--> create some templates"); - client().admin().indices().preparePutTemplate("template_1") - .setTemplate("*") - .setOrder(0) - .addMapping(MapperService.DEFAULT_MAPPING, - JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING) - .endObject().endObject()) - .get(); - client().admin().indices().preparePutTemplate("template_2") - .setTemplate("*") - .setOrder(0) - .addMapping("atype", - JsonXContent.contentBuilder().startObject().startObject("atype") - .endObject().endObject()) - .get(); - - startAndCheckRiverIsStarted("dummy-river-template-test"); - } - - /** - * Create a Dummy river then check it has been started. We will fail after 5 seconds. - * @param riverName Dummy river needed to be started - */ - private void startAndCheckRiverIsStarted(final String riverName) throws InterruptedException { - startRiver(riverName); - checkRiverIsStarted(riverName); - } - - private void startRiver(final String riverName) { - logger.info("--> starting river [{}]", riverName); - IndexResponse indexResponse = client().prepareIndex(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_meta") - .setSource("type", DummyRiverModule.class.getCanonicalName()).get(); - assertTrue(indexResponse.isCreated()); - ensureGreen(); - } - - private void checkRiverIsStarted(final String riverName) throws InterruptedException { - logger.info("--> checking that river [{}] was created", riverName); - assertThat(awaitBusy(new Predicate() { - @Override - public boolean apply(Object obj) { - GetResponse response = client().prepareGet(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status").get(); - return response.isExists(); - } - }, 5, TimeUnit.SECONDS), equalTo(true)); - } - -} diff --git a/docs/reference/migration/migrate_2_0.asciidoc b/docs/reference/migration/migrate_2_0.asciidoc index 0965e53ba9d..d010ca46286 100644 --- a/docs/reference/migration/migrate_2_0.asciidoc +++ b/docs/reference/migration/migrate_2_0.asciidoc @@ -9,6 +9,16 @@ your application to Elasticsearch 2.0. Elasticsearch now binds to the loopback interface by default (usually 127.0.0.1 or ::1), the setting `network.host` can be specified to change this behavior. +=== Rivers removal + +Elasticsearch does not support rivers anymore. While we had first planned to +keep them around to ease migration, keeping support for rivers proved to be +challenging as it conflicted with other important changes that we wanted to +bring to 2.0 like synchronous dynamic mappings updates, so we eventually +decided to remove them entirely. See +https://www.elastic.co/blog/deprecating_rivers for more background about why +we are moving away from rivers. + === Indices API The <> will, by default produce an error response diff --git a/docs/reference/modules/plugins.asciidoc b/docs/reference/modules/plugins.asciidoc index 40c288280cc..ecc0a9ab98c 100644 --- a/docs/reference/modules/plugins.asciidoc +++ b/docs/reference/modules/plugins.asciidoc @@ -214,49 +214,6 @@ You can disable that check using `plugins.check_lucene: false`. * https://github.com/shikhar/eskka[eskka Discovery Plugin] (by Shikhar Bhushan) * https://github.com/grantr/elasticsearch-srv-discovery[DNS SRV Discovery Plugin] (by Grant Rodgers) -[float] -[[river]] -==== River Plugins - -deprecated[1.5.0,Rivers have been deprecated. See https://www.elastic.co/blog/deprecating_rivers for more details] - -.Supported by Elasticsearch -* https://github.com/elasticsearch/elasticsearch-river-couchdb[CouchDB River Plugin] -* https://github.com/elasticsearch/elasticsearch-river-rabbitmq[RabbitMQ River Plugin] -* https://github.com/elasticsearch/elasticsearch-river-twitter[Twitter River Plugin] -* https://github.com/elasticsearch/elasticsearch-river-wikipedia[Wikipedia River Plugin] - -.Supported by the community -* https://github.com/domdorn/elasticsearch-river-activemq/[ActiveMQ River Plugin] (by Dominik Dorn) -* https://github.com/albogdano/elasticsearch-river-amazonsqs[Amazon SQS River Plugin] (by Alex Bogdanovski) -* https://github.com/xxBedy/elasticsearch-river-csv[CSV River Plugin] (by Martin Bednar) -* http://www.pilato.fr/dropbox/[Dropbox River Plugin] (by David Pilato) -* http://www.pilato.fr/fsriver/[FileSystem River Plugin] (by David Pilato) -* https://github.com/obazoud/elasticsearch-river-git[Git River Plugin] (by Olivier Bazoud) -* https://github.com/uberVU/elasticsearch-river-github[GitHub River Plugin] (by uberVU) -* https://github.com/sksamuel/elasticsearch-river-hazelcast[Hazelcast River Plugin] (by Steve Samuel) -* https://github.com/jprante/elasticsearch-river-jdbc[JDBC River Plugin] (by Jörg Prante) -* https://github.com/qotho/elasticsearch-river-jms[JMS River Plugin] (by Steve Sarandos) -* https://github.com/endgameinc/elasticsearch-river-kafka[Kafka River Plugin] (by Endgame Inc.) -* https://github.com/mariamhakobyan/elasticsearch-river-kafka[Kafka River Plugin 2] (by Mariam Hakobyan) -* https://github.com/tlrx/elasticsearch-river-ldap[LDAP River Plugin] (by Tanguy Leroux) -* https://github.com/richardwilly98/elasticsearch-river-mongodb/[MongoDB River Plugin] (by Richard Louapre) -* https://github.com/sksamuel/elasticsearch-river-neo4j[Neo4j River Plugin] (by Steve Samuel) -* https://github.com/jprante/elasticsearch-river-oai/[Open Archives Initiative (OAI) River Plugin] (by Jörg Prante) -* https://github.com/sksamuel/elasticsearch-river-redis[Redis River Plugin] (by Steve Samuel) -* https://github.com/rethinkdb/elasticsearch-river-rethinkdb[RethinkDB River Plugin] (by RethinkDB) -* http://dadoonet.github.com/rssriver/[RSS River Plugin] (by David Pilato) -* https://github.com/adamlofts/elasticsearch-river-sofa[Sofa River Plugin] (by adamlofts) -* https://github.com/javanna/elasticsearch-river-solr/[Solr River Plugin] (by Luca Cavanna) -* https://github.com/sunnygleason/elasticsearch-river-st9[St9 River Plugin] (by Sunny Gleason) -* https://github.com/plombard/SubversionRiver[Subversion River Plugin] (by Pascal Lombard) -* https://github.com/kzwang/elasticsearch-river-dynamodb[DynamoDB River Plugin] (by Kevin Wang) -* https://github.com/salyh/elasticsearch-river-imap[IMAP/POP3 Email River Plugin] (by Hendrik Saly) -* https://github.com/codelibs/elasticsearch-river-web[Web River Plugin] (by CodeLibs Project) -* https://github.com/eea/eea.elasticsearch.river.rdf[EEA ElasticSearch RDF River Plugin] (by the European Environment Agency) -* https://github.com/lbroudoux/es-amazon-s3-river[Amazon S3 River Plugin] (by Laurent Broudoux) -* https://github.com/lbroudoux/es-google-drive-river[Google Drive River Plugin] (by Laurent Broudoux) - [float] [[transport]] ==== Transport Plugins