diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 87da13fad4a..62647ad3829 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -20,14 +20,14 @@ package org.elasticsearch.tribe; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -208,142 +209,180 @@ public class TribeService extends AbstractLifecycleComponent { } } - class TribeClusterStateListener implements ClusterStateListener { + class TribeClusterStateListener implements ClusterStateListener { private final String tribeName; + private final TribeNodeClusterStateTaskExecutor executor; TribeClusterStateListener(Node tribeNode) { - this.tribeName = tribeNode.settings().get(TRIBE_NAME); + String tribeName = tribeNode.settings().get(TRIBE_NAME); + this.tribeName = tribeName; + executor = new TribeNodeClusterStateTaskExecutor(tribeName); } @Override public void clusterChanged(final ClusterChangedEvent event) { logger.debug("[{}] received cluster event, [{}]", tribeName, event.source()); - clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; + clusterService.submitStateUpdateTask( + "cluster event from " + tribeName + ", " + event.source(), + event, + ClusterStateTaskConfig.build(Priority.NORMAL), + executor, + (source, t) -> logger.warn("failed to process [{}]", t, source)); + } + } + + class TribeNodeClusterStateTaskExecutor implements ClusterStateTaskExecutor { + private final String tribeName; + + TribeNodeClusterStateTaskExecutor(String tribeName) { + this.tribeName = tribeName; + } + + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterState accumulator = ClusterState.builder(currentState).build(); + BatchResult.Builder builder = BatchResult.builder(); + + try { + // we only need to apply the latest cluster state update + accumulator = applyUpdate(accumulator, tasks.get(tasks.size() - 1)); + builder.successes(tasks); + } catch (Throwable t) { + builder.failures(tasks, t); + } + + return builder.build(accumulator); + } + + private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent task) { + boolean clusterStateChanged = false; + ClusterState tribeState = task.state(); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes()); + // -- merge nodes + // go over existing nodes, and see if they need to be removed + for (DiscoveryNode discoNode : currentState.nodes()) { + String markedTribeName = discoNode.attributes().get(TRIBE_NAME); + if (markedTribeName != null && markedTribeName.equals(tribeName)) { + if (tribeState.nodes().get(discoNode.id()) == null) { + clusterStateChanged = true; + logger.info("[{}] removing node [{}]", tribeName, discoNode); + nodes.remove(discoNode.id()); + } } + } + // go over tribe nodes, and see if they need to be added + for (DiscoveryNode tribe : tribeState.nodes()) { + if (currentState.nodes().get(tribe.id()) == null) { + // a new node, add it, but also add the tribe name to the attributes + Map tribeAttr = new HashMap<>(); + for (ObjectObjectCursor attr : tribe.attributes()) { + tribeAttr.put(attr.key, attr.value); + } + tribeAttr.put(TRIBE_NAME, tribeName); + DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version()); + clusterStateChanged = true; + logger.info("[{}] adding node [{}]", tribeName, discoNode); + nodes.put(discoNode); + } + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState tribeState = event.state(); - DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes()); - // -- merge nodes - // go over existing nodes, and see if they need to be removed - for (DiscoveryNode discoNode : currentState.nodes()) { - String markedTribeName = discoNode.attributes().get(TRIBE_NAME); - if (markedTribeName != null && markedTribeName.equals(tribeName)) { - if (tribeState.nodes().get(discoNode.id()) == null) { - logger.info("[{}] removing node [{}]", tribeName, discoNode); - nodes.remove(discoNode.id()); - } - } + // -- merge metadata + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + MetaData.Builder metaData = MetaData.builder(currentState.metaData()); + RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + // go over existing indices, and see if they need to be removed + for (IndexMetaData index : currentState.metaData()) { + String markedTribeName = index.getSettings().get(TRIBE_NAME); + if (markedTribeName != null && markedTribeName.equals(tribeName)) { + IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex()); + clusterStateChanged = true; + if (tribeIndex == null || tribeIndex.getState() == IndexMetaData.State.CLOSE) { + logger.info("[{}] removing index [{}]", tribeName, index.getIndex()); + removeIndex(blocks, metaData, routingTable, index); + } else { + // always make sure to update the metadata and routing table, in case + // there are changes in them (new mapping, shards moving from initializing to started) + routingTable.add(tribeState.routingTable().index(index.getIndex())); + Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); + metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); } - // go over tribe nodes, and see if they need to be added - for (DiscoveryNode tribe : tribeState.nodes()) { - if (currentState.nodes().get(tribe.id()) == null) { - // a new node, add it, but also add the tribe name to the attributes - Map tribeAttr = new HashMap<>(); - for (ObjectObjectCursor attr : tribe.attributes()) { - tribeAttr.put(attr.key, attr.value); - } - tribeAttr.put(TRIBE_NAME, tribeName); - DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version()); - logger.info("[{}] adding node [{}]", tribeName, discoNode); - nodes.put(discoNode); - } + } + } + // go over tribe one, and see if they need to be added + for (IndexMetaData tribeIndex : tribeState.metaData()) { + // if there is no routing table yet, do nothing with it... + IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.getIndex()); + if (table == null) { + continue; + } + final IndexMetaData indexMetaData = currentState.metaData().index(tribeIndex.getIndex()); + if (indexMetaData == null) { + if (!droppedIndices.contains(tribeIndex.getIndex())) { + // a new index, add it, and add the tribe name as a setting + clusterStateChanged = true; + logger.info("[{}] adding index [{}]", tribeName, tribeIndex.getIndex()); + addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); } - - // -- merge metadata - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - MetaData.Builder metaData = MetaData.builder(currentState.metaData()); - RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); - // go over existing indices, and see if they need to be removed - for (IndexMetaData index : currentState.metaData()) { - String markedTribeName = index.getSettings().get(TRIBE_NAME); - if (markedTribeName != null && markedTribeName.equals(tribeName)) { - IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex()); - if (tribeIndex == null || tribeIndex.getState() == IndexMetaData.State.CLOSE) { - logger.info("[{}] removing index [{}]", tribeName, index.getIndex()); - removeIndex(blocks, metaData, routingTable, index); - } else { - // always make sure to update the metadata and routing table, in case - // there are changes in them (new mapping, shards moving from initializing to started) - routingTable.add(tribeState.routingTable().index(index.getIndex())); - Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); - metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); - } - } - } - // go over tribe one, and see if they need to be added - for (IndexMetaData tribeIndex : tribeState.metaData()) { - // if there is no routing table yet, do nothing with it... - IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.getIndex()); - if (table == null) { - continue; - } - final IndexMetaData indexMetaData = currentState.metaData().index(tribeIndex.getIndex()); - if (indexMetaData == null) { - if (!droppedIndices.contains(tribeIndex.getIndex())) { - // a new index, add it, and add the tribe name as a setting - logger.info("[{}] adding index [{}]", tribeName, tribeIndex.getIndex()); + } else { + String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME); + if (!tribeName.equals(existingFromTribe)) { + // we have a potential conflict on index names, decide what to do... + if (ON_CONFLICT_ANY.equals(onConflict)) { + // we chose any tribe, carry on + } else if (ON_CONFLICT_DROP.equals(onConflict)) { + // drop the indices, there is a conflict + clusterStateChanged = true; + logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe); + removeIndex(blocks, metaData, routingTable, tribeIndex); + droppedIndices.add(tribeIndex.getIndex()); + } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) { + // on conflict, prefer a tribe... + String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length()); + if (tribeName.equals(preferredTribeName)) { + // the new one is hte preferred one, replace... + clusterStateChanged = true; + logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe); + removeIndex(blocks, metaData, routingTable, tribeIndex); addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); - } - } else { - String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME); - if (!tribeName.equals(existingFromTribe)) { - // we have a potential conflict on index names, decide what to do... - if (ON_CONFLICT_ANY.equals(onConflict)) { - // we chose any tribe, carry on - } else if (ON_CONFLICT_DROP.equals(onConflict)) { - // drop the indices, there is a conflict - logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe); - removeIndex(blocks, metaData, routingTable, tribeIndex); - droppedIndices.add(tribeIndex.getIndex()); - } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) { - // on conflict, prefer a tribe... - String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length()); - if (tribeName.equals(preferredTribeName)) { - // the new one is hte preferred one, replace... - logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe); - removeIndex(blocks, metaData, routingTable, tribeIndex); - addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); - } // else: either the existing one is the preferred one, or we haven't seen one, carry on - } - } + } // else: either the existing one is the preferred one, or we haven't seen one, carry on } } - - return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build(); } + } - private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) { - metaData.remove(index.getIndex()); - routingTable.remove(index.getIndex()); - blocks.removeIndexBlocks(index.getIndex()); - } + if (!clusterStateChanged) { + return currentState; + } else { + return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build(); + } + } - private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) { - Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); - metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); - routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex())); - if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex())) { - blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_METADATA_BLOCK); - } - if (Regex.simpleMatch(blockIndicesRead, tribeIndex.getIndex())) { - blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_READ_BLOCK); - } - if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.getIndex())) { - blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_WRITE_BLOCK); - } - } + private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) { + metaData.remove(index.getIndex()); + routingTable.remove(index.getIndex()); + blocks.removeIndexBlocks(index.getIndex()); + } - @Override - public void onFailure(String source, Throwable t) { - logger.warn("failed to process [{}]", t, source); - } - }); + private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) { + Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); + metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); + routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex())); + if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex())) { + blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_METADATA_BLOCK); + } + if (Regex.simpleMatch(blockIndicesRead, tribeIndex.getIndex())) { + blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_READ_BLOCK); + } + if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.getIndex())) { + blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_WRITE_BLOCK); + } } } }