Notify affixMap settings when any under the registered prefix matches (#28317)

* Notify affixMap settings when any under the registered prefix matches

Previously if an affixMap setting was registered, and then a completely
different setting was applied, the affixMap update consumer would be notified
with an empty map. This caused settings that were previously set to be unset in
local state in a consumer that assumed it would only be called when the affixMap
setting was changed.

This commit changes the behavior so if a prefix `foo.` is registered, any
setting under the prefix will have the update consumer notified if there are
changes starting with `foo.`.

Resolves #28316

* Add unit test

* Address feedback
This commit is contained in:
Lee Hinman 2018-01-22 11:55:54 -07:00 committed by GitHub
parent 0c83ee2a5d
commit ba5b583203
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 2 deletions

View File

@ -597,7 +597,7 @@ public class Setting<T> implements ToXContentObject {
@Override
public boolean hasChanged(Settings current, Settings previous) {
return Stream.concat(matchStream(current), matchStream(previous)).findAny().isPresent();
return current.filter(k -> match(k)).equals(previous.filter(k -> match(k))) == false;
}
@Override
@ -612,7 +612,7 @@ public class Setting<T> implements ToXContentObject {
if (updater.hasChanged(current, previous)) {
// only the ones that have changed otherwise we might get too many updates
// the hasChanged above checks only if there are any changes
T value = updater.getValue(current, previous);
T value = updater.getValue(current, previous);
if ((omitDefaults && value.equals(concreteSetting.getDefault(current))) == false) {
result.put(namespace, value);
}

View File

@ -21,11 +21,14 @@ package org.elasticsearch.cluster.allocation;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -34,7 +37,9 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
@ -156,5 +161,58 @@ public class FilteringAllocationIT extends ESIntegTestCase {
.execute().actionGet());
assertEquals("invalid IP address [192.168.1.1.] for [" + filterSetting.getKey() + ipKey + "]", e.getMessage());
}
public void testTransientSettingsStillApplied() throws Exception {
List<String> nodes = internalCluster().startNodes(6);
Set<String> excludeNodes = new HashSet<>(nodes.subList(0, 3));
Set<String> includeNodes = new HashSet<>(nodes.subList(3, 6));
logger.info("--> exclude: [{}], include: [{}]",
Strings.collectionToCommaDelimitedString(excludeNodes),
Strings.collectionToCommaDelimitedString(includeNodes));
ensureStableCluster(6);
client().admin().indices().prepareCreate("test").get();
ensureGreen("test");
Settings exclude = Settings.builder().put("cluster.routing.allocation.exclude._name",
Strings.collectionToCommaDelimitedString(excludeNodes)).build();
logger.info("--> updating settings");
client().admin().cluster().prepareUpdateSettings().setTransientSettings(exclude).get();
logger.info("--> waiting for relocation");
waitForRelocation(ClusterHealthStatus.GREEN);
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ShardRouting shard : state.getRoutingTable().shardsWithState(ShardRoutingState.STARTED)) {
String node = state.getRoutingNodes().node(shard.currentNodeId()).node().getName();
logger.info("--> shard on {} - {}", node, shard);
assertTrue("shard on " + node + " but should only be on the include node list: " +
Strings.collectionToCommaDelimitedString(includeNodes),
includeNodes.contains(node));
}
Settings other = Settings.builder().put("cluster.info.update.interval", "45s").build();
logger.info("--> updating settings with random persistent setting");
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(other).setTransientSettings(exclude).get();
logger.info("--> waiting for relocation");
waitForRelocation(ClusterHealthStatus.GREEN);
state = client().admin().cluster().prepareState().get().getState();
// The transient settings still exist in the state
assertThat(state.metaData().transientSettings(), equalTo(exclude));
for (ShardRouting shard : state.getRoutingTable().shardsWithState(ShardRoutingState.STARTED)) {
String node = state.getRoutingNodes().node(shard.currentNodeId()).node().getName();
logger.info("--> shard on {} - {}", node, shard);
assertTrue("shard on " + node + " but should only be on the include node list: " +
Strings.collectionToCommaDelimitedString(includeNodes),
includeNodes.contains(node));
}
}
}

View File

@ -261,6 +261,21 @@ public class ScopedSettingsTests extends ESTestCase {
assertEquals(2, listResults.size());
assertEquals(2, intResults.size());
service.applySettings(Settings.builder()
.put("foo.test.bar", 2)
.put("foo.test_1.bar", 7)
.putList("foo.test_list.list", "16", "17")
.putList("foo.test_list_1.list", "18", "19", "20")
.build());
assertEquals(2, intResults.get("test").intValue());
assertEquals(7, intResults.get("test_1").intValue());
assertEquals(Arrays.asList(16, 17), listResults.get("test_list"));
assertEquals(Arrays.asList(18, 19, 20), listResults.get("test_list_1"));
assertEquals(2, listResults.size());
assertEquals(2, intResults.size());
listResults.clear();
intResults.clear();
@ -286,6 +301,35 @@ public class ScopedSettingsTests extends ESTestCase {
}
public void testAffixMapConsumerNotCalledWithNull() {
Setting.AffixSetting<Integer> prefixSetting = Setting.prefixKeySetting("eggplant.",
(k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope));
Setting.AffixSetting<Integer> otherSetting = Setting.prefixKeySetting("other.",
(k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope));
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,new HashSet<>(Arrays.asList(prefixSetting, otherSetting)));
Map<String, Integer> affixResults = new HashMap<>();
Consumer<Map<String,Integer>> consumer = (map) -> {
logger.info("--> consuming settings {}", map);
affixResults.clear();
affixResults.putAll(map);
};
service.addAffixMapUpdateConsumer(prefixSetting, consumer, (s, k) -> {}, randomBoolean());
assertEquals(0, affixResults.size());
service.applySettings(Settings.builder()
.put("eggplant._name", 2)
.build());
assertThat(affixResults.size(), equalTo(1));
assertThat(affixResults.get("_name"), equalTo(2));
service.applySettings(Settings.builder()
.put("eggplant._name", 2)
.put("other.thing", 3)
.build());
assertThat(affixResults.get("_name"), equalTo(2));
}
public void testApply() {
Setting<Integer> testSetting = Setting.intSetting("foo.bar", 1, Property.Dynamic, Property.NodeScope);
Setting<Integer> testSetting2 = Setting.intSetting("foo.bar.baz", 1, Property.Dynamic, Property.NodeScope);