diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index e3164eacdbb..701656db9ce 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -20,17 +20,21 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; /** @@ -143,6 +147,33 @@ public class ClusterChangedEvent { return state.metaData() != previousState.metaData(); } + /** + * Returns a set of custom meta data types when any custom metadata for the cluster has changed + * between the previous cluster state and the new cluster state. custom meta data types are + * returned iff they have been added, updated or removed between the previous and the current state + */ + public Set changedCustomMetaDataSet() { + Set result = new HashSet<>(); + ImmutableOpenMap currentCustoms = state.metaData().customs(); + ImmutableOpenMap previousCustoms = previousState.metaData().customs(); + if (currentCustoms.equals(previousCustoms) == false) { + for (ObjectObjectCursor currentCustomMetaData : currentCustoms) { + // new custom md added or existing custom md changed + if (previousCustoms.containsKey(currentCustomMetaData.key) == false + || currentCustomMetaData.value.equals(previousCustoms.get(currentCustomMetaData.key)) == false) { + result.add(currentCustomMetaData.key); + } + } + // existing custom md deleted + for (ObjectObjectCursor previousCustomMetaData : previousCustoms) { + if (currentCustoms.containsKey(previousCustomMetaData.key) == false) { + result.add(previousCustomMetaData.key); + } + } + } + return result; + } + /** * Returns true iff the {@link IndexMetaData} for a given index * has changed between the previous cluster state and the new cluster state. diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 69ad77fc91e..06d0fffd75e 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -58,7 +58,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.TransportSettings; @@ -72,6 +71,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableMap; @@ -134,6 +134,29 @@ public class TribeService extends AbstractLifecycleComponent { return sb.build(); } + /** + * Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom} in tribe node + * When multiple Mergable Custom metadata of the same type is found (from underlying clusters), the + * Custom metadata will be merged using {@link #merge(MetaData.Custom)} and the result will be stored + * in the tribe cluster state + * + * @param type of custom meta data + */ + interface MergableCustomMetaData { + + /** + * Merges this custom metadata with other, returning either this or other custom metadata + * for tribe cluster state. This method should not mutate either this or the + * other custom metadata. + * + * @param other custom meta data + * @return the same instance or other custom metadata based on implementation + * if both the instances are considered equal, implementations should return this + * instance to avoid redundant cluster state changes. + */ + T merge(T other); + } + // internal settings only public static final Setting TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", Property.NodeScope); private final ClusterService clusterService; @@ -270,7 +293,7 @@ public class TribeService extends AbstractLifecycleComponent { public void startNodes() { for (Node node : nodes) { try { - node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node)); + getClusterService(node).add(new TribeClusterStateListener(node)); node.start(); } catch (Exception e) { // calling close is safe for non started nodes, we can just iterate over all @@ -348,23 +371,19 @@ public class TribeService extends AbstractLifecycleComponent { @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterState accumulator = ClusterState.builder(currentState).build(); BatchResult.Builder builder = BatchResult.builder(); - - try { - // we only need to apply the latest cluster state update - accumulator = applyUpdate(accumulator, tasks.get(tasks.size() - 1)); - builder.successes(tasks); - } catch (Exception e) { - builder.failures(tasks, e); - } - - return builder.build(accumulator); + ClusterState.Builder newState = ClusterState.builder(currentState).incrementVersion(); + boolean clusterStateChanged = updateNodes(currentState, tasks, newState); + clusterStateChanged |= updateIndicesAndMetaData(currentState, tasks, newState); + builder.successes(tasks); + return builder.build(clusterStateChanged ? newState.build() : currentState); } - private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent task) { + private boolean updateNodes(ClusterState currentState, List tasks, ClusterState.Builder newState) { boolean clusterStateChanged = false; - ClusterState tribeState = task.state(); + // we only need to apply the latest cluster state update + ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); + ClusterState tribeState = latestTask.state(); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes()); // -- merge nodes // go over existing nodes, and see if they need to be removed @@ -385,16 +404,25 @@ public class TribeService extends AbstractLifecycleComponent { Map tribeAttr = new HashMap<>(tribe.getAttributes()); tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName); DiscoveryNode discoNode = new DiscoveryNode(tribe.getName(), tribe.getId(), tribe.getEphemeralId(), - tribe.getHostName(), tribe.getHostAddress(), tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(), - tribe.getVersion()); + tribe.getHostName(), tribe.getHostAddress(), tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(), + tribe.getVersion()); clusterStateChanged = true; logger.info("[{}] adding node [{}]", tribeName, discoNode); nodes.remove(tribe.getId()); // remove any existing node with the same id but different ephemeral id nodes.add(discoNode); } } + if (clusterStateChanged) { + newState.nodes(nodes); + } + return clusterStateChanged; + } - // -- merge metadata + private boolean updateIndicesAndMetaData(ClusterState currentState, List tasks, ClusterState.Builder newState) { + // we only need to apply the latest cluster state update + ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); + ClusterState tribeState = latestTask.state(); + boolean clusterStateChanged = false; ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); MetaData.Builder metaData = MetaData.builder(currentState.metaData()); RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); @@ -462,13 +490,49 @@ public class TribeService extends AbstractLifecycleComponent { } } } - - if (!clusterStateChanged) { - return currentState; - } else { - return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData) - .routingTable(routingTable.build()).build(); + clusterStateChanged |= updateCustoms(currentState, tasks, metaData); + if (clusterStateChanged) { + newState.blocks(blocks); + newState.metaData(metaData); + newState.routingTable(routingTable.build()); } + return clusterStateChanged; + } + + private boolean updateCustoms(ClusterState currentState, List tasks, MetaData.Builder metaData) { + boolean clusterStateChanged = false; + Set changedCustomMetaDataTypeSet = tasks.stream() + .map(ClusterChangedEvent::changedCustomMetaDataSet) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + final List tribeClientNodes = TribeService.this.nodes; + Map mergedCustomMetaDataMap = mergeChangedCustomMetaData(changedCustomMetaDataTypeSet, + customMetaDataType -> tribeClientNodes.stream() + .map(TribeService::getClusterService).map(ClusterService::state) + .map(ClusterState::metaData) + .map(clusterMetaData -> ((MetaData.Custom) clusterMetaData.custom(customMetaDataType))) + .filter(custom1 -> custom1 != null && custom1 instanceof MergableCustomMetaData) + .map(custom2 -> (MergableCustomMetaData) custom2) + .collect(Collectors.toList()) + ); + for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) { + MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType); + if (mergedCustomMetaData == null) { + // we ignore merging custom md which doesn't implement MergableCustomMetaData interface + if (currentState.metaData().custom(changedCustomMetaDataType) instanceof MergableCustomMetaData) { + // custom md has been removed + clusterStateChanged = true; + logger.info("[{}] removing custom meta data type [{}]", tribeName, changedCustomMetaDataType); + metaData.removeCustom(changedCustomMetaDataType); + } + } else { + // custom md has been changed + clusterStateChanged = true; + logger.info("[{}] updating custom meta data type [{}] data [{}]", tribeName, changedCustomMetaDataType, mergedCustomMetaData); + metaData.putCustom(changedCustomMetaDataType, mergedCustomMetaData); + } + } + return clusterStateChanged; } private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, @@ -494,4 +558,23 @@ public class TribeService extends AbstractLifecycleComponent { } } } + + private static ClusterService getClusterService(Node node) { + return node.injector().getInstance(ClusterService.class); + } + + // pkg-private for testing + static Map mergeChangedCustomMetaData(Set changedCustomMetaDataTypeSet, + Function> customMetaDataByTribeNode) { + + Map changedCustomMetaDataMap = new HashMap<>(changedCustomMetaDataTypeSet.size()); + for (String customMetaDataType : changedCustomMetaDataTypeSet) { + customMetaDataByTribeNode.apply(customMetaDataType).stream() + .reduce((mergableCustomMD, mergableCustomMD2) -> + ((MergableCustomMetaData) mergableCustomMD.merge((MetaData.Custom) mergableCustomMD2))) + .ifPresent(mergedCustomMetaData -> + changedCustomMetaDataMap.put(customMetaDataType, ((MetaData.Custom) mergedCustomMetaData))); + } + return changedCustomMetaDataMap; + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index 939954c4560..63d34f683de 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -33,6 +34,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestCustomMetaData; import java.util.ArrayList; import java.util.Arrays; @@ -222,6 +224,128 @@ public class ClusterChangedEventTests extends ESTestCase { assertTrue("index routing table should not be the same object", event.indexRoutingTableChanged(initialIndices.get(0).getName())); } + /** + * Test custom metadata change checks + */ + public void testChangedCustomMetaDataSet() { + final int numNodesInCluster = 3; + + final ClusterState originalState = createState(numNodesInCluster, randomBoolean(), initialIndices); + CustomMetaData1 customMetaData1 = new CustomMetaData1("data"); + final ClusterState stateWithCustomMetaData = nextState(originalState, Collections.singletonList(customMetaData1)); + + // no custom metadata present in any state + ClusterState nextState = ClusterState.builder(originalState).build(); + ClusterChangedEvent event = new ClusterChangedEvent("_na_", originalState, nextState); + assertTrue(event.changedCustomMetaDataSet().isEmpty()); + + // next state has new custom metadata + nextState = nextState(originalState, Collections.singletonList(customMetaData1)); + event = new ClusterChangedEvent("_na_", originalState, nextState); + Set changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state has same custom metadata + nextState = nextState(originalState, Collections.singletonList(customMetaData1)); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.isEmpty()); + + // next state has equivalent custom metadata + nextState = nextState(originalState, Collections.singletonList(new CustomMetaData1("data"))); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.isEmpty()); + + // next state removes custom metadata + nextState = originalState; + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state updates custom metadata + nextState = nextState(stateWithCustomMetaData, Collections.singletonList(new CustomMetaData1("data1"))); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state adds new custom metadata type + CustomMetaData2 customMetaData2 = new CustomMetaData2("data2"); + nextState = nextState(stateWithCustomMetaData, Arrays.asList(customMetaData1, customMetaData2)); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type())); + + // next state adds two custom metadata type + nextState = nextState(originalState, Arrays.asList(customMetaData1, customMetaData2)); + event = new ClusterChangedEvent("_na_", originalState, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 2); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type())); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state removes two custom metadata type + nextState = originalState; + event = new ClusterChangedEvent("_na_", + nextState(originalState, Arrays.asList(customMetaData1, customMetaData2)), nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 2); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type())); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + } + + private static class CustomMetaData2 extends TestCustomMetaData { + static { + MetaData.registerPrototype("2", new CustomMetaData2("")); + } + protected CustomMetaData2(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData2(data); + } + + @Override + public String type() { + return "2"; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + + private static class CustomMetaData1 extends TestCustomMetaData { + static { + MetaData.registerPrototype("1", new CustomMetaData1("")); + } + protected CustomMetaData1(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData1(data); + } + + @Override + public String type() { + return "1"; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + private static ClusterState createSimpleClusterState() { return ClusterState.builder(TEST_CLUSTER_NAME).build(); } @@ -244,6 +368,22 @@ public class ClusterChangedEventTests extends ESTestCase { .build(); } + private static ClusterState nextState(final ClusterState previousState, List customMetaDataList) { + final ClusterState.Builder builder = ClusterState.builder(previousState); + builder.stateUUID(UUIDs.randomBase64UUID()); + MetaData.Builder metaDataBuilder = new MetaData.Builder(previousState.metaData()); + for (ObjectObjectCursor customMetaData : previousState.metaData().customs()) { + if (customMetaData.value instanceof TestCustomMetaData) { + metaDataBuilder.removeCustom(customMetaData.key); + } + } + for (TestCustomMetaData testCustomMetaData : customMetaDataList) { + metaDataBuilder.putCustom(testCustomMetaData.type(), testCustomMetaData); + } + builder.metaData(metaDataBuilder); + return builder.build(); + } + // Create a modified cluster state from another one, but with some number of indices added and deleted. private static ClusterState nextState(final ClusterState previousState, final boolean changeClusterUUID, final List addedIndices, final List deletedIndices, final int numNodesToRemove) { diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index cf4fe03893f..179d977ea5d 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -22,12 +22,15 @@ package org.elasticsearch.tribe; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; @@ -41,7 +44,10 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; +import org.elasticsearch.test.TestCustomMetaData; import org.elasticsearch.transport.MockTcpTransportPlugin; +import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData1; +import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData2; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -52,9 +58,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -446,6 +455,132 @@ public class TribeIT extends ESIntegTestCase { } } + public void testMergingRemovedCustomMetaData() throws Exception { + MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE); + removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE); + MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a"); + MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b"); + try (Releasable tribeNode = startTribeNode()) { + assertNodes(ALL); + putCustomMetaData(cluster1, customMetaData1); + putCustomMetaData(cluster2, customMetaData2); + assertCustomMetaDataUpdated(internalCluster(), customMetaData2); + removeCustomMetaData(cluster2, customMetaData2.type()); + assertCustomMetaDataUpdated(internalCluster(), customMetaData1); + } + } + + public void testMergingCustomMetaData() throws Exception { + MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE); + removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE); + MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + List customMetaDatas = Arrays.asList(customMetaData1, customMetaData2); + Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); + final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0); + try (Releasable tribeNode = startTribeNode()) { + assertNodes(ALL); + putCustomMetaData(cluster1, customMetaData1); + assertCustomMetaDataUpdated(internalCluster(), customMetaData1); + putCustomMetaData(cluster2, customMetaData2); + assertCustomMetaDataUpdated(internalCluster(), tribeNodeCustomMetaData); + } + } + + public void testMergingMultipleCustomMetaData() throws Exception { + MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + MetaData.registerPrototype(MergableCustomMetaData2.TYPE, new MergableCustomMetaData2("")); + removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE); + removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE); + MergableCustomMetaData1 firstCustomMetaDataType1 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + MergableCustomMetaData1 secondCustomMetaDataType1 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + MergableCustomMetaData2 firstCustomMetaDataType2 = new MergableCustomMetaData2(randomAsciiOfLength(10)); + MergableCustomMetaData2 secondCustomMetaDataType2 = new MergableCustomMetaData2(randomAsciiOfLength(10)); + List mergedCustomMetaDataType1 = Arrays.asList(firstCustomMetaDataType1, secondCustomMetaDataType1); + List mergedCustomMetaDataType2 = Arrays.asList(firstCustomMetaDataType2, secondCustomMetaDataType2); + Collections.sort(mergedCustomMetaDataType1, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); + Collections.sort(mergedCustomMetaDataType2, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); + try (Releasable tribeNode = startTribeNode()) { + assertNodes(ALL); + // test putting multiple custom md types propagates to tribe + putCustomMetaData(cluster1, firstCustomMetaDataType1); + putCustomMetaData(cluster1, firstCustomMetaDataType2); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2); + + // test multiple same type custom md is merged and propagates to tribe + putCustomMetaData(cluster2, secondCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0)); + + // test multiple same type custom md is merged and propagates to tribe + putCustomMetaData(cluster2, secondCustomMetaDataType2); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0)); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0)); + + // test removing custom md is propagates to tribe + removeCustomMetaData(cluster2, secondCustomMetaDataType1.type()); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0)); + removeCustomMetaData(cluster2, secondCustomMetaDataType2.type()); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2); + } + } + + private static void assertCustomMetaDataUpdated(InternalTestCluster cluster, + TestCustomMetaData expectedCustomMetaData) throws Exception { + assertBusy(() -> { + ClusterState tribeState = cluster.getInstance(ClusterService.class, cluster.getNodeNames()[0]).state(); + MetaData.Custom custom = tribeState.metaData().custom(expectedCustomMetaData.type()); + assertNotNull(custom); + assertThat(custom, equalTo(expectedCustomMetaData)); + }); + } + + private void removeCustomMetaData(InternalTestCluster cluster, final String customMetaDataType) { + logger.info("removing custom_md type [{}] from [{}]", customMetaDataType, cluster.getClusterName()); + updateMetaData(cluster, builder -> builder.removeCustom(customMetaDataType)); + } + + private void putCustomMetaData(InternalTestCluster cluster, final TestCustomMetaData customMetaData) { + logger.info("putting custom_md type [{}] with data[{}] from [{}]", customMetaData.type(), + customMetaData.getData(), cluster.getClusterName()); + updateMetaData(cluster, builder -> builder.putCustom(customMetaData.type(), customMetaData)); + } + + private static void updateMetaData(InternalTestCluster cluster, UnaryOperator addCustoms) { + ClusterService clusterService = cluster.getInstance(ClusterService.class, cluster.getMasterName()); + final CountDownLatch latch = new CountDownLatch(1); + clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) { + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + latch.countDown(); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MetaData.Builder builder = MetaData.builder(currentState.metaData()); + builder = addCustoms.apply(builder); + return new ClusterState.Builder(currentState).metaData(builder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + fail("failed to apply cluster state from [" + source + "] with " + e.getMessage()); + } + }); + try { + latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + fail("latch waiting on publishing custom md interrupted [" + e.getMessage() + "]"); + } + assertThat("timed out trying to add custom metadata to " + cluster.getClusterName(), latch.getCount(), equalTo(0L)); + + } + private void assertIndicesExist(Client client, String... indices) throws Exception { assertBusy(() -> { ClusterState state = client.admin().cluster().prepareState().setRoutingTable(true).setMetaData(true).get().getState(); diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java index 43ee8fee151..7aea02c552b 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java @@ -19,9 +19,22 @@ package org.elasticsearch.tribe; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestCustomMetaData; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.instanceOf; public class TribeServiceTests extends ESTestCase { public void testMinimalSettings() { @@ -96,4 +109,128 @@ public class TribeServiceTests extends ESTestCase { assertEquals("7.7.7.7", clientSettings.get("transport.bind_host")); assertEquals("8.8.8.8", clientSettings.get("transport.publish_host")); } + + public void testMergeCustomMetaDataSimple() { + Map mergedCustoms = + TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE), + s -> Collections.singletonList(new MergableCustomMetaData1("data1"))); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertNotNull(mergedCustom); + assertEquals(mergedCustom.getData(), "data1"); + } + + public void testMergeCustomMetaData() { + Map mergedCustoms = + TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE), + s -> Arrays.asList(new MergableCustomMetaData1("data1"), new MergableCustomMetaData1("data2"))); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertNotNull(mergedCustom); + assertEquals(mergedCustom.getData(), "data2"); + } + + public void testMergeMultipleCustomMetaData() { + Map> inputMap = new HashMap<>(); + inputMap.put(MergableCustomMetaData1.TYPE, + Arrays.asList(new MergableCustomMetaData1("data10"), new MergableCustomMetaData1("data11"))); + inputMap.put(MergableCustomMetaData2.TYPE, + Arrays.asList(new MergableCustomMetaData2("data21"), new MergableCustomMetaData2("data20"))); + Map mergedCustoms = TribeService.mergeChangedCustomMetaData( + Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertEquals(mergedCustom.getData(), "data11"); + mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class)); + assertEquals(mergedCustom.getData(), "data21"); + } + + public void testMergeCustomMetaDataFromMany() { + Map> inputMap = new HashMap<>(); + int n = randomIntBetween(3, 5); + List customList1 = new ArrayList<>(); + for (int i = 0; i <= n; i++) { + customList1.add(new MergableCustomMetaData1("data1"+String.valueOf(i))); + } + Collections.shuffle(customList1, random()); + inputMap.put(MergableCustomMetaData1.TYPE, customList1); + List customList2 = new ArrayList<>(); + for (int i = 0; i <= n; i++) { + customList2.add(new MergableCustomMetaData2("data2"+String.valueOf(i))); + } + Collections.shuffle(customList2, random()); + inputMap.put(MergableCustomMetaData2.TYPE, customList2); + + Map mergedCustoms = TribeService.mergeChangedCustomMetaData( + Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertEquals(mergedCustom.getData(), "data1"+String.valueOf(n)); + mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class)); + assertEquals(mergedCustom.getData(), "data2"+String.valueOf(n)); + } + + static class MergableCustomMetaData1 extends TestCustomMetaData + implements TribeService.MergableCustomMetaData { + public static final String TYPE = "custom_md_1"; + + protected MergableCustomMetaData1(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new MergableCustomMetaData1(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + + @Override + public MergableCustomMetaData1 merge(MergableCustomMetaData1 other) { + return (getData().compareTo(other.getData()) >= 0) ? this : other; + } + } + + static class MergableCustomMetaData2 extends TestCustomMetaData + implements TribeService.MergableCustomMetaData { + public static final String TYPE = "custom_md_2"; + + protected MergableCustomMetaData2(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new MergableCustomMetaData2(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + + @Override + public MergableCustomMetaData2 merge(MergableCustomMetaData2 other) { + return (getData().compareTo(other.getData()) >= 0) ? this : other; + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java b/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java index 92d5b95cfac..a655f17faca 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java @@ -99,4 +99,9 @@ public abstract class TestCustomMetaData extends AbstractDiffable