Add support for merging custom meta data in tribe node (#21552)

* Add support for merging custom meta data in tribe node

Currently, when any underlying cluster has custom metadata
(via plugin), tribe node does not store custom meta data in its
cluster state. This is because the tribe node has no idea how to
select the appropriate custom metadata from one or many custom
metadata (corresponding to the number of underlying clusters).

This change adds an interface that custom metadata implementations
can extend to add support for merging mulitple custom metadata of
the same type for storing in the tribe state.

Relates to #20544
Supersedes #20791

* Simplify updating tribe state

* Add tests for merging multiple custom metadata types in tribe node

* cleanup merging custom md logic in tribe service
This commit is contained in:
Areek Zillur 2016-11-21 12:03:01 -05:00 committed by GitHub
parent 3f2d22bc61
commit 0ccf8a742d
6 changed files with 555 additions and 24 deletions

View File

@ -20,17 +20,21 @@
package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -143,6 +147,33 @@ public class ClusterChangedEvent {
return state.metaData() != previousState.metaData();
}
/**
* Returns a set of custom meta data types when any custom metadata for the cluster has changed
* between the previous cluster state and the new cluster state. custom meta data types are
* returned iff they have been added, updated or removed between the previous and the current state
*/
public Set<String> changedCustomMetaDataSet() {
Set<String> result = new HashSet<>();
ImmutableOpenMap<String, MetaData.Custom> currentCustoms = state.metaData().customs();
ImmutableOpenMap<String, MetaData.Custom> previousCustoms = previousState.metaData().customs();
if (currentCustoms.equals(previousCustoms) == false) {
for (ObjectObjectCursor<String, MetaData.Custom> currentCustomMetaData : currentCustoms) {
// new custom md added or existing custom md changed
if (previousCustoms.containsKey(currentCustomMetaData.key) == false
|| currentCustomMetaData.value.equals(previousCustoms.get(currentCustomMetaData.key)) == false) {
result.add(currentCustomMetaData.key);
}
}
// existing custom md deleted
for (ObjectObjectCursor<String, MetaData.Custom> previousCustomMetaData : previousCustoms) {
if (currentCustoms.containsKey(previousCustomMetaData.key) == false) {
result.add(previousCustomMetaData.key);
}
}
}
return result;
}
/**
* Returns <code>true</code> iff the {@link IndexMetaData} for a given index
* has changed between the previous cluster state and the new cluster state.

View File

@ -58,7 +58,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportSettings;
@ -72,6 +71,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableMap;
@ -134,6 +134,29 @@ public class TribeService extends AbstractLifecycleComponent {
return sb.build();
}
/**
* Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom} in tribe node
* When multiple Mergable Custom metadata of the same type is found (from underlying clusters), the
* Custom metadata will be merged using {@link #merge(MetaData.Custom)} and the result will be stored
* in the tribe cluster state
*
* @param <T> type of custom meta data
*/
interface MergableCustomMetaData<T extends MetaData.Custom> {
/**
* Merges this custom metadata with other, returning either this or <code>other</code> custom metadata
* for tribe cluster state. This method should not mutate either <code>this</code> or the
* <code>other</code> custom metadata.
*
* @param other custom meta data
* @return the same instance or <code>other</code> custom metadata based on implementation
* if both the instances are considered equal, implementations should return this
* instance to avoid redundant cluster state changes.
*/
T merge(T other);
}
// internal settings only
public static final Setting<String> TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", Property.NodeScope);
private final ClusterService clusterService;
@ -270,7 +293,7 @@ public class TribeService extends AbstractLifecycleComponent {
public void startNodes() {
for (Node node : nodes) {
try {
node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node));
getClusterService(node).add(new TribeClusterStateListener(node));
node.start();
} catch (Exception e) {
// calling close is safe for non started nodes, we can just iterate over all
@ -348,23 +371,19 @@ public class TribeService extends AbstractLifecycleComponent {
@Override
public BatchResult<ClusterChangedEvent> execute(ClusterState currentState, List<ClusterChangedEvent> tasks) throws Exception {
ClusterState accumulator = ClusterState.builder(currentState).build();
BatchResult.Builder<ClusterChangedEvent> builder = BatchResult.builder();
try {
// we only need to apply the latest cluster state update
accumulator = applyUpdate(accumulator, tasks.get(tasks.size() - 1));
builder.successes(tasks);
} catch (Exception e) {
builder.failures(tasks, e);
}
return builder.build(accumulator);
ClusterState.Builder newState = ClusterState.builder(currentState).incrementVersion();
boolean clusterStateChanged = updateNodes(currentState, tasks, newState);
clusterStateChanged |= updateIndicesAndMetaData(currentState, tasks, newState);
builder.successes(tasks);
return builder.build(clusterStateChanged ? newState.build() : currentState);
}
private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent task) {
private boolean updateNodes(ClusterState currentState, List<ClusterChangedEvent> tasks, ClusterState.Builder newState) {
boolean clusterStateChanged = false;
ClusterState tribeState = task.state();
// we only need to apply the latest cluster state update
ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1);
ClusterState tribeState = latestTask.state();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
// -- merge nodes
// go over existing nodes, and see if they need to be removed
@ -385,16 +404,25 @@ public class TribeService extends AbstractLifecycleComponent {
Map<String, String> tribeAttr = new HashMap<>(tribe.getAttributes());
tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName);
DiscoveryNode discoNode = new DiscoveryNode(tribe.getName(), tribe.getId(), tribe.getEphemeralId(),
tribe.getHostName(), tribe.getHostAddress(), tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(),
tribe.getVersion());
tribe.getHostName(), tribe.getHostAddress(), tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(),
tribe.getVersion());
clusterStateChanged = true;
logger.info("[{}] adding node [{}]", tribeName, discoNode);
nodes.remove(tribe.getId()); // remove any existing node with the same id but different ephemeral id
nodes.add(discoNode);
}
}
if (clusterStateChanged) {
newState.nodes(nodes);
}
return clusterStateChanged;
}
// -- merge metadata
private boolean updateIndicesAndMetaData(ClusterState currentState, List<ClusterChangedEvent> tasks, ClusterState.Builder newState) {
// we only need to apply the latest cluster state update
ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1);
ClusterState tribeState = latestTask.state();
boolean clusterStateChanged = false;
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
@ -462,13 +490,49 @@ public class TribeService extends AbstractLifecycleComponent {
}
}
}
if (!clusterStateChanged) {
return currentState;
} else {
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData)
.routingTable(routingTable.build()).build();
clusterStateChanged |= updateCustoms(currentState, tasks, metaData);
if (clusterStateChanged) {
newState.blocks(blocks);
newState.metaData(metaData);
newState.routingTable(routingTable.build());
}
return clusterStateChanged;
}
private boolean updateCustoms(ClusterState currentState, List<ClusterChangedEvent> tasks, MetaData.Builder metaData) {
boolean clusterStateChanged = false;
Set<String> changedCustomMetaDataTypeSet = tasks.stream()
.map(ClusterChangedEvent::changedCustomMetaDataSet)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
final List<Node> tribeClientNodes = TribeService.this.nodes;
Map<String, MetaData.Custom> mergedCustomMetaDataMap = mergeChangedCustomMetaData(changedCustomMetaDataTypeSet,
customMetaDataType -> tribeClientNodes.stream()
.map(TribeService::getClusterService).map(ClusterService::state)
.map(ClusterState::metaData)
.map(clusterMetaData -> ((MetaData.Custom) clusterMetaData.custom(customMetaDataType)))
.filter(custom1 -> custom1 != null && custom1 instanceof MergableCustomMetaData)
.map(custom2 -> (MergableCustomMetaData) custom2)
.collect(Collectors.toList())
);
for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) {
MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType);
if (mergedCustomMetaData == null) {
// we ignore merging custom md which doesn't implement MergableCustomMetaData interface
if (currentState.metaData().custom(changedCustomMetaDataType) instanceof MergableCustomMetaData) {
// custom md has been removed
clusterStateChanged = true;
logger.info("[{}] removing custom meta data type [{}]", tribeName, changedCustomMetaDataType);
metaData.removeCustom(changedCustomMetaDataType);
}
} else {
// custom md has been changed
clusterStateChanged = true;
logger.info("[{}] updating custom meta data type [{}] data [{}]", tribeName, changedCustomMetaDataType, mergedCustomMetaData);
metaData.putCustom(changedCustomMetaDataType, mergedCustomMetaData);
}
}
return clusterStateChanged;
}
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable,
@ -494,4 +558,23 @@ public class TribeService extends AbstractLifecycleComponent {
}
}
}
private static ClusterService getClusterService(Node node) {
return node.injector().getInstance(ClusterService.class);
}
// pkg-private for testing
static Map<String, MetaData.Custom> mergeChangedCustomMetaData(Set<String> changedCustomMetaDataTypeSet,
Function<String, List<MergableCustomMetaData>> customMetaDataByTribeNode) {
Map<String, MetaData.Custom> changedCustomMetaDataMap = new HashMap<>(changedCustomMetaDataTypeSet.size());
for (String customMetaDataType : changedCustomMetaDataTypeSet) {
customMetaDataByTribeNode.apply(customMetaDataType).stream()
.reduce((mergableCustomMD, mergableCustomMD2) ->
((MergableCustomMetaData) mergableCustomMD.merge((MetaData.Custom) mergableCustomMD2)))
.ifPresent(mergedCustomMetaData ->
changedCustomMetaDataMap.put(customMetaDataType, ((MetaData.Custom) mergedCustomMetaData)));
}
return changedCustomMetaDataMap;
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
@ -33,6 +34,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestCustomMetaData;
import java.util.ArrayList;
import java.util.Arrays;
@ -222,6 +224,128 @@ public class ClusterChangedEventTests extends ESTestCase {
assertTrue("index routing table should not be the same object", event.indexRoutingTableChanged(initialIndices.get(0).getName()));
}
/**
* Test custom metadata change checks
*/
public void testChangedCustomMetaDataSet() {
final int numNodesInCluster = 3;
final ClusterState originalState = createState(numNodesInCluster, randomBoolean(), initialIndices);
CustomMetaData1 customMetaData1 = new CustomMetaData1("data");
final ClusterState stateWithCustomMetaData = nextState(originalState, Collections.singletonList(customMetaData1));
// no custom metadata present in any state
ClusterState nextState = ClusterState.builder(originalState).build();
ClusterChangedEvent event = new ClusterChangedEvent("_na_", originalState, nextState);
assertTrue(event.changedCustomMetaDataSet().isEmpty());
// next state has new custom metadata
nextState = nextState(originalState, Collections.singletonList(customMetaData1));
event = new ClusterChangedEvent("_na_", originalState, nextState);
Set<String> changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.size() == 1);
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type()));
// next state has same custom metadata
nextState = nextState(originalState, Collections.singletonList(customMetaData1));
event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState);
changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.isEmpty());
// next state has equivalent custom metadata
nextState = nextState(originalState, Collections.singletonList(new CustomMetaData1("data")));
event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState);
changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.isEmpty());
// next state removes custom metadata
nextState = originalState;
event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState);
changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.size() == 1);
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type()));
// next state updates custom metadata
nextState = nextState(stateWithCustomMetaData, Collections.singletonList(new CustomMetaData1("data1")));
event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState);
changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.size() == 1);
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type()));
// next state adds new custom metadata type
CustomMetaData2 customMetaData2 = new CustomMetaData2("data2");
nextState = nextState(stateWithCustomMetaData, Arrays.asList(customMetaData1, customMetaData2));
event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState);
changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.size() == 1);
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type()));
// next state adds two custom metadata type
nextState = nextState(originalState, Arrays.asList(customMetaData1, customMetaData2));
event = new ClusterChangedEvent("_na_", originalState, nextState);
changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.size() == 2);
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type()));
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type()));
// next state removes two custom metadata type
nextState = originalState;
event = new ClusterChangedEvent("_na_",
nextState(originalState, Arrays.asList(customMetaData1, customMetaData2)), nextState);
changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet();
assertTrue(changedCustomMetaDataTypeSet.size() == 2);
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type()));
assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type()));
}
private static class CustomMetaData2 extends TestCustomMetaData {
static {
MetaData.registerPrototype("2", new CustomMetaData2(""));
}
protected CustomMetaData2(String data) {
super(data);
}
@Override
protected TestCustomMetaData newTestCustomMetaData(String data) {
return new CustomMetaData2(data);
}
@Override
public String type() {
return "2";
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return EnumSet.of(MetaData.XContentContext.GATEWAY);
}
}
private static class CustomMetaData1 extends TestCustomMetaData {
static {
MetaData.registerPrototype("1", new CustomMetaData1(""));
}
protected CustomMetaData1(String data) {
super(data);
}
@Override
protected TestCustomMetaData newTestCustomMetaData(String data) {
return new CustomMetaData1(data);
}
@Override
public String type() {
return "1";
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return EnumSet.of(MetaData.XContentContext.GATEWAY);
}
}
private static ClusterState createSimpleClusterState() {
return ClusterState.builder(TEST_CLUSTER_NAME).build();
}
@ -244,6 +368,22 @@ public class ClusterChangedEventTests extends ESTestCase {
.build();
}
private static ClusterState nextState(final ClusterState previousState, List<TestCustomMetaData> customMetaDataList) {
final ClusterState.Builder builder = ClusterState.builder(previousState);
builder.stateUUID(UUIDs.randomBase64UUID());
MetaData.Builder metaDataBuilder = new MetaData.Builder(previousState.metaData());
for (ObjectObjectCursor<String, MetaData.Custom> customMetaData : previousState.metaData().customs()) {
if (customMetaData.value instanceof TestCustomMetaData) {
metaDataBuilder.removeCustom(customMetaData.key);
}
}
for (TestCustomMetaData testCustomMetaData : customMetaDataList) {
metaDataBuilder.putCustom(testCustomMetaData.type(), testCustomMetaData);
}
builder.metaData(metaDataBuilder);
return builder.build();
}
// Create a modified cluster state from another one, but with some number of indices added and deleted.
private static ClusterState nextState(final ClusterState previousState, final boolean changeClusterUUID,
final List<Index> addedIndices, final List<Index> deletedIndices, final int numNodesToRemove) {

View File

@ -22,12 +22,15 @@ package org.elasticsearch.tribe;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
@ -41,7 +44,10 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.TestCustomMetaData;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData1;
import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData2;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -52,9 +58,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@ -446,6 +455,132 @@ public class TribeIT extends ESIntegTestCase {
}
}
public void testMergingRemovedCustomMetaData() throws Exception {
MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1(""));
removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE);
removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE);
MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a");
MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b");
try (Releasable tribeNode = startTribeNode()) {
assertNodes(ALL);
putCustomMetaData(cluster1, customMetaData1);
putCustomMetaData(cluster2, customMetaData2);
assertCustomMetaDataUpdated(internalCluster(), customMetaData2);
removeCustomMetaData(cluster2, customMetaData2.type());
assertCustomMetaDataUpdated(internalCluster(), customMetaData1);
}
}
public void testMergingCustomMetaData() throws Exception {
MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1(""));
removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE);
removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE);
MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1(randomAsciiOfLength(10));
MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1(randomAsciiOfLength(10));
List<MergableCustomMetaData1> customMetaDatas = Arrays.asList(customMetaData1, customMetaData2);
Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0);
try (Releasable tribeNode = startTribeNode()) {
assertNodes(ALL);
putCustomMetaData(cluster1, customMetaData1);
assertCustomMetaDataUpdated(internalCluster(), customMetaData1);
putCustomMetaData(cluster2, customMetaData2);
assertCustomMetaDataUpdated(internalCluster(), tribeNodeCustomMetaData);
}
}
public void testMergingMultipleCustomMetaData() throws Exception {
MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1(""));
MetaData.registerPrototype(MergableCustomMetaData2.TYPE, new MergableCustomMetaData2(""));
removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE);
removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE);
MergableCustomMetaData1 firstCustomMetaDataType1 = new MergableCustomMetaData1(randomAsciiOfLength(10));
MergableCustomMetaData1 secondCustomMetaDataType1 = new MergableCustomMetaData1(randomAsciiOfLength(10));
MergableCustomMetaData2 firstCustomMetaDataType2 = new MergableCustomMetaData2(randomAsciiOfLength(10));
MergableCustomMetaData2 secondCustomMetaDataType2 = new MergableCustomMetaData2(randomAsciiOfLength(10));
List<MergableCustomMetaData1> mergedCustomMetaDataType1 = Arrays.asList(firstCustomMetaDataType1, secondCustomMetaDataType1);
List<MergableCustomMetaData2> mergedCustomMetaDataType2 = Arrays.asList(firstCustomMetaDataType2, secondCustomMetaDataType2);
Collections.sort(mergedCustomMetaDataType1, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
Collections.sort(mergedCustomMetaDataType2, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
try (Releasable tribeNode = startTribeNode()) {
assertNodes(ALL);
// test putting multiple custom md types propagates to tribe
putCustomMetaData(cluster1, firstCustomMetaDataType1);
putCustomMetaData(cluster1, firstCustomMetaDataType2);
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2);
// test multiple same type custom md is merged and propagates to tribe
putCustomMetaData(cluster2, secondCustomMetaDataType1);
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2);
assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0));
// test multiple same type custom md is merged and propagates to tribe
putCustomMetaData(cluster2, secondCustomMetaDataType2);
assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0));
assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0));
// test removing custom md is propagates to tribe
removeCustomMetaData(cluster2, secondCustomMetaDataType1.type());
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0));
removeCustomMetaData(cluster2, secondCustomMetaDataType2.type());
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2);
}
}
private static void assertCustomMetaDataUpdated(InternalTestCluster cluster,
TestCustomMetaData expectedCustomMetaData) throws Exception {
assertBusy(() -> {
ClusterState tribeState = cluster.getInstance(ClusterService.class, cluster.getNodeNames()[0]).state();
MetaData.Custom custom = tribeState.metaData().custom(expectedCustomMetaData.type());
assertNotNull(custom);
assertThat(custom, equalTo(expectedCustomMetaData));
});
}
private void removeCustomMetaData(InternalTestCluster cluster, final String customMetaDataType) {
logger.info("removing custom_md type [{}] from [{}]", customMetaDataType, cluster.getClusterName());
updateMetaData(cluster, builder -> builder.removeCustom(customMetaDataType));
}
private void putCustomMetaData(InternalTestCluster cluster, final TestCustomMetaData customMetaData) {
logger.info("putting custom_md type [{}] with data[{}] from [{}]", customMetaData.type(),
customMetaData.getData(), cluster.getClusterName());
updateMetaData(cluster, builder -> builder.putCustom(customMetaData.type(), customMetaData));
}
private static void updateMetaData(InternalTestCluster cluster, UnaryOperator<MetaData.Builder> addCustoms) {
ClusterService clusterService = cluster.getInstance(ClusterService.class, cluster.getMasterName());
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData.Builder builder = MetaData.builder(currentState.metaData());
builder = addCustoms.apply(builder);
return new ClusterState.Builder(currentState).metaData(builder).build();
}
@Override
public void onFailure(String source, Exception e) {
fail("failed to apply cluster state from [" + source + "] with " + e.getMessage());
}
});
try {
latch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
fail("latch waiting on publishing custom md interrupted [" + e.getMessage() + "]");
}
assertThat("timed out trying to add custom metadata to " + cluster.getClusterName(), latch.getCount(), equalTo(0L));
}
private void assertIndicesExist(Client client, String... indices) throws Exception {
assertBusy(() -> {
ClusterState state = client.admin().cluster().prepareState().setRoutingTable(true).setMetaData(true).get().getState();

View File

@ -19,9 +19,22 @@
package org.elasticsearch.tribe;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestCustomMetaData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.instanceOf;
public class TribeServiceTests extends ESTestCase {
public void testMinimalSettings() {
@ -96,4 +109,128 @@ public class TribeServiceTests extends ESTestCase {
assertEquals("7.7.7.7", clientSettings.get("transport.bind_host"));
assertEquals("8.8.8.8", clientSettings.get("transport.publish_host"));
}
public void testMergeCustomMetaDataSimple() {
Map<String, MetaData.Custom> mergedCustoms =
TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE),
s -> Collections.singletonList(new MergableCustomMetaData1("data1")));
TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
assertNotNull(mergedCustom);
assertEquals(mergedCustom.getData(), "data1");
}
public void testMergeCustomMetaData() {
Map<String, MetaData.Custom> mergedCustoms =
TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE),
s -> Arrays.asList(new MergableCustomMetaData1("data1"), new MergableCustomMetaData1("data2")));
TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
assertNotNull(mergedCustom);
assertEquals(mergedCustom.getData(), "data2");
}
public void testMergeMultipleCustomMetaData() {
Map<String, List<TribeService.MergableCustomMetaData>> inputMap = new HashMap<>();
inputMap.put(MergableCustomMetaData1.TYPE,
Arrays.asList(new MergableCustomMetaData1("data10"), new MergableCustomMetaData1("data11")));
inputMap.put(MergableCustomMetaData2.TYPE,
Arrays.asList(new MergableCustomMetaData2("data21"), new MergableCustomMetaData2("data20")));
Map<String, MetaData.Custom> mergedCustoms = TribeService.mergeChangedCustomMetaData(
Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get);
TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
assertNotNull(mergedCustom);
assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
assertEquals(mergedCustom.getData(), "data11");
mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE);
assertNotNull(mergedCustom);
assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class));
assertEquals(mergedCustom.getData(), "data21");
}
public void testMergeCustomMetaDataFromMany() {
Map<String, List<TribeService.MergableCustomMetaData>> inputMap = new HashMap<>();
int n = randomIntBetween(3, 5);
List<TribeService.MergableCustomMetaData> customList1 = new ArrayList<>();
for (int i = 0; i <= n; i++) {
customList1.add(new MergableCustomMetaData1("data1"+String.valueOf(i)));
}
Collections.shuffle(customList1, random());
inputMap.put(MergableCustomMetaData1.TYPE, customList1);
List<TribeService.MergableCustomMetaData> customList2 = new ArrayList<>();
for (int i = 0; i <= n; i++) {
customList2.add(new MergableCustomMetaData2("data2"+String.valueOf(i)));
}
Collections.shuffle(customList2, random());
inputMap.put(MergableCustomMetaData2.TYPE, customList2);
Map<String, MetaData.Custom> mergedCustoms = TribeService.mergeChangedCustomMetaData(
Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get);
TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
assertNotNull(mergedCustom);
assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
assertEquals(mergedCustom.getData(), "data1"+String.valueOf(n));
mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE);
assertNotNull(mergedCustom);
assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class));
assertEquals(mergedCustom.getData(), "data2"+String.valueOf(n));
}
static class MergableCustomMetaData1 extends TestCustomMetaData
implements TribeService.MergableCustomMetaData<MergableCustomMetaData1> {
public static final String TYPE = "custom_md_1";
protected MergableCustomMetaData1(String data) {
super(data);
}
@Override
protected TestCustomMetaData newTestCustomMetaData(String data) {
return new MergableCustomMetaData1(data);
}
@Override
public String type() {
return TYPE;
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return EnumSet.of(MetaData.XContentContext.GATEWAY);
}
@Override
public MergableCustomMetaData1 merge(MergableCustomMetaData1 other) {
return (getData().compareTo(other.getData()) >= 0) ? this : other;
}
}
static class MergableCustomMetaData2 extends TestCustomMetaData
implements TribeService.MergableCustomMetaData<MergableCustomMetaData2> {
public static final String TYPE = "custom_md_2";
protected MergableCustomMetaData2(String data) {
super(data);
}
@Override
protected TestCustomMetaData newTestCustomMetaData(String data) {
return new MergableCustomMetaData2(data);
}
@Override
public String type() {
return TYPE;
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return EnumSet.of(MetaData.XContentContext.GATEWAY);
}
@Override
public MergableCustomMetaData2 merge(MergableCustomMetaData2 other) {
return (getData().compareTo(other.getData()) >= 0) ? this : other;
}
}
}

View File

@ -99,4 +99,9 @@ public abstract class TestCustomMetaData extends AbstractDiffable<MetaData.Custo
builder.field("data", getData());
return builder;
}
@Override
public String toString() {
return "[" + type() + "][" + data +"]";
}
}