River does not start when using config/templates files

From elasticsearch 0.90.6, when you have templates files defined in `config/templates` dir, rivers don't start anymore.

Steps to reproduce:

Create `config/templates/default.json`:

```javascript
{
  default:
  {
    "template" : "*",
    "mappings" : {
      "_default_" : {
      }
    }
  }
}
```

Start a dummy river:

```sh
curl -XPUT 'localhost:9200/_river/my_river/_meta' -d '{ "type" : "dummy" }'
```

It gives:

```
[2014-01-01 22:08:38,151][INFO ][cluster.metadata         ] [Forge] [_river] creating index, cause [auto(index api)], shards [1]/[1], mappings [_default_]
[2014-01-01 22:08:38,239][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:38,245][INFO ][cluster.metadata         ] [Forge] [_river] update_mapping [my_river] (dynamic)
[2014-01-01 22:08:38,250][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:39,244][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:39,252][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:40,246][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:40,254][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:41,246][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:41,255][INFO ][river.routing            ] [Forge] no river _meta document found, retrying in 1000 ms
[2014-01-01 22:08:42,249][WARN ][river.routing            ] [Forge] no river _meta document found after 5 attempts
[2014-01-01 22:08:42,257][WARN ][river.routing            ] [Forge] no river _meta document found after 5 attempts
```

With elasticsearch 0.90.2 or with no template file in `config/templates` dir, it gives:

```
[2014-01-01 22:22:32,096][INFO ][cluster.metadata         ] [Forge] [_river] creating index, cause [auto(index api)], shards [1]/[1], mappings []
[2014-01-01 22:22:32,221][INFO ][cluster.metadata         ] [Forge] [_river] update_mapping [my_river] (dynamic)
[2014-01-01 22:22:32,228][INFO ][river.dummy              ] [Forge] [dummy][my_river] create
[2014-01-01 22:22:32,228][INFO ][river.dummy              ] [Forge] [dummy][my_river] start
[2014-01-01 22:22:32,234][INFO ][cluster.metadata         ] [Forge] [_river] update_mapping [my_river] (dynamic)
```

Closes #4577.
Closes #4656.
This commit is contained in:
David Pilato 2014-01-08 09:45:26 +01:00
parent 8c7bbbcc8f
commit 48aaf34f4f
2 changed files with 111 additions and 32 deletions

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.river.RiverIndexName;
@ -110,7 +111,7 @@ public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> imple
}
protected RiverClusterState updateRiverClusterState(final String source, final RiverClusterState currentState,
ClusterState newClusterState, final CountDown countDown) {
ClusterState newClusterState, final CountDown countDown) {
if (!newClusterState.metaData().hasIndex(riverIndexName)) {
// if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
if (!currentState.routing().isEmpty()) {
@ -121,44 +122,33 @@ public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> imple
RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing());
boolean dirty = false;
IndexMetaData indexMetaData = newClusterState.metaData().index(riverIndexName);
boolean metaFound = true;
// go over and create new river routing (with no node) for new types (rivers names)
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.mappings().values()) {
String mappingType = cursor.value.type(); // mapping type is the name of the river
if (MapperService.DEFAULT_MAPPING.equals(mappingType)) {
continue;
}
if (!currentState.routing().hasRiverByName(mappingType)) {
// no river, we need to add it to the routing with no node allocation
try {
GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").setPreference("_primary").get();
if (!getResponse.isExists()) {
if (countDown.countDown()) {
logger.warn("no river _meta document found after {} attempts", RIVER_START_MAX_RETRIES);
if (getResponse.isExists()) {
logger.debug("{}/{}/_meta document found.", riverIndexName, mappingType);
String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null);
if (riverType == null) {
logger.warn("no river type provided for [{}], ignoring...", riverIndexName);
} else {
logger.info("no river _meta document found, retrying in {} ms", RIVER_START_RETRY_INTERVAL.millis());
try {
threadPool.schedule(RIVER_START_RETRY_INTERVAL, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return updateRiverClusterState(source, currentState, riverClusterService.state(), countDown);
}
});
}
});
} catch(EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule river start retry, node might be shutting down", ex);
}
routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null));
dirty = true;
}
return currentState;
}
String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null);
if (riverType == null) {
logger.warn("no river type provided for [{}], ignoring...", riverIndexName);
} else {
routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null));
dirty = true;
// At least one type does not have _meta
metaFound = false;
}
} catch (NoShardAvailableActionException e) {
// ignore, we will get it next time...
@ -173,6 +163,32 @@ public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> imple
}
}
}
// At least one type does not have _meta, so we are
// going to reschedule some checks
if (!metaFound) {
if (countDown.countDown()) {
logger.warn("no river _meta document found after {} attempts", RIVER_START_MAX_RETRIES);
} else {
logger.debug("no river _meta document found retrying in {} ms", RIVER_START_RETRY_INTERVAL.millis());
try {
threadPool.schedule(RIVER_START_RETRY_INTERVAL, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return updateRiverClusterState(source, currentState, riverClusterService.state(), countDown);
}
});
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule river start retry, node might be shutting down", ex);
}
}
}
// now, remove routings that were deleted
// also, apply nodes that were removed and rivers were running on
for (RiverRouting routing : currentState.routing()) {

View File

@ -22,22 +22,84 @@ package org.elasticsearch.river;
import com.google.common.base.Predicate;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.river.dummy.DummyRiverModule;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = Scope.TEST)
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
public class RiverTests extends ElasticsearchIntegrationTest {
@Test
public void testRiverStart() throws Exception {
final String riverName = "dummy-river-test";
logger.info("--> creating river [{}]", riverName);
startAndCheckRiverIsStarted("dummy-river-test");
}
@Test
public void testMultipleRiversStart() throws Exception {
int nbRivers = between(2,10);
logger.info("--> testing with {} rivers...", nbRivers);
for (int i = 0; i < nbRivers; i++) {
final String riverName = "dummy-river-test-" + i;
startAndCheckRiverIsStarted(riverName);
}
}
/**
* Test case for https://github.com/elasticsearch/elasticsearch/issues/4577
* River does not start when using config/templates files
*/
@Test
public void startDummyRiverWithDefaultTemplate() throws Exception {
logger.info("--> create empty template");
client().admin().indices().preparePutTemplate("template_1")
.setTemplate("*")
.setOrder(0)
.addMapping(MapperService.DEFAULT_MAPPING,
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.endObject().endObject())
.get();
startAndCheckRiverIsStarted("dummy-river-default-template-test");
}
/**
* Test case for https://github.com/elasticsearch/elasticsearch/issues/4577
* River does not start when using config/templates files
*/
@Test
public void startDummyRiverWithSomeTemplates() throws Exception {
logger.info("--> create some templates");
client().admin().indices().preparePutTemplate("template_1")
.setTemplate("*")
.setOrder(0)
.addMapping(MapperService.DEFAULT_MAPPING,
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.endObject().endObject())
.get();
client().admin().indices().preparePutTemplate("template_2")
.setTemplate("*")
.setOrder(0)
.addMapping("atype",
JsonXContent.contentBuilder().startObject().startObject("atype")
.endObject().endObject())
.get();
startAndCheckRiverIsStarted("dummy-river-template-test");
}
/**
* Create a Dummy river then check it has been started. We will fail after 5 seconds.
* @param riverName Dummy river needed to be started
*/
private void startAndCheckRiverIsStarted(final String riverName) throws InterruptedException {
logger.info("--> starting river [{}]", riverName);
IndexResponse indexResponse = client().prepareIndex(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_meta")
.setSource("type", DummyRiverModule.class.getCanonicalName()).get();
assertTrue(indexResponse.isCreated());
@ -50,4 +112,5 @@ public class RiverTests extends ElasticsearchIntegrationTest {
}
}, 5, TimeUnit.SECONDS), equalTo(true));
}
}