Possible failure to start a river after cluster restart, closes #902.

This commit is contained in:
kimchy 2011-05-04 20:27:07 +03:00
parent 0386317aba
commit 015e46930d
1 changed files with 18 additions and 1 deletions

View File

@ -22,9 +22,11 @@ package org.elasticsearch.river;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
@ -36,6 +38,7 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.PluginsService;
@ -225,7 +228,21 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
}
@Override public void onFailure(Throwable e) {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
// if its this is a failure that need to be retried, then do it
// this might happen if the state of the river index has not been propagated yet to this node, which
// should happen pretty fast since we managed to get the _meta in the RiversRouter
Throwable failure = ExceptionsHelper.unwrapCause(e);
if ((failure instanceof NoShardAvailableActionException) || (failure instanceof ClusterBlockException)) {
logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
final ActionListener<GetResponse> listener = this;
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").execute(listener);
}
});
} else {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
}
}
});
}