Schedule retry if the river type is available but the _meta document isn't
With #3782 we changed the execution order of dynamic mapping updates and index operations: we now first send the mapping update to the master node, and then we index the document. This makes sense, but it caused issues with rivers. Rivers are started in response to the cluster changed event that is triggered on the master node right after the mapping update has been applied. However, for a river to be started its _meta document needs to be available, which is no longer the case at that point, because the index operation most likely hasn't happened yet. As a result, in most cases rivers don't get started. We now retry a few times if the _meta document wasn't found, so that the river gets started anyway. Closes #4089, #3840
This commit is contained in:
parent
d390f5250b
commit
b7cc378aeb
|
@ -85,6 +85,13 @@ public class RiverClusterService extends AbstractLifecycleComponent<RiverCluster
|
||||||
clusterStateListeners.remove(listener);
|
clusterStateListeners.remove(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current state.
|
||||||
|
*/
|
||||||
|
public ClusterState state() {
|
||||||
|
return clusterService.state();
|
||||||
|
}
|
||||||
|
|
||||||
public void submitStateUpdateTask(final String source, final RiverClusterStateUpdateTask updateTask) {
|
public void submitStateUpdateTask(final String source, final RiverClusterStateUpdateTask updateTask) {
|
||||||
if (!lifecycle.started()) {
|
if (!lifecycle.started()) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateListener;
|
import org.elasticsearch.cluster.ClusterStateListener;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
@ -35,6 +36,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||||
import org.elasticsearch.indices.IndexMissingException;
|
import org.elasticsearch.indices.IndexMissingException;
|
||||||
|
@ -44,6 +48,7 @@ import org.elasticsearch.river.cluster.RiverClusterService;
|
||||||
import org.elasticsearch.river.cluster.RiverClusterState;
|
import org.elasticsearch.river.cluster.RiverClusterState;
|
||||||
import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask;
|
import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask;
|
||||||
import org.elasticsearch.river.cluster.RiverNodeHelper;
|
import org.elasticsearch.river.cluster.RiverNodeHelper;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -54,18 +59,24 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> implements ClusterStateListener {

    // Interval between retries when a river's mapping type exists but its _meta
    // document hasn't been indexed yet (see #4089 / #3840).
    private static final TimeValue RIVER_START_RETRY_INTERVAL = TimeValue.timeValueMillis(1000);
    // Maximum number of attempts to find the river _meta document before giving up.
    private static final int RIVER_START_MAX_RETRIES = 5;

    // Name of the index holding river definitions; each mapping type in it is one river.
    private final String riverIndexName;

    // Client used to fetch the per-river _meta documents.
    private final Client client;

    private final RiverClusterService riverClusterService;

    // Used to schedule delayed retries when a _meta document isn't available yet.
    private final ThreadPool threadPool;
    /**
     * Creates the router and registers it as a cluster state listener, so river
     * routing is re-evaluated on every cluster state change.
     *
     * @param settings            node settings (also used to resolve the river index name)
     * @param client              client used to read river _meta documents
     * @param clusterService      cluster service this router registers itself with
     * @param riverClusterService service used to publish river cluster state updates
     * @param threadPool          thread pool used to schedule retries when a river's
     *                            _meta document isn't available yet
     */
    @Inject
    public RiversRouter(Settings settings, Client client, ClusterService clusterService, RiverClusterService riverClusterService, ThreadPool threadPool) {
        super(settings);
        this.riverIndexName = RiverIndexName.Conf.indexName(settings);
        this.riverClusterService = riverClusterService;
        this.client = client;
        this.threadPool = threadPool;
        // Register last, after all fields are assigned: this publishes the instance
        // as a listener and it may be invoked as soon as it is added.
        clusterService.add(this);
    }
|
||||||
|
|
||||||
|
@ -86,113 +97,142 @@ public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> imple
|
||||||
if (!event.localNodeMaster()) {
|
if (!event.localNodeMaster()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
riverClusterService.submitStateUpdateTask("reroute_rivers_node_changed", new RiverClusterStateUpdateTask() {
|
final String source = "reroute_rivers_node_changed";
|
||||||
|
//we'll try again a few times if we don't find the river _meta document while the type is there
|
||||||
|
final CountDown countDown = new CountDown(RIVER_START_MAX_RETRIES);
|
||||||
|
riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public RiverClusterState execute(RiverClusterState currentState) {
|
public RiverClusterState execute(RiverClusterState currentState) {
|
||||||
if (!event.state().metaData().hasIndex(riverIndexName)) {
|
return updateRiverClusterState(source, currentState, event.state(), countDown);
|
||||||
// if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
|
|
||||||
if (!currentState.routing().isEmpty()) {
|
|
||||||
return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build();
|
|
||||||
}
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
|
|
||||||
RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing());
|
|
||||||
boolean dirty = false;
|
|
||||||
|
|
||||||
IndexMetaData indexMetaData = event.state().metaData().index(riverIndexName);
|
|
||||||
// go over and create new river routing (with no node) for new types (rivers names)
|
|
||||||
for (MappingMetaData mappingMd : indexMetaData.mappings().values()) {
|
|
||||||
String mappingType = mappingMd.type(); // mapping type is the name of the river
|
|
||||||
if (!currentState.routing().hasRiverByName(mappingType)) {
|
|
||||||
// no river, we need to add it to the routing with no node allocation
|
|
||||||
try {
|
|
||||||
GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").execute().actionGet();
|
|
||||||
if (getResponse.isExists()) {
|
|
||||||
String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null);
|
|
||||||
if (riverType == null) {
|
|
||||||
logger.warn("no river type provided for [{}], ignoring...", riverIndexName);
|
|
||||||
} else {
|
|
||||||
routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null));
|
|
||||||
dirty = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (NoShardAvailableActionException e) {
|
|
||||||
// ignore, we will get it next time...
|
|
||||||
} catch (ClusterBlockException e) {
|
|
||||||
// ignore, we will get it next time
|
|
||||||
} catch (IndexMissingException e) {
|
|
||||||
// ignore, we will get it next time
|
|
||||||
} catch (IllegalIndexShardStateException e) {
|
|
||||||
// ignore, we will get it next time
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// now, remove routings that were deleted
|
|
||||||
// also, apply nodes that were removed and rivers were running on
|
|
||||||
for (RiverRouting routing : currentState.routing()) {
|
|
||||||
if (!indexMetaData.mappings().containsKey(routing.riverName().name())) {
|
|
||||||
routingBuilder.remove(routing);
|
|
||||||
dirty = true;
|
|
||||||
} else if (routing.node() != null && !event.state().nodes().nodeExists(routing.node().id())) {
|
|
||||||
routingBuilder.remove(routing);
|
|
||||||
routingBuilder.put(new RiverRouting(routing.riverName(), null));
|
|
||||||
dirty = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// build a list from nodes to rivers
|
|
||||||
Map<DiscoveryNode, List<RiverRouting>> nodesToRivers = Maps.newHashMap();
|
|
||||||
|
|
||||||
for (DiscoveryNode node : event.state().nodes()) {
|
|
||||||
if (RiverNodeHelper.isRiverNode(node)) {
|
|
||||||
nodesToRivers.put(node, Lists.<RiverRouting>newArrayList());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
List<RiverRouting> unassigned = Lists.newArrayList();
|
|
||||||
for (RiverRouting routing : routingBuilder.build()) {
|
|
||||||
if (routing.node() == null) {
|
|
||||||
unassigned.add(routing);
|
|
||||||
} else {
|
|
||||||
List<RiverRouting> l = nodesToRivers.get(routing.node());
|
|
||||||
if (l == null) {
|
|
||||||
l = Lists.newArrayList();
|
|
||||||
nodesToRivers.put(routing.node(), l);
|
|
||||||
}
|
|
||||||
l.add(routing);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (Iterator<RiverRouting> it = unassigned.iterator(); it.hasNext(); ) {
|
|
||||||
RiverRouting routing = it.next();
|
|
||||||
DiscoveryNode smallest = null;
|
|
||||||
int smallestSize = Integer.MAX_VALUE;
|
|
||||||
for (Map.Entry<DiscoveryNode, List<RiverRouting>> entry : nodesToRivers.entrySet()) {
|
|
||||||
if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) {
|
|
||||||
if (entry.getValue().size() < smallestSize) {
|
|
||||||
smallestSize = entry.getValue().size();
|
|
||||||
smallest = entry.getKey();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (smallest != null) {
|
|
||||||
dirty = true;
|
|
||||||
it.remove();
|
|
||||||
routing.node(smallest);
|
|
||||||
nodesToRivers.get(smallest).add(routing);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// add relocation logic...
|
|
||||||
|
|
||||||
if (dirty) {
|
|
||||||
return RiverClusterState.builder().state(currentState).routing(routingBuilder).build();
|
|
||||||
}
|
|
||||||
return currentState;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
    /**
     * Computes a new river cluster state from the current river state and the given
     * node cluster state: adds unassigned routing entries for newly discovered river
     * types, removes routings for deleted rivers or rivers whose node left the
     * cluster, and assigns each unallocated river to the eligible river node that is
     * currently running the fewest rivers.
     *
     * If a river's mapping type exists but its _meta document cannot be found yet
     * (after #3782 the mapping update reaches the master before the document is
     * indexed), this same update is rescheduled on the thread pool, up to
     * RIVER_START_MAX_RETRIES times, tracked by the shared {@code countDown}.
     *
     * @param source          source label used when resubmitting the state update task
     * @param currentState    current river cluster state
     * @param newClusterState cluster state to reconcile against
     * @param countDown       remaining retry budget, shared across scheduled retries
     * @return the updated river cluster state, or {@code currentState} if nothing changed
     */
    protected RiverClusterState updateRiverClusterState(final String source, final RiverClusterState currentState,
                                                        ClusterState newClusterState, final CountDown countDown) {
        if (!newClusterState.metaData().hasIndex(riverIndexName)) {
            // if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
            if (!currentState.routing().isEmpty()) {
                return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build();
            }
            return currentState;
        }

        RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing());
        boolean dirty = false;

        IndexMetaData indexMetaData = newClusterState.metaData().index(riverIndexName);
        // go over and create new river routing (with no node) for new types (rivers names)
        for (MappingMetaData mappingMd : indexMetaData.mappings().values()) {
            String mappingType = mappingMd.type(); // mapping type is the name of the river
            if (!currentState.routing().hasRiverByName(mappingType)) {
                // no river, we need to add it to the routing with no node allocation
                try {
                    // read with "_primary" preference to get the freshest view of the _meta document
                    GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").setPreference("_primary").get();
                    if (!getResponse.isExists()) {
                        // the mapping exists but the _meta document doesn't (yet):
                        // retry later unless the retry budget is exhausted
                        if (countDown.countDown()) {
                            logger.warn("no river _meta document found after {} attempts", RIVER_START_MAX_RETRIES);
                        } else {
                            logger.info("no river _meta document found, retrying in {} ms", RIVER_START_RETRY_INTERVAL.millis());
                            try {
                                threadPool.schedule(RIVER_START_RETRY_INTERVAL, ThreadPool.Names.GENERIC, new Runnable() {
                                    @Override
                                    public void run() {
                                        // re-run this whole update against the then-current states,
                                        // reusing the same countdown so retries stay bounded
                                        riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
                                            @Override
                                            public RiverClusterState execute(RiverClusterState currentState) {
                                                return updateRiverClusterState(source, currentState, riverClusterService.state(), countDown);
                                            }
                                        });
                                    }
                                });
                            } catch(EsRejectedExecutionException ex) {
                                logger.debug("Couldn't schedule river start retry, node might be shutting down", ex);
                            }
                        }
                        return currentState;
                    }
                    String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null);
                    if (riverType == null) {
                        logger.warn("no river type provided for [{}], ignoring...", riverIndexName);
                    } else {
                        routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null));
                        dirty = true;
                    }
                } catch (NoShardAvailableActionException e) {
                    // ignore, we will get it next time...
                } catch (ClusterBlockException e) {
                    // ignore, we will get it next time
                } catch (IndexMissingException e) {
                    // ignore, we will get it next time
                } catch (IllegalIndexShardStateException e) {
                    // ignore, we will get it next time
                } catch (Exception e) {
                    logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
                }
            }
        }
        // now, remove routings that were deleted
        // also, apply nodes that were removed and rivers were running on
        for (RiverRouting routing : currentState.routing()) {
            if (!indexMetaData.mappings().containsKey(routing.riverName().name())) {
                routingBuilder.remove(routing);
                dirty = true;
            } else if (routing.node() != null && !newClusterState.nodes().nodeExists(routing.node().id())) {
                // the node the river ran on left the cluster: unassign it so it is reallocated below
                routingBuilder.remove(routing);
                routingBuilder.put(new RiverRouting(routing.riverName(), null));
                dirty = true;
            }
        }

        // build a list from nodes to rivers
        Map<DiscoveryNode, List<RiverRouting>> nodesToRivers = Maps.newHashMap();

        for (DiscoveryNode node : newClusterState.nodes()) {
            if (RiverNodeHelper.isRiverNode(node)) {
                nodesToRivers.put(node, Lists.<RiverRouting>newArrayList());
            }
        }

        List<RiverRouting> unassigned = Lists.newArrayList();
        for (RiverRouting routing : routingBuilder.build()) {
            if (routing.node() == null) {
                unassigned.add(routing);
            } else {
                List<RiverRouting> l = nodesToRivers.get(routing.node());
                if (l == null) {
                    l = Lists.newArrayList();
                    nodesToRivers.put(routing.node(), l);
                }
                l.add(routing);
            }
        }
        // assign each unallocated river to the eligible river node currently running the fewest rivers
        for (Iterator<RiverRouting> it = unassigned.iterator(); it.hasNext(); ) {
            RiverRouting routing = it.next();
            DiscoveryNode smallest = null;
            int smallestSize = Integer.MAX_VALUE;
            for (Map.Entry<DiscoveryNode, List<RiverRouting>> entry : nodesToRivers.entrySet()) {
                if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) {
                    if (entry.getValue().size() < smallestSize) {
                        smallestSize = entry.getValue().size();
                        smallest = entry.getKey();
                    }
                }
            }
            if (smallest != null) {
                dirty = true;
                it.remove();
                routing.node(smallest);
                nodesToRivers.get(smallest).add(routing);
            }
        }

        // add relocation logic...

        if (dirty) {
            return RiverClusterState.builder().state(currentState).routing(routingBuilder).build();
        }
        return currentState;
    }
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue