diff --git a/docs/reference/modules/tribe.asciidoc b/docs/reference/modules/tribe.asciidoc index fb998bf6b69..9f075685eea 100644 --- a/docs/reference/modules/tribe.asciidoc +++ b/docs/reference/modules/tribe.asciidoc @@ -36,7 +36,7 @@ indexing, etc. However, there are a few exceptions: * The merged view cannot handle indices with the same name in multiple - clusters. It will pick one of them and discard the other. + clusters. By default it will pick one of them, see later for on_conflict options. * Master level read operations (eg <>, <>) will automatically execute with a local flag set to true since there is @@ -56,3 +56,21 @@ tribe: metadata: true -------------------------------- +coming[1.2.0] + +The tribe node can also configure blocks on indices explicitly: + +[source,yaml] +-------------------------------- +tribe: + blocks: + indices.write: hk*,ldn* +-------------------------------- + +coming[1.2.0] + +When there is a conflict and multiple clusters hold the same index, by default +the tribe node will pick one of them. This can be configured using the `tribe.on_conflict` +setting. It defaults to `any`, but can be set to `drop` (drop indices that have +a conflict), or `prefer_[tribeName]` to prefer the index from a specific tribe. + diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java index c3dc23efa04..1cdde802085 100644 --- a/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -22,23 +22,28 @@ package org.elasticsearch.tribe; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.node.NodeBuilder; @@ -47,6 +52,7 @@ import org.elasticsearch.rest.RestStatus; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; /** @@ -104,6 +110,13 @@ public class TribeService extends AbstractLifecycleComponent { public static final String TRIBE_NAME = "tribe.name"; private final ClusterService clusterService; + private final String[] blockIndicesWrite; + private final String[] blockIndicesRead; + private final String[] blockIndicesMetadata; + + private static final String ON_CONFLICT_ANY = "any", ON_CONFLICT_DROP = "drop", ON_CONFLICT_PREFER = "prefer_"; + private final String onConflict; + private final Set droppedIndices = ConcurrentCollections.newConcurrentSet(); private final List nodes = Lists.newCopyOnWriteArrayList(); @@ -113,6 +126,7 @@ public class TribeService extends AbstractLifecycleComponent { this.clusterService = clusterService; Map nodesSettings = Maps.newHashMap(settings.getGroups("tribe", true)); nodesSettings.remove("blocks"); // remove prefix settings that don't indicate a client + nodesSettings.remove("on_conflict"); // remove prefix settings that don't indicate a client for (Map.Entry entry : nodesSettings.entrySet()) { ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue()); sb.put("node.name", settings.get("name") + "/" + entry.getKey()); @@ -123,6 +137,9 @@ public class TribeService extends AbstractLifecycleComponent { nodes.add((InternalNode) NodeBuilder.nodeBuilder().settings(sb).client(true).build()); } + String[] blockIndicesWrite = Strings.EMPTY_ARRAY; + String[] blockIndicesRead = Strings.EMPTY_ARRAY; + String[] blockIndicesMetadata = Strings.EMPTY_ARRAY; if (!nodes.isEmpty()) { // remove the initial election / recovery blocks since we are not going to have a // master elected in this single tribe node local "cluster" @@ -131,13 +148,21 @@ public class TribeService extends AbstractLifecycleComponent { if (settings.getAsBoolean("tribe.blocks.write", false)) { clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK); } + blockIndicesWrite = settings.getAsArray("tribe.blocks.write.indices", Strings.EMPTY_ARRAY); if (settings.getAsBoolean("tribe.blocks.metadata", false)) { clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK); } + blockIndicesMetadata = settings.getAsArray("tribe.blocks.metadata.indices", Strings.EMPTY_ARRAY); + blockIndicesRead = settings.getAsArray("tribe.blocks.read.indices", Strings.EMPTY_ARRAY); for (InternalNode node : nodes) { node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node)); } } + this.blockIndicesMetadata = blockIndicesMetadata; + this.blockIndicesRead = blockIndicesRead; + this.blockIndicesWrite = blockIndicesWrite; + + this.onConflict = settings.get("tribe.on_conflict", ON_CONFLICT_ANY); } @Override @@ -255,6 +280,7 @@ public class TribeService extends AbstractLifecycleComponent { } // -- merge metadata + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); MetaData.Builder metaData = MetaData.builder(currentState.metaData()); RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); // go over existing indices, and see if they need to be removed @@ -264,8 +290,7 @@ public class TribeService extends AbstractLifecycleComponent { IndexMetaData tribeIndex = tribeState.metaData().index(index.index()); if (tribeIndex == null) { logger.info("[{}] removing index [{}]", tribeName, index.index()); - metaData.remove(index.index()); - routingTable.remove(index.index()); + removeIndex(blocks, metaData, routingTable, index); } else { // always make sure to update the metadata and routing table, in case // there are changes in them (new mapping, shards moving from initializing to started) @@ -277,16 +302,62 @@ public class TribeService extends AbstractLifecycleComponent { } // go over tribe one, and see if they need to be added for (IndexMetaData tribeIndex : tribeState.metaData()) { - if (!currentState.metaData().hasIndex(tribeIndex.index())) { + // if there is no routing table yet, do nothing with it... + IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.index()); + if (table == null) { + continue; + } + if (!currentState.metaData().hasIndex(tribeIndex.index()) && !droppedIndices.contains(tribeIndex.index())) { // a new index, add it, and add the tribe name as a setting logger.info("[{}] adding index [{}]", tribeName, tribeIndex.index()); - Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build(); - metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); - routingTable.add(tribeState.routingTable().index(tribeIndex.index())); + addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); + } else { + String existingFromTribe = currentState.metaData().index(tribeIndex.index()).getSettings().get(TRIBE_NAME); + if (!tribeName.equals(existingFromTribe)) { + // we have a potential conflict on index names, decide what to do... + if (ON_CONFLICT_ANY.equals(onConflict)) { + // we chose any tribe, carry on + } else if (ON_CONFLICT_DROP.equals(onConflict)) { + // drop the indices, there is a conflict + logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.index(), existingFromTribe); + removeIndex(blocks, metaData, routingTable, tribeIndex); + droppedIndices.add(tribeIndex.index()); + } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) { + // on conflict, prefer a tribe... + String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length()); + if (tribeName.equals(preferredTribeName)) { + // the new one is hte preferred one, replace... + logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.index(), existingFromTribe); + removeIndex(blocks, metaData, routingTable, tribeIndex); + addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); + } // else: either the existing one is the preferred one, or we haven't seen one, carry on + } + } } } - return ClusterState.builder(currentState).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + return ClusterState.builder(currentState).blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + } + + private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) { + metaData.remove(index.index()); + routingTable.remove(index.index()); + blocks.removeIndexBlocks(index.index()); + } + + private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) { + Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build(); + metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); + routingTable.add(tribeState.routingTable().index(tribeIndex.index())); + if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.index())) { + blocks.addIndexBlock(tribeIndex.index(), IndexMetaData.INDEX_METADATA_BLOCK); + } + if (Regex.simpleMatch(blockIndicesRead, tribeIndex.index())) { + blocks.addIndexBlock(tribeIndex.index(), IndexMetaData.INDEX_READ_BLOCK); + } + if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.index())) { + blocks.addIndexBlock(tribeIndex.index(), IndexMetaData.INDEX_WRITE_BLOCK); + } } @Override diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java index a1c3c44fa12..f01fde398fe 100644 --- a/src/test/java/org/elasticsearch/tribe/TribeTests.java +++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java @@ -23,8 +23,10 @@ import com.google.common.base.Predicate; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.MasterNotDiscoveredException; @@ -33,9 +35,12 @@ import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.TestCluster; import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import java.util.concurrent.TimeUnit; + import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -45,39 +50,180 @@ import static org.hamcrest.Matchers.equalTo; */ public class TribeTests extends ElasticsearchIntegrationTest { - private TestCluster cluster2; + private static TestCluster cluster2; + private Node tribeNode; private Client tribeClient; - @Before - public void setupSecondCluster() { + @BeforeClass + public static void setupSecondCluster() throws Exception { + ElasticsearchIntegrationTest.beforeClass(); // create another cluster - cluster2 = new TestCluster(randomLong(), 2, 2, cluster().getClusterName() + "-2"); - cluster2.beforeTest(getRandom(), getPerTestTransportClientRatio()); + cluster2 = new TestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom())); + cluster2.beforeTest(getRandom(), 0.1); cluster2.ensureAtLeastNumNodes(2); + } - Settings settings = ImmutableSettings.builder() + @AfterClass + public static void tearDownSecondCluster() { + if (cluster2 != null) { + cluster2.afterTest(); + cluster2.close(); + } + } + + @After + public void tearDownTribeNode() { + if (cluster2 != null) { + cluster2.client().admin().indices().prepareDelete("_all").execute().actionGet(); + } + if (tribeNode != null) { + tribeNode.close(); + } + } + + private void setupTribeNode(Settings settings) { + Settings merged = ImmutableSettings.builder() .put("tribe.t1.cluster.name", cluster().getClusterName()) .put("tribe.t2.cluster.name", cluster2.getClusterName()) .put("tribe.blocks.write", false) .put("tribe.blocks.read", false) + .put(settings) .build(); tribeNode = NodeBuilder.nodeBuilder() - .settings(settings) + .settings(merged) .node(); tribeClient = tribeNode.client(); } - @After - public void tearDownSecondCluster() { - tribeNode.close(); - cluster2.afterTest(); - cluster2.close(); + @Test + public void testGlobalReadWriteBlocks() throws Exception { + logger.info("create 2 indices, test1 on t1, and test2 on t2"); + cluster().client().admin().indices().prepareCreate("test1").get(); + cluster2.client().admin().indices().prepareCreate("test2").get(); + + + setupTribeNode(ImmutableSettings.builder() + .put("tribe.blocks.write", true) + .put("tribe.blocks.metadata", true) + .build()); + + logger.info("wait till tribe has the same nodes as the 2 clusters"); + awaitSameNodeCounts(); + // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state + logger.info("wait till test1 and test2 exists in the tribe node state"); + awaitIndicesInClusterState("test1", "test2"); + + try { + tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").execute().actionGet(); + fail("cluster block should be thrown"); + } catch (ClusterBlockException e) { + // all is well! + } + try { + tribeClient.admin().indices().prepareOptimize("test1").execute().actionGet(); + fail("cluster block should be thrown"); + } catch (ClusterBlockException e) { + // all is well! + } + try { + tribeClient.admin().indices().prepareOptimize("test2").execute().actionGet(); + fail("cluster block should be thrown"); + } catch (ClusterBlockException e) { + // all is well! + } + } + + @Test + public void testIndexWriteBlocks() throws Exception { + logger.info("create 2 indices, test1 on t1, and test2 on t2"); + cluster().client().admin().indices().prepareCreate("test1").get(); + cluster().client().admin().indices().prepareCreate("block_test1").get(); + cluster2.client().admin().indices().prepareCreate("test2").get(); + cluster2.client().admin().indices().prepareCreate("block_test2").get(); + + setupTribeNode(ImmutableSettings.builder() + .put("tribe.blocks.write.indices", "block_*") + .build()); + logger.info("wait till tribe has the same nodes as the 2 clusters"); + awaitSameNodeCounts(); + // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state + logger.info("wait till test1 and test2 exists in the tribe node state"); + awaitIndicesInClusterState("test1", "test2", "block_test1", "block_test2"); + + tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").get(); + try { + tribeClient.prepareIndex("block_test1", "type1", "1").setSource("field1", "value1").get(); + fail("cluster block should be thrown"); + } catch (ClusterBlockException e) { + // all is well! + } + + tribeClient.prepareIndex("test2", "type1", "1").setSource("field1", "value1").get(); + try { + tribeClient.prepareIndex("block_test2", "type1", "1").setSource("field1", "value1").get(); + fail("cluster block should be thrown"); + } catch (ClusterBlockException e) { + // all is well! + } + } + + @Test + public void testOnConflictDrop() throws Exception { + logger.info("create 2 indices, test1 on t1, and test2 on t2"); + cluster().client().admin().indices().prepareCreate("conflict").get(); + cluster2.client().admin().indices().prepareCreate("conflict").get(); + cluster().client().admin().indices().prepareCreate("test1").get(); + cluster2.client().admin().indices().prepareCreate("test2").get(); + + setupTribeNode(ImmutableSettings.builder() + .put("tribe.on_conflict", "drop") + .build()); + + logger.info("wait till tribe has the same nodes as the 2 clusters"); + awaitSameNodeCounts(); + + // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state + logger.info("wait till test1 and test2 exists in the tribe node state"); + awaitIndicesInClusterState("test1", "test2"); + + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().hasIndex("conflict"), equalTo(false)); + } + + @Test + public void testOnConflictPrefer() throws Exception { + testOnConflictPrefer(randomBoolean() ? "t1" : "t2"); + } + + private void testOnConflictPrefer(String tribe) throws Exception { + logger.info("testing preference for tribe {}", tribe); + + logger.info("create 2 indices, test1 on t1, and test2 on t2"); + cluster().client().admin().indices().prepareCreate("conflict").get(); + cluster2.client().admin().indices().prepareCreate("conflict").get(); + cluster().client().admin().indices().prepareCreate("test1").get(); + cluster2.client().admin().indices().prepareCreate("test2").get(); + + setupTribeNode(ImmutableSettings.builder() + .put("tribe.on_conflict", "prefer_" + tribe) + .build()); + logger.info("wait till tribe has the same nodes as the 2 clusters"); + awaitSameNodeCounts(); + // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state + logger.info("wait till test1 and test2 exists in the tribe node state"); + awaitIndicesInClusterState("test1", "test2", "conflict"); + + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("conflict").getSettings().get(TribeService.TRIBE_NAME), equalTo(tribe)); } @Test public void testTribeOnOneCluster() throws Exception { + setupTribeNode(ImmutableSettings.EMPTY); logger.info("create 2 indices, test1 on t1, and test2 on t2"); cluster().client().admin().indices().prepareCreate("test1").get(); cluster2.client().admin().indices().prepareCreate("test2").get(); @@ -85,14 +231,7 @@ public class TribeTests extends ElasticsearchIntegrationTest { // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state logger.info("wait till test1 and test2 exists in the tribe node state"); - awaitBusy(new Predicate() { - @Override - public boolean apply(Object o) { - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - return tribeState.getMetaData().hasIndex("test1") && tribeState.getMetaData().hasIndex("test2") && - tribeState.getRoutingTable().hasIndex("test1") && tribeState.getRoutingTable().hasIndex("test2"); - } - }); + awaitIndicesInClusterState("test1", "test2"); logger.info("wait till tribe has the same nodes as the 2 clusters"); awaitSameNodeCounts(); @@ -159,6 +298,24 @@ public class TribeTests extends ElasticsearchIntegrationTest { awaitSameNodeCounts(); } + private void awaitIndicesInClusterState(final String... indices) throws Exception { + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + if (!tribeState.getMetaData().hasIndex(index)) { + return false; + } + if (!tribeState.getRoutingTable().hasIndex(index)) { + return false; + } + } + return true; + } + }); + } + private void awaitSameNodeCounts() throws Exception { awaitBusy(new Predicate() { @Override