Rivers: Add a _status doc for each river, closes #468.

This commit is contained in:
kimchy 2010-11-02 12:09:45 +02:00
parent ef4c44577a
commit 2cdaf6357b
1 changed files with 49 additions and 13 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.river;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
@ -35,6 +36,8 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.river.cluster.RiverClusterChangedEvent;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
@ -103,27 +106,60 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
@Override protected void doClose() throws ElasticSearchException {
}
public synchronized River createRiver(RiverName riverName, Map<String, Object> settings) throws ElasticSearchException {
public synchronized void createRiver(RiverName riverName, Map<String, Object> settings) throws ElasticSearchException {
if (riversInjectors.containsKey(riverName)) {
throw new RiverException(riverName, "river already exists");
logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name());
return;
}
logger.debug("creating river [{}][{}]", riverName.type(), riverName.name());
ModulesBuilder modules = new ModulesBuilder();
modules.add(new RiverNameModule(riverName));
modules.add(new RiverModule(riverName, settings, this.settings));
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new RiverNameModule(riverName));
modules.add(new RiverModule(riverName, settings, this.settings));
Injector indexInjector = modules.createChildInjector(injector);
riversInjectors.put(riverName, indexInjector);
River river = indexInjector.getInstance(River.class);
rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap();
Injector indexInjector = modules.createChildInjector(injector);
riversInjectors.put(riverName, indexInjector);
River river = indexInjector.getInstance(River.class);
rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap();
// we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors
river.start();
return river;
// we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors
river.start();
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("ok", true);
builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();
builder.endObject();
client.prepareIndex(riverIndexName, riverName.name(), "_status").setSource(builder).execute().actionGet();
} catch (Exception e) {
logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name());
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("error", ExceptionsHelper.detailedMessage(e));
builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();
client.prepareIndex(riverIndexName, riverName.name(), "_status").setSource(builder).execute().actionGet();
} catch (Exception e1) {
logger.warn("failed to write failed status for river creation", e);
}
}
}
public synchronized void closeRiver(RiverName riverName) throws ElasticSearchException {