Merge pull request #11568 from jpountz/remove/rivers

Rivers removal.
This commit is contained in:
Adrien Grand 2015-06-17 08:20:48 +02:00
commit 17fac6dad5
33 changed files with 21 additions and 2345 deletions

View File

@ -54,7 +54,6 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@ -64,7 +63,6 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -1068,34 +1066,18 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexShard.indexService().mapperService();
mapperService.merge(request.type(), new CompressedXContent(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
} else {
created = operation.execute(indexShard);
}
final boolean created = operation.execute(indexShard);
// update the version on request so it will happen on the replicas
final long version = operation.version();

View File

@ -53,7 +53,6 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -62,7 +61,6 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.*;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@ -93,7 +91,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final AllocationService allocationService;
private final MetaDataService metaDataService;
private final Version version;
private final String riverIndexName;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;
private final NodeEnvironment nodeEnv;
@ -101,7 +98,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
@Inject
public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService,
Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator,
Version version, AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, NodeEnvironment nodeEnv) {
super(settings);
this.threadPool = threadPool;
@ -110,7 +107,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
this.allocationService = allocationService;
this.metaDataService = metaDataService;
this.version = version;
this.riverIndexName = riverIndexName;
this.aliasValidator = aliasValidator;
this.nodeEnv = nodeEnv;
@ -163,7 +159,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (index.contains("#")) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain '#'");
}
if (!index.equals(riverIndexName) && index.charAt(0) == '_') {
if (index.charAt(0) == '_') {
throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'");
}
if (!index.toLowerCase(Locale.ROOT).equals(index)) {
@ -306,11 +302,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
}
if (request.index().equals(ScriptService.SCRIPT_INDEX)) {
@ -318,11 +310,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.Classes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.river.RiverName;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -71,15 +70,6 @@ public class Loggers {
return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0]));
}
public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) {
List<String> l = Lists.newArrayList();
l.add(SPACE);
l.add(riverName.type());
l.add(riverName.name());
l.addAll(Lists.newArrayList(prefixes));
return getLogger(clazz, settings, l.toArray(new String[l.size()]));
}
public static ESLogger getLogger(Class clazz, Settings settings, String... prefixes) {
return getLogger(buildClassLoggerName(clazz), settings, prefixes);
}

View File

@ -81,8 +81,6 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestModule;
import org.elasticsearch.river.RiversManager;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
@ -182,7 +180,6 @@ public class Node implements Releasable {
if (settings.getAsBoolean(HTTP_ENABLED, true)) {
modules.add(new HttpServerModule(settings));
}
modules.add(new RiversModule(settings));
modules.add(new IndicesModule(settings));
modules.add(new SearchModule(settings));
modules.add(new ActionModule(false));
@ -247,7 +244,6 @@ public class Node implements Releasable {
injector.getInstance(IndexingMemoryController.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(RiversManager.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(TransportService.class).start();
injector.getInstance(ClusterService.class).start();
@ -289,8 +285,6 @@ public class Node implements Releasable {
injector.getInstance(HttpServer.class).stop();
}
injector.getInstance(RiversManager.class).stop();
injector.getInstance(SnapshotsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
@ -339,10 +333,6 @@ public class Node implements Releasable {
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close();
}
stopWatch.stop().start("rivers");
injector.getInstance(RiversManager.class).close();
stopWatch.stop().start("snapshot_service");
injector.getInstance(SnapshotsService.class).close();
stopWatch.stop().start("client");

View File

@ -1,52 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
/**
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public class AbstractRiverComponent implements RiverComponent {
protected final ESLogger logger;
protected final RiverName riverName;
protected final RiverSettings settings;
protected AbstractRiverComponent(RiverName riverName, RiverSettings settings) {
this.riverName = riverName;
this.settings = settings;
this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
}
@Override
public RiverName riverName() {
return riverName;
}
public String nodeName() {
return settings.globalSettings().get("name", "");
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
/**
* Allows to import data into elasticsearch via plugin
* Gets allocated on a node and eventually automatically re-allocated if needed
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public interface River extends RiverComponent {
/**
* Called whenever the river is registered on a node, which can happen when:
* 1) the river _meta document gets indexed
* 2) an already registered river gets started on a node
*/
void start();
/**
* Called when the river is closed on a node, which can happen when:
* 1) the river is deleted by deleting its type through the delete mapping api
* 2) the node where the river is allocated is shut down or the river gets rerouted to another node
*/
void close();
}

View File

@ -1,29 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
/**
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public interface RiverComponent {
RiverName riverName();
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.ElasticsearchException;
/**
*
*/
public class RiverException extends ElasticsearchException {
private final RiverName river;
public RiverException(RiverName river, String msg) {
this(river, msg, null);
}
public RiverException(RiverName river, String msg, Throwable cause) {
this(river, true, msg, cause);
}
protected RiverException(RiverName river, boolean withSpace, String msg, Throwable cause) {
super("[" + river.type() + "][" + river.name() + "]" + (withSpace ? " " : "") + msg, cause);
this.river = river;
}
public RiverName riverName() {
return river;
}
}

View File

@ -1,50 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.inject.BindingAnnotation;
import org.elasticsearch.common.settings.Settings;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
*
*/
@BindingAnnotation
@Target({FIELD, PARAMETER})
@Retention(RUNTIME)
@Documented
public @interface RiverIndexName {
static class Conf {
public static final String DEFAULT_INDEX_NAME = "_river";
public static String indexName(Settings settings) {
return settings.get("river.index_name", DEFAULT_INDEX_NAME);
}
}
}

View File

@ -1,93 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.NoClassSettingsException;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.common.Strings.toCamelCase;
/**
*
*/
public class RiverModule extends AbstractModule implements SpawnModules {
private RiverName riverName;
private final Settings globalSettings;
private final Map<String, Object> settings;
private final RiversTypesRegistry typesRegistry;
public RiverModule(RiverName riverName, Map<String, Object> settings, Settings globalSettings, RiversTypesRegistry typesRegistry) {
this.riverName = riverName;
this.globalSettings = globalSettings;
this.settings = settings;
this.typesRegistry = typesRegistry;
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(loadTypeModule(riverName.type(), "org.elasticsearch.river.", "RiverModule"), globalSettings));
}
@Override
protected void configure() {
bind(RiverSettings.class).toInstance(new RiverSettings(globalSettings, settings));
}
private Class<? extends Module> loadTypeModule(String type, String prefixPackage, String suffixClassName) {
Class<? extends Module> registered = typesRegistry.type(type);
if (registered != null) {
return registered;
}
String fullClassName = type;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e) {
fullClassName = prefixPackage + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e1) {
fullClassName = prefixPackage + toCamelCase(type) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e2) {
fullClassName = prefixPackage + toCamelCase(type).toLowerCase(Locale.ROOT) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e3) {
throw new NoClassSettingsException("Failed to load class with value [" + type + "]", e);
}
}
}
}
}
}

View File

@ -1,74 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import java.io.Serializable;
/**
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public class RiverName implements Serializable {
private final String type;
private final String name;
public RiverName(String type, String name) {
this.type = type;
this.name = name;
}
public String type() {
return this.type;
}
public String getType() {
return type();
}
public String name() {
return this.name;
}
public String getName() {
return name();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RiverName that = (RiverName) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
return true;
}
@Override
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.inject.AbstractModule;
/**
*
*/
public class RiverNameModule extends AbstractModule {
private final RiverName riverName;
public RiverNameModule(RiverName riverName) {
this.riverName = riverName;
}
@Override
protected void configure() {
bind(RiverName.class).toInstance(riverName);
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
/**
* (shayy.banon)
*/
public class RiverSettings {
private final Settings globalSettings;
private final Map<String, Object> settings;
public RiverSettings(Settings globalSettings, Map<String, Object> settings) {
this.globalSettings = globalSettings;
this.settings = settings;
}
public Settings globalSettings() {
return globalSettings;
}
public Map<String, Object> settings() {
return settings;
}
}

View File

@ -1,67 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.routing.RiversRouter;
/**
*
*/
public class RiversManager extends AbstractLifecycleComponent<RiversManager> {
private final RiversService riversService;
private final RiverClusterService clusterService;
private final RiversRouter riversRouter;
@Inject
public RiversManager(Settings settings, RiversService riversService, RiverClusterService clusterService, RiversRouter riversRouter) {
super(settings);
this.riversService = riversService;
this.clusterService = clusterService;
this.riversRouter = riversRouter;
}
@Override
protected void doStart() {
riversRouter.start();
riversService.start();
clusterService.start();
}
@Override
protected void doStop() {
riversRouter.stop();
clusterService.stop();
riversService.stop();
}
@Override
protected void doClose() {
riversRouter.close();
clusterService.close();
riversService.close();
}
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.routing.RiversRouter;
import java.util.Map;
/**
*
*/
public class RiversModule extends AbstractModule {
private final Settings settings;
private Map<String, Class<? extends Module>> riverTypes = Maps.newHashMap();
public RiversModule(Settings settings) {
this.settings = settings;
}
/**
* Registers a custom river type name against a module.
*
* @param type The type
* @param module The module
*/
public void registerRiver(String type, Class<? extends Module> module) {
riverTypes.put(type, module);
}
@Override
protected void configure() {
bind(String.class).annotatedWith(RiverIndexName.class).toInstance(RiverIndexName.Conf.indexName(settings));
bind(RiversService.class).asEagerSingleton();
bind(RiverClusterService.class).asEagerSingleton();
bind(RiversRouter.class).asEagerSingleton();
bind(RiversManager.class).asEagerSingleton();
bind(RiversTypesRegistry.class).toInstance(new RiversTypesRegistry(ImmutableMap.copyOf(riverTypes)));
}
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.PluginsService;
/**
* A module that simply calls the {@link PluginsService#processModule(org.elasticsearch.common.inject.Module)}
* in order to allow plugins to pre process specific river modules.
*/
public class RiversPluginsModule extends AbstractModule implements PreProcessModule {
private final Settings settings;
private final PluginsService pluginsService;
public RiversPluginsModule(Settings settings, PluginsService pluginsService) {
this.settings = settings;
this.pluginsService = pluginsService;
}
@Override
public void processModule(Module module) {
pluginsService.processModule(module);
}
@Override
protected void configure() {
}
}

View File

@ -1,279 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.river.cluster.RiverClusterChangedEvent;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.river.cluster.RiverClusterStateListener;
import org.elasticsearch.river.routing.RiverRouting;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
/**
*
*/
public class RiversService extends AbstractLifecycleComponent<RiversService> {
private final String riverIndexName;
private Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final RiversTypesRegistry typesRegistry;
private final Injector injector;
private final Map<RiverName, Injector> riversInjectors = Maps.newHashMap();
private volatile ImmutableMap<RiverName, River> rivers = ImmutableMap.of();
@Inject
public RiversService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, RiversTypesRegistry typesRegistry, RiverClusterService riverClusterService, Injector injector) {
super(settings);
this.riverIndexName = RiverIndexName.Conf.indexName(settings);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.typesRegistry = typesRegistry;
this.injector = injector;
riverClusterService.add(new ApplyRivers());
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
ImmutableSet<RiverName> indices = ImmutableSet.copyOf(this.rivers.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size());
for (final RiverName riverName : indices) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {
closeRiver(riverName);
} catch (Exception e) {
logger.warn("failed to delete river on stop [{}]/[{}]", e, riverName.type(), riverName.name());
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
}
@Override
protected void doClose() {
}
public synchronized void createRiver(RiverName riverName, Map<String, Object> settings) {
if (riversInjectors.containsKey(riverName)) {
logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name());
return;
}
logger.info("rivers have been deprecated. Read https://www.elastic.co/blog/deprecating_rivers");
logger.debug("creating river [{}][{}]", riverName.type(), riverName.name());
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new RiverNameModule(riverName));
modules.add(new RiverModule(riverName, settings, this.settings, typesRegistry));
modules.add(new RiversPluginsModule(this.settings, injector.getInstance(PluginsService.class)));
Injector indexInjector = modules.createChildInjector(injector);
riversInjectors.put(riverName, indexInjector);
River river = indexInjector.getInstance(River.class);
rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap();
// we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors
river.start();
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();
builder.endObject();
client.prepareIndex(riverIndexName, riverName.name(), "_status")
.setConsistencyLevel(WriteConsistencyLevel.ONE)
.setSource(builder).execute().actionGet();
} catch (Exception e) {
logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name());
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("error", ExceptionsHelper.detailedMessage(e));
builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();
builder.endObject();
client.prepareIndex(riverIndexName, riverName.name(), "_status")
.setConsistencyLevel(WriteConsistencyLevel.ONE)
.setSource(builder).execute().actionGet();
} catch (Exception e1) {
logger.warn("failed to write failed status for river creation", e);
}
}
}
public synchronized void closeRiver(RiverName riverName) {
Injector riverInjector;
River river;
synchronized (this) {
riverInjector = riversInjectors.remove(riverName);
if (riverInjector == null) {
throw new RiverException(riverName, "missing");
}
logger.debug("closing river [{}][{}]", riverName.type(), riverName.name());
Map<RiverName, River> tmpMap = Maps.newHashMap(rivers);
river = tmpMap.remove(riverName);
rivers = ImmutableMap.copyOf(tmpMap);
}
river.close();
}
private class ApplyRivers implements RiverClusterStateListener {
@Override
public void riverClusterChanged(RiverClusterChangedEvent event) {
DiscoveryNode localNode = clusterService.localNode();
RiverClusterState state = event.state();
// first, go over and delete ones that either don't exists or are not allocated
for (final RiverName riverName : rivers.keySet()) {
RiverRouting routing = state.routing().routing(riverName);
if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before)
closeRiver(riverName);
}
}
for (final RiverRouting routing : state.routing()) {
// not allocated
if (routing.node() == null) {
logger.trace("river {} has no routing node", routing.riverName().getName());
continue;
}
// only apply changes to the local node
if (!routing.node().equals(localNode)) {
logger.trace("river {} belongs to node {}", routing.riverName().getName(), routing.node());
continue;
}
// if its already created, ignore it
if (rivers.containsKey(routing.riverName())) {
logger.trace("river {} is already allocated", routing.riverName().getName());
continue;
}
prepareGetMetaDocument(routing.riverName().name()).execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
if (!rivers.containsKey(routing.riverName())) {
if (getResponse.isExists()) {
// only create the river if it exists, otherwise, the indexing meta data has not been visible yet...
createRiver(routing.riverName(), getResponse.getSourceAsMap());
} else {
//this should never happen as we've just found the _meta document in RiversRouter
logger.warn("{}/{}/_meta document not found", riverIndexName, routing.riverName().getName());
}
}
}
@Override
public void onFailure(Throwable e) {
// if its this is a failure that need to be retried, then do it
// this might happen if the state of the river index has not been propagated yet to this node, which
// should happen pretty fast since we managed to get the _meta in the RiversRouter
Throwable failure = ExceptionsHelper.unwrapCause(e);
if (isShardNotAvailableException(failure)) {
logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
final ActionListener<GetResponse> listener = this;
try {
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.LISTENER, new Runnable() {
@Override
public void run() {
prepareGetMetaDocument(routing.riverName().name()).execute(listener);
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule river start retry, node might be shutting down", ex);
}
} else {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
}
}
}));
}
}
private GetRequestBuilder prepareGetMetaDocument(String riverName) {
return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary");
}
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Module;
/**
* A type registry for rivers
*/
public class RiversTypesRegistry {
private final ImmutableMap<String, Class<? extends Module>> riverTypes;
public RiversTypesRegistry(ImmutableMap<String, Class<? extends Module>> riverTypes) {
this.riverTypes = riverTypes;
}
public Class<? extends Module> type(String type) {
return riverTypes.get(type);
}
}

View File

@ -1,121 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
/**
*
*/
public class PublishRiverClusterStateAction extends AbstractComponent {
public static final String ACTION_NAME = "internal:river/state/publish";
public interface NewClusterStateListener {
void onNewClusterState(RiverClusterState clusterState);
}
private final TransportService transportService;
private final ClusterService clusterService;
private final NewClusterStateListener listener;
public PublishRiverClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService,
NewClusterStateListener listener) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.listener = listener;
transportService.registerRequestHandler(ACTION_NAME, PublishClusterStateRequest.class, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler());
}
public void close() {
transportService.removeHandler(ACTION_NAME);
}
public void publish(RiverClusterState clusterState) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.localNode();
for (final DiscoveryNode node : discoNodes) {
if (node.equals(localNode)) {
// no need to send to our self
continue;
}
// we only want to send nodes that are either possible master nodes or river nodes
// master nodes because they will handle the state and the allocation of rivers
// and river nodes since they will end up creating indexes
if (!node.masterNode() && !RiverNodeHelper.isRiverNode(node)) {
continue;
}
transportService.sendRequest(node, ACTION_NAME, new PublishClusterStateRequest(clusterState), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
}
});
}
}
static class PublishClusterStateRequest extends TransportRequest {
private RiverClusterState clusterState;
PublishClusterStateRequest() {
}
private PublishClusterStateRequest(RiverClusterState clusterState) {
this.clusterState = clusterState;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterState = RiverClusterState.Builder.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
RiverClusterState.Builder.writeTo(clusterState, out);
}
}
private class PublishClusterStateRequestHandler implements TransportRequestHandler<PublishClusterStateRequest> {
@Override
public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
listener.onNewClusterState(request.clusterState);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
/**
*
*/
public class RiverClusterChangedEvent {
private final String source;
private final RiverClusterState previousState;
private final RiverClusterState state;
public RiverClusterChangedEvent(String source, RiverClusterState state, RiverClusterState previousState) {
this.source = source;
this.state = state;
this.previousState = previousState;
}
/**
* The source that caused this cluster event to be raised.
*/
public String source() {
return this.source;
}
public RiverClusterState state() {
return this.state;
}
public RiverClusterState previousState() {
return this.previousState;
}
}

View File

@ -1,170 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
/**
*
*/
public class RiverClusterService extends AbstractLifecycleComponent<RiverClusterService> {
private final ClusterService clusterService;
private final PublishRiverClusterStateAction publishAction;
private final List<RiverClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ExecutorService updateTasksExecutor;
private volatile RiverClusterState clusterState = RiverClusterState.builder().build();
@Inject
public RiverClusterService(Settings settings, TransportService transportService, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
this.publishAction = new PublishRiverClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener());
}
@Override
protected void doStart() {
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "riverClusterService#updateTask"));
}
@Override
protected void doStop() {
updateTasksExecutor.shutdown();
try {
updateTasksExecutor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
}
@Override
protected void doClose() {
}
public void add(RiverClusterStateListener listener) {
clusterStateListeners.add(listener);
}
public void remove(RiverClusterStateListener listener) {
clusterStateListeners.remove(listener);
}
/**
* The current state.
*/
public ClusterState state() {
return clusterService.state();
}
public void submitStateUpdateTask(final String source, final RiverClusterStateUpdateTask updateTask) {
if (!lifecycle.started()) {
return;
}
updateTasksExecutor.execute(new Runnable() {
@Override
public void run() {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster_service not started", source);
return;
}
logger.debug("processing [{}]: execute", source);
RiverClusterState previousClusterState = clusterState;
try {
clusterState = updateTask.execute(previousClusterState);
} catch (Exception e) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
logger.warn(sb.toString(), e);
return;
}
if (previousClusterState != clusterState) {
if (clusterService.state().nodes().localNodeMaster()) {
// only the master controls the version numbers
clusterState = new RiverClusterState(clusterState.version() + 1, clusterState);
} else {
// we got this cluster state from the master, filter out based on versions (don't call listeners)
if (clusterState.version() < previousClusterState.version()) {
logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
return;
}
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source);
}
RiverClusterChangedEvent clusterChangedEvent = new RiverClusterChangedEvent(source, clusterState, previousClusterState);
for (RiverClusterStateListener listener : clusterStateListeners) {
listener.riverClusterChanged(clusterChangedEvent);
}
// if we are the master, publish the new state to all nodes
if (clusterService.state().nodes().localNodeMaster()) {
publishAction.publish(clusterState);
}
logger.debug("processing [{}]: done applying updated cluster_state", source);
} else {
logger.debug("processing [{}]: no change in cluster_state", source);
}
}
});
}
private class UpdateClusterStateListener implements PublishRiverClusterStateAction.NewClusterStateListener {
@Override
public void onNewClusterState(final RiverClusterState clusterState) {
ClusterState state = clusterService.state();
if (state.nodes().localNodeMaster()) {
logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode());
return;
}
submitStateUpdateTask("received_state", new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return clusterState;
}
});
}
}
}

View File

@ -1,96 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.river.routing.RiversRouting;
import java.io.IOException;
/**
*
*/
public class RiverClusterState {
private final long version;
private final RiversRouting routing;
public RiverClusterState(long version, RiverClusterState state) {
this.version = version;
this.routing = state.routing();
}
RiverClusterState(long version, RiversRouting routing) {
this.version = version;
this.routing = routing;
}
public long version() {
return this.version;
}
public RiversRouting routing() {
return routing;
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private long version = 0;
private RiversRouting routing = RiversRouting.EMPTY;
public Builder state(RiverClusterState state) {
this.version = state.version();
this.routing = state.routing();
return this;
}
public Builder routing(RiversRouting.Builder builder) {
return routing(builder.build());
}
public Builder routing(RiversRouting routing) {
this.routing = routing;
return this;
}
public RiverClusterState build() {
return new RiverClusterState(version, routing);
}
public static RiverClusterState readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
builder.version = in.readVLong();
builder.routing = RiversRouting.Builder.readFrom(in);
return builder.build();
}
public static void writeTo(RiverClusterState clusterState, StreamOutput out) throws IOException {
out.writeVLong(clusterState.version);
RiversRouting.Builder.writeTo(clusterState.routing, out);
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
/**
*
*/
public interface RiverClusterStateListener {
void riverClusterChanged(RiverClusterChangedEvent event);
}

View File

@ -1,28 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
/**
*
*/
public interface RiverClusterStateUpdateTask {
RiverClusterState execute(RiverClusterState currentState);
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.river.RiverName;
/**
*
*/
public class RiverNodeHelper {
public static boolean isRiverNode(DiscoveryNode node) {
// we don't allocate rivers on client nodes
if (node.clientNode()) {
return false;
}
String river = node.attributes().get("river");
// by default, if not set, it's a river node (better OOB exp)
if (river == null) {
return true;
}
if ("_none_".equals(river)) {
return false;
}
// there is at least one river settings, we need it
return true;
}
public static boolean isRiverNode(DiscoveryNode node, RiverName riverName) {
if (!isRiverNode(node)) {
return false;
}
String river = node.attributes().get("river");
// by default, if not set, its an river node (better OOB exp)
return river == null || river.contains(riverName.type()) || river.contains(riverName.name());
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.dummy;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
/**
*
*/
public class DummyRiver extends AbstractRiverComponent implements River {
@Inject
public DummyRiver(RiverName riverName, RiverSettings settings) {
super(riverName, settings);
logger.info("create");
}
@Override
public void start() {
logger.info("start");
}
@Override
public void close() {
logger.info("close");
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.dummy;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;
/**
*
*/
public class DummyRiverModule extends AbstractModule {
@Override
protected void configure() {
bind(River.class).to(DummyRiver.class).asEagerSingleton();
}
}

View File

@ -1,87 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.river.RiverName;
import java.io.IOException;
/**
*
*/
public class RiverRouting implements Streamable {
private RiverName riverName;
private DiscoveryNode node;
private RiverRouting() {
}
RiverRouting(RiverName riverName, DiscoveryNode node) {
this.riverName = riverName;
this.node = node;
}
public RiverName riverName() {
return riverName;
}
/**
* The node the river is allocated to, <tt>null</tt> if its not allocated.
*/
public DiscoveryNode node() {
return node;
}
void node(DiscoveryNode node) {
this.node = node;
}
public static RiverRouting readRiverRouting(StreamInput in) throws IOException {
RiverRouting routing = new RiverRouting();
routing.readFrom(in);
return routing;
}
@Override
public void readFrom(StreamInput in) throws IOException {
riverName = new RiverName(in.readString(), in.readString());
if (in.readBoolean()) {
node = DiscoveryNode.readNode(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(riverName.type());
out.writeString(riverName.name());
if (node == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
node.writeTo(out);
}
}
}

View File

@ -1,255 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.routing;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask;
import org.elasticsearch.river.cluster.RiverNodeHelper;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*
*/
public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> implements ClusterStateListener {
private static final TimeValue RIVER_START_RETRY_INTERVAL = TimeValue.timeValueMillis(1000);
private static final int RIVER_START_MAX_RETRIES = 5;
private final String riverIndexName;
private final Client client;
private final RiverClusterService riverClusterService;
private final ThreadPool threadPool;
@Inject
public RiversRouter(Settings settings, Client client, ClusterService clusterService, RiverClusterService riverClusterService, ThreadPool threadPool) {
super(settings);
this.riverIndexName = RiverIndexName.Conf.indexName(settings);
this.riverClusterService = riverClusterService;
this.client = client;
this.threadPool = threadPool;
clusterService.add(this);
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
return;
}
final String source = "reroute_rivers_node_changed";
//we'll try again a few times if we don't find the river _meta document while the type is there
final CountDown countDown = new CountDown(RIVER_START_MAX_RETRIES);
riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return updateRiverClusterState(source, currentState, event.state(), countDown);
}
});
}
protected RiverClusterState updateRiverClusterState(final String source, final RiverClusterState currentState,
ClusterState newClusterState, final CountDown countDown) {
if (!newClusterState.metaData().hasIndex(riverIndexName)) {
// if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
if (!currentState.routing().isEmpty()) {
return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build();
}
return currentState;
}
RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing());
boolean dirty = false;
IndexMetaData indexMetaData = newClusterState.metaData().index(riverIndexName);
boolean metaFound = true;
// go over and create new river routing (with no node) for new types (rivers names)
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.mappings().values()) {
String mappingType = cursor.value.type(); // mapping type is the name of the river
if (MapperService.DEFAULT_MAPPING.equals(mappingType)) {
continue;
}
if (!currentState.routing().hasRiverByName(mappingType)) {
// no river, we need to add it to the routing with no node allocation
try {
GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").setPreference("_primary").get();
if (getResponse.isExists()) {
logger.debug("{}/{}/_meta document found.", riverIndexName, mappingType);
String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null);
if (riverType == null) {
logger.warn("no river type provided for [{}], ignoring...", riverIndexName);
} else {
routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null));
dirty = true;
}
} else {
// At least one type does not have _meta
metaFound = false;
}
} catch (NoShardAvailableActionException e) {
// ignore, we will get it next time...
} catch (ClusterBlockException e) {
// ignore, we will get it next time
} catch (IndexMissingException e) {
// ignore, we will get it next time
} catch (IllegalIndexShardStateException e) {
// ignore, we will get it next time
} catch (Exception e) {
logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
}
}
}
// At least one type does not have _meta, so we are
// going to reschedule some checks
if (!metaFound) {
if (countDown.countDown()) {
logger.warn("no river _meta document found after {} attempts", RIVER_START_MAX_RETRIES);
} else {
logger.debug("no river _meta document found retrying in {} ms", RIVER_START_RETRY_INTERVAL.millis());
try {
threadPool.schedule(RIVER_START_RETRY_INTERVAL, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return updateRiverClusterState(source, currentState, riverClusterService.state(), countDown);
}
});
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule river start retry, node might be shutting down", ex);
}
}
}
// now, remove routings that were deleted
// also, apply nodes that were removed and rivers were running on
for (RiverRouting routing : currentState.routing()) {
if (!indexMetaData.mappings().containsKey(routing.riverName().name())) {
routingBuilder.remove(routing);
dirty = true;
} else if (routing.node() != null && !newClusterState.nodes().nodeExists(routing.node().id())) {
routingBuilder.remove(routing);
routingBuilder.put(new RiverRouting(routing.riverName(), null));
dirty = true;
}
}
// build a list from nodes to rivers
Map<DiscoveryNode, List<RiverRouting>> nodesToRivers = Maps.newHashMap();
for (DiscoveryNode node : newClusterState.nodes()) {
if (RiverNodeHelper.isRiverNode(node)) {
nodesToRivers.put(node, Lists.<RiverRouting>newArrayList());
}
}
List<RiverRouting> unassigned = Lists.newArrayList();
for (RiverRouting routing : routingBuilder.build()) {
if (routing.node() == null) {
unassigned.add(routing);
} else {
List<RiverRouting> l = nodesToRivers.get(routing.node());
if (l == null) {
l = Lists.newArrayList();
nodesToRivers.put(routing.node(), l);
}
l.add(routing);
}
}
for (Iterator<RiverRouting> it = unassigned.iterator(); it.hasNext(); ) {
RiverRouting routing = it.next();
DiscoveryNode smallest = null;
int smallestSize = Integer.MAX_VALUE;
for (Map.Entry<DiscoveryNode, List<RiverRouting>> entry : nodesToRivers.entrySet()) {
if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) {
if (entry.getValue().size() < smallestSize) {
smallestSize = entry.getValue().size();
smallest = entry.getKey();
}
}
}
if (smallest != null) {
dirty = true;
it.remove();
routing.node(smallest);
nodesToRivers.get(smallest).add(routing);
logger.debug("going to allocate river [{}] on node {}", routing.riverName().getName(), smallest);
}
}
// add relocation logic...
if (dirty) {
return RiverClusterState.builder().state(currentState).routing(routingBuilder).build();
}
return currentState;
}
}

View File

@ -1,123 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.routing;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.river.RiverName;
import java.io.IOException;
import java.util.Iterator;
/**
*
*/
public class RiversRouting implements Iterable<RiverRouting> {
public static final RiversRouting EMPTY = RiversRouting.builder().build();
private final ImmutableMap<RiverName, RiverRouting> rivers;
private RiversRouting(ImmutableMap<RiverName, RiverRouting> rivers) {
this.rivers = rivers;
}
public boolean isEmpty() {
return rivers.isEmpty();
}
public RiverRouting routing(RiverName riverName) {
return rivers.get(riverName);
}
public boolean hasRiverByName(String name) {
for (RiverName riverName : rivers.keySet()) {
if (riverName.name().equals(name)) {
return true;
}
}
return false;
}
@Override
public Iterator<RiverRouting> iterator() {
return rivers.values().iterator();
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private MapBuilder<RiverName, RiverRouting> rivers = MapBuilder.newMapBuilder();
public Builder routing(RiversRouting routing) {
rivers.putAll(routing.rivers);
return this;
}
public Builder put(RiverRouting routing) {
rivers.put(routing.riverName(), routing);
return this;
}
public Builder remove(RiverRouting routing) {
rivers.remove(routing.riverName());
return this;
}
public Builder remove(RiverName riverName) {
rivers.remove(riverName);
return this;
}
public Builder remote(String riverName) {
for (RiverName name : rivers.map().keySet()) {
if (name.name().equals(riverName)) {
rivers.remove(name);
}
}
return this;
}
public RiversRouting build() {
return new RiversRouting(rivers.immutableMap());
}
public static RiversRouting readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(RiverRouting.readRiverRouting(in));
}
return builder.build();
}
public static void writeTo(RiversRouting routing, StreamOutput out) throws IOException {
out.writeVInt(routing.rivers.size());
for (RiverRouting riverRouting : routing) {
riverRouting.writeTo(out);
}
}
}
}

View File

@ -1,168 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.river.dummy.DummyRiverModule;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
@AwaitsFix(bugUrl="occasionally fails apparently due to synchronous mappings updates")
public class RiverTests extends ElasticsearchIntegrationTest {
@Override
protected void beforeIndexDeletion() {
}
@Test
public void testRiverStart() throws Exception {
startAndCheckRiverIsStarted("dummy-river-test");
}
@Test
public void testMultipleRiversStart() throws Exception {
int nbRivers = between(2,10);
logger.info("--> testing with {} rivers...", nbRivers);
Thread[] riverCreators = new Thread[nbRivers];
final CountDownLatch latch = new CountDownLatch(nbRivers);
final MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
for (int i = 0; i < nbRivers; i++) {
final String riverName = "dummy-river-test-" + i;
riverCreators[i] = new Thread() {
@Override
public void run() {
try {
startRiver(riverName);
} catch (Throwable t) {
logger.warn("failed to register river {}", t, riverName);
} finally {
latch.countDown();
}
}
};
riverCreators[i].start();
multiGetRequestBuilder.add(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status");
}
latch.await();
logger.info("--> checking that all rivers were created");
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object obj) {
MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.get();
for (MultiGetItemResponse getItemResponse : multiGetItemResponse) {
if (getItemResponse.isFailed() || !getItemResponse.getResponse().isExists()) {
return false;
}
}
return true;
}
}, 5, TimeUnit.SECONDS), equalTo(true));
}
/**
* Test case for https://github.com/elasticsearch/elasticsearch/issues/4577
* River does not start when using config/templates files
*/
@Test
public void startDummyRiverWithDefaultTemplate() throws Exception {
logger.info("--> create empty template");
client().admin().indices().preparePutTemplate("template_1")
.setTemplate("*")
.setOrder(0)
.addMapping(MapperService.DEFAULT_MAPPING,
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.endObject().endObject())
.get();
startAndCheckRiverIsStarted("dummy-river-default-template-test");
}
/**
* Test case for https://github.com/elasticsearch/elasticsearch/issues/4577
* River does not start when using config/templates files
*/
@Test
public void startDummyRiverWithSomeTemplates() throws Exception {
logger.info("--> create some templates");
client().admin().indices().preparePutTemplate("template_1")
.setTemplate("*")
.setOrder(0)
.addMapping(MapperService.DEFAULT_MAPPING,
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.endObject().endObject())
.get();
client().admin().indices().preparePutTemplate("template_2")
.setTemplate("*")
.setOrder(0)
.addMapping("atype",
JsonXContent.contentBuilder().startObject().startObject("atype")
.endObject().endObject())
.get();
startAndCheckRiverIsStarted("dummy-river-template-test");
}
/**
* Create a Dummy river then check it has been started. We will fail after 5 seconds.
* @param riverName Dummy river needed to be started
*/
private void startAndCheckRiverIsStarted(final String riverName) throws InterruptedException {
startRiver(riverName);
checkRiverIsStarted(riverName);
}
private void startRiver(final String riverName) {
logger.info("--> starting river [{}]", riverName);
IndexResponse indexResponse = client().prepareIndex(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_meta")
.setSource("type", DummyRiverModule.class.getCanonicalName()).get();
assertTrue(indexResponse.isCreated());
ensureGreen();
}
private void checkRiverIsStarted(final String riverName) throws InterruptedException {
logger.info("--> checking that river [{}] was created", riverName);
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object obj) {
GetResponse response = client().prepareGet(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status").get();
return response.isExists();
}
}, 5, TimeUnit.SECONDS), equalTo(true));
}
}

View File

@ -9,6 +9,16 @@ your application to Elasticsearch 2.0.
Elasticsearch now binds to the loopback interface by default (usually 127.0.0.1
or ::1), the setting `network.host` can be specified to change this behavior.
=== Rivers removal
Elasticsearch does not support rivers anymore. While we had first planned to
keep them around to ease migration, keeping support for rivers proved to be
challenging as it conflicted with other important changes that we wanted to
bring to 2.0 like synchronous dynamic mappings updates, so we eventually
decided to remove them entirely. See
https://www.elastic.co/blog/deprecating_rivers for more background about why
we are moving away from rivers.
=== Indices API
The <<alias-retrieving, get alias api>> will, by default produce an error response

View File

@ -214,49 +214,6 @@ You can disable that check using `plugins.check_lucene: false`.
* https://github.com/shikhar/eskka[eskka Discovery Plugin] (by Shikhar Bhushan)
* https://github.com/grantr/elasticsearch-srv-discovery[DNS SRV Discovery Plugin] (by Grant Rodgers)
[float]
[[river]]
==== River Plugins
deprecated[1.5.0,Rivers have been deprecated. See https://www.elastic.co/blog/deprecating_rivers for more details]
.Supported by Elasticsearch
* https://github.com/elasticsearch/elasticsearch-river-couchdb[CouchDB River Plugin]
* https://github.com/elasticsearch/elasticsearch-river-rabbitmq[RabbitMQ River Plugin]
* https://github.com/elasticsearch/elasticsearch-river-twitter[Twitter River Plugin]
* https://github.com/elasticsearch/elasticsearch-river-wikipedia[Wikipedia River Plugin]
.Supported by the community
* https://github.com/domdorn/elasticsearch-river-activemq/[ActiveMQ River Plugin] (by Dominik Dorn)
* https://github.com/albogdano/elasticsearch-river-amazonsqs[Amazon SQS River Plugin] (by Alex Bogdanovski)
* https://github.com/xxBedy/elasticsearch-river-csv[CSV River Plugin] (by Martin Bednar)
* http://www.pilato.fr/dropbox/[Dropbox River Plugin] (by David Pilato)
* http://www.pilato.fr/fsriver/[FileSystem River Plugin] (by David Pilato)
* https://github.com/obazoud/elasticsearch-river-git[Git River Plugin] (by Olivier Bazoud)
* https://github.com/uberVU/elasticsearch-river-github[GitHub River Plugin] (by uberVU)
* https://github.com/sksamuel/elasticsearch-river-hazelcast[Hazelcast River Plugin] (by Steve Samuel)
* https://github.com/jprante/elasticsearch-river-jdbc[JDBC River Plugin] (by Jörg Prante)
* https://github.com/qotho/elasticsearch-river-jms[JMS River Plugin] (by Steve Sarandos)
* https://github.com/endgameinc/elasticsearch-river-kafka[Kafka River Plugin] (by Endgame Inc.)
* https://github.com/mariamhakobyan/elasticsearch-river-kafka[Kafka River Plugin 2] (by Mariam Hakobyan)
* https://github.com/tlrx/elasticsearch-river-ldap[LDAP River Plugin] (by Tanguy Leroux)
* https://github.com/richardwilly98/elasticsearch-river-mongodb/[MongoDB River Plugin] (by Richard Louapre)
* https://github.com/sksamuel/elasticsearch-river-neo4j[Neo4j River Plugin] (by Steve Samuel)
* https://github.com/jprante/elasticsearch-river-oai/[Open Archives Initiative (OAI) River Plugin] (by Jörg Prante)
* https://github.com/sksamuel/elasticsearch-river-redis[Redis River Plugin] (by Steve Samuel)
* https://github.com/rethinkdb/elasticsearch-river-rethinkdb[RethinkDB River Plugin] (by RethinkDB)
* http://dadoonet.github.com/rssriver/[RSS River Plugin] (by David Pilato)
* https://github.com/adamlofts/elasticsearch-river-sofa[Sofa River Plugin] (by adamlofts)
* https://github.com/javanna/elasticsearch-river-solr/[Solr River Plugin] (by Luca Cavanna)
* https://github.com/sunnygleason/elasticsearch-river-st9[St9 River Plugin] (by Sunny Gleason)
* https://github.com/plombard/SubversionRiver[Subversion River Plugin] (by Pascal Lombard)
* https://github.com/kzwang/elasticsearch-river-dynamodb[DynamoDB River Plugin] (by Kevin Wang)
* https://github.com/salyh/elasticsearch-river-imap[IMAP/POP3 Email River Plugin] (by Hendrik Saly)
* https://github.com/codelibs/elasticsearch-river-web[Web River Plugin] (by CodeLibs Project)
* https://github.com/eea/eea.elasticsearch.river.rdf[EEA ElasticSearch RDF River Plugin] (by the European Environment Agency)
* https://github.com/lbroudoux/es-amazon-s3-river[Amazon S3 River Plugin] (by Laurent Broudoux)
* https://github.com/lbroudoux/es-google-drive-river[Google Drive River Plugin] (by Laurent Broudoux)
[float]
[[transport]]
==== Transport Plugins