Cut over tribe node settings to new settings infra
This commit is contained in:
parent
ce89039926
commit
4c8768eeb7
|
@ -79,6 +79,7 @@ import org.elasticsearch.transport.Transport;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.elasticsearch.tribe.TribeService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -315,6 +316,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
ThreadContext.DEFAULT_HEADERS_SETTING,
|
||||
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
|
||||
ESLoggerFactory.LOG_LEVEL_SETTING,
|
||||
TribeService.BLOCKS_METADATA_SETTING,
|
||||
TribeService.BLOCKS_WRITE_SETTING,
|
||||
TribeService.BLOCKS_WRITE_INDICES_SETTING,
|
||||
TribeService.BLOCKS_READ_INDICES_SETTING,
|
||||
TribeService.BLOCKS_METADATA_INDICES_SETTING,
|
||||
TribeService.ON_CONFLICT_SETTING,
|
||||
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
|
||||
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
|
||||
NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH)));
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
|
@ -51,12 +52,14 @@ import org.elasticsearch.gateway.GatewayService;
|
|||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
||||
|
@ -84,12 +87,12 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
|
||||
|
||||
public static Settings processSettings(Settings settings) {
|
||||
if (settings.get(TRIBE_NAME) != null) {
|
||||
if (TRIBE_NAME_SETTING.exists(settings)) {
|
||||
// if its a node client started by this service as tribe, remove any tribe group setting
|
||||
// to avoid recursive configuration
|
||||
Settings.Builder sb = Settings.builder().put(settings);
|
||||
for (String s : settings.getAsMap().keySet()) {
|
||||
if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME)) {
|
||||
if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME_SETTING.getKey())) {
|
||||
sb.remove(s);
|
||||
}
|
||||
}
|
||||
|
@ -111,14 +114,26 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
return sb.build();
|
||||
}
|
||||
|
||||
public static final String TRIBE_NAME = "tribe.name";
|
||||
|
||||
private static final Setting<String> TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER); // internal settings only
|
||||
private final ClusterService clusterService;
|
||||
private final String[] blockIndicesWrite;
|
||||
private final String[] blockIndicesRead;
|
||||
private final String[] blockIndicesMetadata;
|
||||
|
||||
private static final String ON_CONFLICT_ANY = "any", ON_CONFLICT_DROP = "drop", ON_CONFLICT_PREFER = "prefer_";
|
||||
|
||||
public static final Setting<String> ON_CONFLICT_SETTING = new Setting<>("tribe.on_conflict", ON_CONFLICT_ANY, (s) -> {
|
||||
if (ON_CONFLICT_ANY.equals(s) || ON_CONFLICT_DROP.equals(s) || s.startsWith(ON_CONFLICT_PREFER)) {
|
||||
return s;
|
||||
}
|
||||
throw new IllegalArgumentException("Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " +s);
|
||||
}, false, Setting.Scope.CLUSTER);
|
||||
|
||||
public static final Setting<Boolean> BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
|
||||
private final String onConflict;
|
||||
private final Set<String> droppedIndices = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
|
@ -138,7 +153,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
if (Environment.PATH_CONF_SETTING.exists(settings)) {
|
||||
sb.put(Environment.PATH_CONF_SETTING.getKey(), Environment.PATH_CONF_SETTING.get(settings));
|
||||
}
|
||||
sb.put(TRIBE_NAME, entry.getKey());
|
||||
sb.put(TRIBE_NAME_SETTING.getKey(), entry.getKey());
|
||||
if (sb.get("http.enabled") == null) {
|
||||
sb.put("http.enabled", false);
|
||||
}
|
||||
|
@ -154,15 +169,15 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
// master elected in this single tribe node local "cluster"
|
||||
clusterService.removeInitialStateBlock(discoveryService.getNoMasterBlock());
|
||||
clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
|
||||
if (settings.getAsBoolean("tribe.blocks.write", false)) {
|
||||
if (BLOCKS_WRITE_SETTING.get(settings)) {
|
||||
clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK);
|
||||
}
|
||||
blockIndicesWrite = settings.getAsArray("tribe.blocks.write.indices", Strings.EMPTY_ARRAY);
|
||||
if (settings.getAsBoolean("tribe.blocks.metadata", false)) {
|
||||
blockIndicesWrite = BLOCKS_WRITE_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
||||
if (BLOCKS_METADATA_SETTING.get(settings)) {
|
||||
clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK);
|
||||
}
|
||||
blockIndicesMetadata = settings.getAsArray("tribe.blocks.metadata.indices", Strings.EMPTY_ARRAY);
|
||||
blockIndicesRead = settings.getAsArray("tribe.blocks.read.indices", Strings.EMPTY_ARRAY);
|
||||
blockIndicesMetadata = BLOCKS_METADATA_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
||||
blockIndicesRead = BLOCKS_READ_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
||||
for (Node node : nodes) {
|
||||
node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node));
|
||||
}
|
||||
|
@ -171,7 +186,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
this.blockIndicesRead = blockIndicesRead;
|
||||
this.blockIndicesWrite = blockIndicesWrite;
|
||||
|
||||
this.onConflict = settings.get("tribe.on_conflict", ON_CONFLICT_ANY);
|
||||
this.onConflict = ON_CONFLICT_SETTING.get(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -218,7 +233,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
private final TribeNodeClusterStateTaskExecutor executor;
|
||||
|
||||
TribeClusterStateListener(Node tribeNode) {
|
||||
String tribeName = tribeNode.settings().get(TRIBE_NAME);
|
||||
String tribeName = TRIBE_NAME_SETTING.get(tribeNode.settings());
|
||||
this.tribeName = tribeName;
|
||||
executor = new TribeNodeClusterStateTaskExecutor(tribeName);
|
||||
}
|
||||
|
@ -271,7 +286,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
// -- merge nodes
|
||||
// go over existing nodes, and see if they need to be removed
|
||||
for (DiscoveryNode discoNode : currentState.nodes()) {
|
||||
String markedTribeName = discoNode.attributes().get(TRIBE_NAME);
|
||||
String markedTribeName = discoNode.attributes().get(TRIBE_NAME_SETTING.getKey());
|
||||
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
|
||||
if (tribeState.nodes().get(discoNode.id()) == null) {
|
||||
clusterStateChanged = true;
|
||||
|
@ -288,7 +303,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
for (ObjectObjectCursor<String, String> attr : tribe.attributes()) {
|
||||
tribeAttr.put(attr.key, attr.value);
|
||||
}
|
||||
tribeAttr.put(TRIBE_NAME, tribeName);
|
||||
tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName);
|
||||
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
|
||||
clusterStateChanged = true;
|
||||
logger.info("[{}] adding node [{}]", tribeName, discoNode);
|
||||
|
@ -302,7 +317,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
|
||||
// go over existing indices, and see if they need to be removed
|
||||
for (IndexMetaData index : currentState.metaData()) {
|
||||
String markedTribeName = index.getSettings().get(TRIBE_NAME);
|
||||
String markedTribeName = TRIBE_NAME_SETTING.get(index.getSettings());
|
||||
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
|
||||
IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex());
|
||||
clusterStateChanged = true;
|
||||
|
@ -313,7 +328,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
// always make sure to update the metadata and routing table, in case
|
||||
// there are changes in them (new mapping, shards moving from initializing to started)
|
||||
routingTable.add(tribeState.routingTable().index(index.getIndex()));
|
||||
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
|
||||
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
|
||||
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
|
||||
}
|
||||
}
|
||||
|
@ -334,7 +349,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
|
||||
}
|
||||
} else {
|
||||
String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME);
|
||||
String existingFromTribe = TRIBE_NAME_SETTING.get(indexMetaData.getSettings());
|
||||
if (!tribeName.equals(existingFromTribe)) {
|
||||
// we have a potential conflict on index names, decide what to do...
|
||||
if (ON_CONFLICT_ANY.equals(onConflict)) {
|
||||
|
@ -374,7 +389,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
}
|
||||
|
||||
private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
|
||||
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
|
||||
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
|
||||
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
|
||||
routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex()));
|
||||
if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex().getName())) {
|
||||
|
|
|
@ -244,8 +244,8 @@ public class TribeIT extends ESIntegTestCase {
|
|||
logger.info("wait till test1 and test2 exists in the tribe node state");
|
||||
awaitIndicesInClusterState("test1", "test2");
|
||||
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get("tribe.name"), equalTo("t1"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get("tribe.name"), equalTo("t2"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().hasIndex("conflict"), equalTo(false));
|
||||
}
|
||||
|
||||
|
@ -271,9 +271,9 @@ public class TribeIT extends ESIntegTestCase {
|
|||
logger.info("wait till test1 and test2 exists in the tribe node state");
|
||||
awaitIndicesInClusterState("test1", "test2", "conflict");
|
||||
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("conflict").getSettings().get(TribeService.TRIBE_NAME), equalTo(tribe));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get("tribe.name"), equalTo("t1"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get("tribe.name"), equalTo("t2"));
|
||||
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("conflict").getSettings().get("tribe.name"), equalTo(tribe));
|
||||
}
|
||||
|
||||
public void testTribeOnOneCluster() throws Exception {
|
||||
|
@ -438,7 +438,7 @@ public class TribeIT extends ESIntegTestCase {
|
|||
if (!node.dataNode()) {
|
||||
continue;
|
||||
}
|
||||
if (tribeName.equals(node.getAttributes().get(TribeService.TRIBE_NAME))) {
|
||||
if (tribeName.equals(node.getAttributes().get("tribe.name"))) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue