Tribe nodes should apply cluster state updates in batches

This commit applies the general mechanism for applying cluster state
updates in batches to tribe nodes.

Relates #14899, relates #14725
This commit is contained in:
Jason Tedor 2015-11-24 20:39:26 -05:00
parent 6f2c36dcb7
commit 53b3cd83a5
1 changed files with 157 additions and 118 deletions

View File

@ -20,14 +20,14 @@
package org.elasticsearch.tribe; package org.elasticsearch.tribe;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
@ -37,6 +37,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -208,142 +209,180 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
} }
} }
class TribeClusterStateListener implements ClusterStateListener {
class TribeClusterStateListener implements ClusterStateListener {
private final String tribeName; private final String tribeName;
private final TribeNodeClusterStateTaskExecutor executor;
TribeClusterStateListener(Node tribeNode) { TribeClusterStateListener(Node tribeNode) {
this.tribeName = tribeNode.settings().get(TRIBE_NAME); String tribeName = tribeNode.settings().get(TRIBE_NAME);
this.tribeName = tribeName;
executor = new TribeNodeClusterStateTaskExecutor(tribeName);
} }
@Override @Override
public void clusterChanged(final ClusterChangedEvent event) { public void clusterChanged(final ClusterChangedEvent event) {
logger.debug("[{}] received cluster event, [{}]", tribeName, event.source()); logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask(
@Override "cluster event from " + tribeName + ", " + event.source(),
public boolean runOnlyOnMaster() { event,
return false; ClusterStateTaskConfig.build(Priority.NORMAL),
executor,
(source, t) -> logger.warn("failed to process [{}]", t, source));
}
}
class TribeNodeClusterStateTaskExecutor implements ClusterStateTaskExecutor<ClusterChangedEvent> {
private final String tribeName;
TribeNodeClusterStateTaskExecutor(String tribeName) {
this.tribeName = tribeName;
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public BatchResult<ClusterChangedEvent> execute(ClusterState currentState, List<ClusterChangedEvent> tasks) throws Exception {
ClusterState accumulator = ClusterState.builder(currentState).build();
BatchResult.Builder<ClusterChangedEvent> builder = BatchResult.builder();
try {
// we only need to apply the latest cluster state update
accumulator = applyUpdate(accumulator, tasks.get(tasks.size() - 1));
builder.successes(tasks);
} catch (Throwable t) {
builder.failures(tasks, t);
}
return builder.build(accumulator);
}
private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent task) {
boolean clusterStateChanged = false;
ClusterState tribeState = task.state();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
// -- merge nodes
// go over existing nodes, and see if they need to be removed
for (DiscoveryNode discoNode : currentState.nodes()) {
String markedTribeName = discoNode.attributes().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
if (tribeState.nodes().get(discoNode.id()) == null) {
clusterStateChanged = true;
logger.info("[{}] removing node [{}]", tribeName, discoNode);
nodes.remove(discoNode.id());
}
} }
}
// go over tribe nodes, and see if they need to be added
for (DiscoveryNode tribe : tribeState.nodes()) {
if (currentState.nodes().get(tribe.id()) == null) {
// a new node, add it, but also add the tribe name to the attributes
Map<String, String> tribeAttr = new HashMap<>();
for (ObjectObjectCursor<String, String> attr : tribe.attributes()) {
tribeAttr.put(attr.key, attr.value);
}
tribeAttr.put(TRIBE_NAME, tribeName);
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
clusterStateChanged = true;
logger.info("[{}] adding node [{}]", tribeName, discoNode);
nodes.put(discoNode);
}
}
@Override // -- merge metadata
public ClusterState execute(ClusterState currentState) throws Exception { ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
ClusterState tribeState = event.state(); MetaData.Builder metaData = MetaData.builder(currentState.metaData());
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes()); RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
// -- merge nodes // go over existing indices, and see if they need to be removed
// go over existing nodes, and see if they need to be removed for (IndexMetaData index : currentState.metaData()) {
for (DiscoveryNode discoNode : currentState.nodes()) { String markedTribeName = index.getSettings().get(TRIBE_NAME);
String markedTribeName = discoNode.attributes().get(TRIBE_NAME); if (markedTribeName != null && markedTribeName.equals(tribeName)) {
if (markedTribeName != null && markedTribeName.equals(tribeName)) { IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex());
if (tribeState.nodes().get(discoNode.id()) == null) { clusterStateChanged = true;
logger.info("[{}] removing node [{}]", tribeName, discoNode); if (tribeIndex == null || tribeIndex.getState() == IndexMetaData.State.CLOSE) {
nodes.remove(discoNode.id()); logger.info("[{}] removing index [{}]", tribeName, index.getIndex());
} removeIndex(blocks, metaData, routingTable, index);
} } else {
// always make sure to update the metadata and routing table, in case
// there are changes in them (new mapping, shards moving from initializing to started)
routingTable.add(tribeState.routingTable().index(index.getIndex()));
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
} }
// go over tribe nodes, and see if they need to be added }
for (DiscoveryNode tribe : tribeState.nodes()) { }
if (currentState.nodes().get(tribe.id()) == null) { // go over tribe one, and see if they need to be added
// a new node, add it, but also add the tribe name to the attributes for (IndexMetaData tribeIndex : tribeState.metaData()) {
Map<String, String> tribeAttr = new HashMap<>(); // if there is no routing table yet, do nothing with it...
for (ObjectObjectCursor<String, String> attr : tribe.attributes()) { IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.getIndex());
tribeAttr.put(attr.key, attr.value); if (table == null) {
} continue;
tribeAttr.put(TRIBE_NAME, tribeName); }
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version()); final IndexMetaData indexMetaData = currentState.metaData().index(tribeIndex.getIndex());
logger.info("[{}] adding node [{}]", tribeName, discoNode); if (indexMetaData == null) {
nodes.put(discoNode); if (!droppedIndices.contains(tribeIndex.getIndex())) {
} // a new index, add it, and add the tribe name as a setting
clusterStateChanged = true;
logger.info("[{}] adding index [{}]", tribeName, tribeIndex.getIndex());
addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
} }
} else {
// -- merge metadata String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME);
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); if (!tribeName.equals(existingFromTribe)) {
MetaData.Builder metaData = MetaData.builder(currentState.metaData()); // we have a potential conflict on index names, decide what to do...
RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); if (ON_CONFLICT_ANY.equals(onConflict)) {
// go over existing indices, and see if they need to be removed // we chose any tribe, carry on
for (IndexMetaData index : currentState.metaData()) { } else if (ON_CONFLICT_DROP.equals(onConflict)) {
String markedTribeName = index.getSettings().get(TRIBE_NAME); // drop the indices, there is a conflict
if (markedTribeName != null && markedTribeName.equals(tribeName)) { clusterStateChanged = true;
IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex()); logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
if (tribeIndex == null || tribeIndex.getState() == IndexMetaData.State.CLOSE) { removeIndex(blocks, metaData, routingTable, tribeIndex);
logger.info("[{}] removing index [{}]", tribeName, index.getIndex()); droppedIndices.add(tribeIndex.getIndex());
removeIndex(blocks, metaData, routingTable, index); } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
} else { // on conflict, prefer a tribe...
// always make sure to update the metadata and routing table, in case String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length());
// there are changes in them (new mapping, shards moving from initializing to started) if (tribeName.equals(preferredTribeName)) {
routingTable.add(tribeState.routingTable().index(index.getIndex())); // the new one is hte preferred one, replace...
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); clusterStateChanged = true;
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
} removeIndex(blocks, metaData, routingTable, tribeIndex);
}
}
// go over tribe one, and see if they need to be added
for (IndexMetaData tribeIndex : tribeState.metaData()) {
// if there is no routing table yet, do nothing with it...
IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.getIndex());
if (table == null) {
continue;
}
final IndexMetaData indexMetaData = currentState.metaData().index(tribeIndex.getIndex());
if (indexMetaData == null) {
if (!droppedIndices.contains(tribeIndex.getIndex())) {
// a new index, add it, and add the tribe name as a setting
logger.info("[{}] adding index [{}]", tribeName, tribeIndex.getIndex());
addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
} } // else: either the existing one is the preferred one, or we haven't seen one, carry on
} else {
String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME);
if (!tribeName.equals(existingFromTribe)) {
// we have a potential conflict on index names, decide what to do...
if (ON_CONFLICT_ANY.equals(onConflict)) {
// we chose any tribe, carry on
} else if (ON_CONFLICT_DROP.equals(onConflict)) {
// drop the indices, there is a conflict
logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
removeIndex(blocks, metaData, routingTable, tribeIndex);
droppedIndices.add(tribeIndex.getIndex());
} else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
// on conflict, prefer a tribe...
String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length());
if (tribeName.equals(preferredTribeName)) {
// the new one is hte preferred one, replace...
logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
removeIndex(blocks, metaData, routingTable, tribeIndex);
addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
} // else: either the existing one is the preferred one, or we haven't seen one, carry on
}
}
} }
} }
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build();
} }
}
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) { if (!clusterStateChanged) {
metaData.remove(index.getIndex()); return currentState;
routingTable.remove(index.getIndex()); } else {
blocks.removeIndexBlocks(index.getIndex()); return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build();
} }
}
private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) { private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) {
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); metaData.remove(index.getIndex());
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); routingTable.remove(index.getIndex());
routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex())); blocks.removeIndexBlocks(index.getIndex());
if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex())) { }
blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_METADATA_BLOCK);
}
if (Regex.simpleMatch(blockIndicesRead, tribeIndex.getIndex())) {
blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_READ_BLOCK);
}
if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.getIndex())) {
blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_WRITE_BLOCK);
}
}
@Override private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
public void onFailure(String source, Throwable t) { Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
logger.warn("failed to process [{}]", t, source); metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
} routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex()));
}); if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex())) {
blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_METADATA_BLOCK);
}
if (Regex.simpleMatch(blockIndicesRead, tribeIndex.getIndex())) {
blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_READ_BLOCK);
}
if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.getIndex())) {
blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_WRITE_BLOCK);
}
} }
} }
} }