diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 519fc259840..db5c6f78fa7 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -79,6 +79,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.tribe.TribeService; import java.util.Arrays; import java.util.Collections; @@ -315,6 +316,12 @@ public final class ClusterSettings extends AbstractScopedSettings { ThreadContext.DEFAULT_HEADERS_SETTING, ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING, ESLoggerFactory.LOG_LEVEL_SETTING, + TribeService.BLOCKS_METADATA_SETTING, + TribeService.BLOCKS_WRITE_SETTING, + TribeService.BLOCKS_WRITE_INDICES_SETTING, + TribeService.BLOCKS_READ_INDICES_SETTING, + TribeService.BLOCKS_METADATA_INDICES_SETTING, + TribeService.ON_CONFLICT_SETTING, NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING, NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING, NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH))); diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index e576f26eb4c..ceb66532f6d 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.DiscoveryModule; @@ -51,12 +52,14 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.node.Node; import org.elasticsearch.rest.RestStatus; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; import static java.util.Collections.unmodifiableMap; @@ -84,12 +87,12 @@ public class TribeService extends AbstractLifecycleComponent { public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE)); public static Settings processSettings(Settings settings) { - if (settings.get(TRIBE_NAME) != null) { + if (TRIBE_NAME_SETTING.exists(settings)) { // if its a node client started by this service as tribe, remove any tribe group setting // to avoid recursive configuration Settings.Builder sb = Settings.builder().put(settings); for (String s : settings.getAsMap().keySet()) { - if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME)) { + if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME_SETTING.getKey())) { sb.remove(s); } } @@ -111,14 +114,26 @@ public class TribeService extends AbstractLifecycleComponent { return sb.build(); } - public static final String TRIBE_NAME = "tribe.name"; - + private static final Setting TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER); // internal settings only private final ClusterService clusterService; private final String[] blockIndicesWrite; private final String[] blockIndicesRead; private final String[] blockIndicesMetadata; - private static final String ON_CONFLICT_ANY = "any", ON_CONFLICT_DROP = "drop", ON_CONFLICT_PREFER = "prefer_"; + + public static final Setting ON_CONFLICT_SETTING = new Setting<>("tribe.on_conflict", ON_CONFLICT_ANY, (s) -> { + if (ON_CONFLICT_ANY.equals(s) || ON_CONFLICT_DROP.equals(s) || s.startsWith(ON_CONFLICT_PREFER)) { + return s; + } + throw new IllegalArgumentException("Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " +s); + }, false, Setting.Scope.CLUSTER); + + public static final Setting BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false, Setting.Scope.CLUSTER); + public static final Setting BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false, Setting.Scope.CLUSTER); + public static final Setting> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); + public static final Setting> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); + public static final Setting> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); + private final String onConflict; private final Set droppedIndices = ConcurrentCollections.newConcurrentSet(); @@ -138,7 +153,7 @@ public class TribeService extends AbstractLifecycleComponent { if (Environment.PATH_CONF_SETTING.exists(settings)) { sb.put(Environment.PATH_CONF_SETTING.getKey(), Environment.PATH_CONF_SETTING.get(settings)); } - sb.put(TRIBE_NAME, entry.getKey()); + sb.put(TRIBE_NAME_SETTING.getKey(), entry.getKey()); if (sb.get("http.enabled") == null) { sb.put("http.enabled", false); } @@ -154,15 +169,15 @@ public class TribeService extends AbstractLifecycleComponent { // master elected in this single tribe node local "cluster" clusterService.removeInitialStateBlock(discoveryService.getNoMasterBlock()); clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK); - if (settings.getAsBoolean("tribe.blocks.write", false)) { + if (BLOCKS_WRITE_SETTING.get(settings)) { clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK); } - blockIndicesWrite = settings.getAsArray("tribe.blocks.write.indices", Strings.EMPTY_ARRAY); - if (settings.getAsBoolean("tribe.blocks.metadata", false)) { + blockIndicesWrite = BLOCKS_WRITE_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY); + if (BLOCKS_METADATA_SETTING.get(settings)) { clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK); } - blockIndicesMetadata = settings.getAsArray("tribe.blocks.metadata.indices", Strings.EMPTY_ARRAY); - blockIndicesRead = settings.getAsArray("tribe.blocks.read.indices", Strings.EMPTY_ARRAY); + blockIndicesMetadata = BLOCKS_METADATA_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY); + blockIndicesRead = BLOCKS_READ_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY); for (Node node : nodes) { node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node)); } @@ -171,7 +186,7 @@ public class TribeService extends AbstractLifecycleComponent { this.blockIndicesRead = blockIndicesRead; this.blockIndicesWrite = blockIndicesWrite; - this.onConflict = settings.get("tribe.on_conflict", ON_CONFLICT_ANY); + this.onConflict = ON_CONFLICT_SETTING.get(settings); } @Override @@ -218,7 +233,7 @@ public class TribeService extends AbstractLifecycleComponent { private final TribeNodeClusterStateTaskExecutor executor; TribeClusterStateListener(Node tribeNode) { - String tribeName = tribeNode.settings().get(TRIBE_NAME); + String tribeName = TRIBE_NAME_SETTING.get(tribeNode.settings()); this.tribeName = tribeName; executor = new TribeNodeClusterStateTaskExecutor(tribeName); } @@ -271,7 +286,7 @@ public class TribeService extends AbstractLifecycleComponent { // -- merge nodes // go over existing nodes, and see if they need to be removed for (DiscoveryNode discoNode : currentState.nodes()) { - String markedTribeName = discoNode.attributes().get(TRIBE_NAME); + String markedTribeName = discoNode.attributes().get(TRIBE_NAME_SETTING.getKey()); if (markedTribeName != null && markedTribeName.equals(tribeName)) { if (tribeState.nodes().get(discoNode.id()) == null) { clusterStateChanged = true; @@ -288,7 +303,7 @@ public class TribeService extends AbstractLifecycleComponent { for (ObjectObjectCursor attr : tribe.attributes()) { tribeAttr.put(attr.key, attr.value); } - tribeAttr.put(TRIBE_NAME, tribeName); + tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName); DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version()); clusterStateChanged = true; logger.info("[{}] adding node [{}]", tribeName, discoNode); @@ -302,7 +317,7 @@ public class TribeService extends AbstractLifecycleComponent { RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); // go over existing indices, and see if they need to be removed for (IndexMetaData index : currentState.metaData()) { - String markedTribeName = index.getSettings().get(TRIBE_NAME); + String markedTribeName = TRIBE_NAME_SETTING.get(index.getSettings()); if (markedTribeName != null && markedTribeName.equals(tribeName)) { IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex()); clusterStateChanged = true; @@ -313,7 +328,7 @@ public class TribeService extends AbstractLifecycleComponent { // always make sure to update the metadata and routing table, in case // there are changes in them (new mapping, shards moving from initializing to started) routingTable.add(tribeState.routingTable().index(index.getIndex())); - Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); + Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build(); metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); } } @@ -334,7 +349,7 @@ public class TribeService extends AbstractLifecycleComponent { addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); } } else { - String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME); + String existingFromTribe = TRIBE_NAME_SETTING.get(indexMetaData.getSettings()); if (!tribeName.equals(existingFromTribe)) { // we have a potential conflict on index names, decide what to do... if (ON_CONFLICT_ANY.equals(onConflict)) { @@ -374,7 +389,7 @@ public class TribeService extends AbstractLifecycleComponent { } private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) { - Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build(); + Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build(); metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex())); if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex().getName())) { diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 506321684ba..654c827b6bd 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -244,8 +244,8 @@ public class TribeIT extends ESIntegTestCase { logger.info("wait till test1 and test2 exists in the tribe node state"); awaitIndicesInClusterState("test1", "test2"); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1")); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get("tribe.name"), equalTo("t1")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get("tribe.name"), equalTo("t2")); assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().hasIndex("conflict"), equalTo(false)); } @@ -271,9 +271,9 @@ public class TribeIT extends ESIntegTestCase { logger.info("wait till test1 and test2 exists in the tribe node state"); awaitIndicesInClusterState("test1", "test2", "conflict"); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1")); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2")); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("conflict").getSettings().get(TribeService.TRIBE_NAME), equalTo(tribe)); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get("tribe.name"), equalTo("t1")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get("tribe.name"), equalTo("t2")); + assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("conflict").getSettings().get("tribe.name"), equalTo(tribe)); } public void testTribeOnOneCluster() throws Exception { @@ -438,7 +438,7 @@ public class TribeIT extends ESIntegTestCase { if (!node.dataNode()) { continue; } - if (tribeName.equals(node.getAttributes().get(TribeService.TRIBE_NAME))) { + if (tribeName.equals(node.getAttributes().get("tribe.name"))) { count++; } }