Merge pull request #16273 from nik9000/convert_node_environment_settings
Switch NodeEnvironment's settings to new settings
This commit is contained in:
commit
d4c40fcbd8
|
@ -53,6 +53,7 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.gateway.GatewayService;
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||||
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
||||||
|
@ -313,5 +314,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
EsExecutors.PROCESSORS_SETTING,
|
EsExecutors.PROCESSORS_SETTING,
|
||||||
ThreadContext.DEFAULT_HEADERS_SETTING,
|
ThreadContext.DEFAULT_HEADERS_SETTING,
|
||||||
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
|
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
|
||||||
ESLoggerFactory.LOG_LEVEL_SETTING)));
|
ESLoggerFactory.LOG_LEVEL_SETTING,
|
||||||
|
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
|
||||||
|
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
|
||||||
|
NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH)));
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.FileSystemUtils;
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
import org.elasticsearch.common.settings.Setting.Scope;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -74,7 +76,6 @@ import static java.util.Collections.unmodifiableSet;
|
||||||
* A component that holds all data paths for a single node.
|
* A component that holds all data paths for a single node.
|
||||||
*/
|
*/
|
||||||
public class NodeEnvironment extends AbstractComponent implements Closeable {
|
public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
|
|
||||||
public static class NodePath {
|
public static class NodePath {
|
||||||
/* ${data.paths}/nodes/{node.id} */
|
/* ${data.paths}/nodes/{node.id} */
|
||||||
public final Path path;
|
public final Path path;
|
||||||
|
@ -130,22 +131,33 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
private final Map<ShardLockKey, InternalShardLock> shardLocks = new HashMap<>();
|
private final Map<ShardLockKey, InternalShardLock> shardLocks = new HashMap<>();
|
||||||
|
|
||||||
// Setting to automatically append node id to custom data paths
|
/**
|
||||||
public static final String ADD_NODE_ID_TO_CUSTOM_PATH = "node.add_id_to_custom_path";
|
* Maximum number of data nodes that should run in an environment.
|
||||||
|
*/
|
||||||
|
public static final Setting<Integer> MAX_LOCAL_STORAGE_NODES_SETTING = Setting.intSetting("node.max_local_storage_nodes", 50, 1, false,
|
||||||
|
Scope.CLUSTER);
|
||||||
|
|
||||||
// If enabled, the [verbose] SegmentInfos.infoStream logging is sent to System.out:
|
/**
|
||||||
public static final String SETTING_ENABLE_LUCENE_SEGMENT_INFOS_TRACE = "node.enable_lucene_segment_infos_trace";
|
* If true automatically append node id to custom data paths.
|
||||||
|
*/
|
||||||
|
public static final Setting<Boolean> ADD_NODE_ID_TO_CUSTOM_PATH = Setting.boolSetting("node.add_id_to_custom_path", true, false,
|
||||||
|
Scope.CLUSTER);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If true the [verbose] SegmentInfos.infoStream logging is sent to System.out.
|
||||||
|
*/
|
||||||
|
public static final Setting<Boolean> ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING = Setting
|
||||||
|
.boolSetting("node.enable_lucene_segment_infos_trace", false, false, Scope.CLUSTER);
|
||||||
|
|
||||||
public static final String NODES_FOLDER = "nodes";
|
public static final String NODES_FOLDER = "nodes";
|
||||||
public static final String INDICES_FOLDER = "indices";
|
public static final String INDICES_FOLDER = "indices";
|
||||||
public static final String NODE_LOCK_FILENAME = "node.lock";
|
public static final String NODE_LOCK_FILENAME = "node.lock";
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@SuppressForbidden(reason = "System.out.*")
|
|
||||||
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
|
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
|
||||||
this.addNodeId = settings.getAsBoolean(ADD_NODE_ID_TO_CUSTOM_PATH, true);
|
this.addNodeId = ADD_NODE_ID_TO_CUSTOM_PATH.get(settings);
|
||||||
|
|
||||||
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
|
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
|
||||||
nodePaths = null;
|
nodePaths = null;
|
||||||
|
@ -161,7 +173,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
|
|
||||||
int localNodeId = -1;
|
int localNodeId = -1;
|
||||||
IOException lastException = null;
|
IOException lastException = null;
|
||||||
int maxLocalStorageNodes = settings.getAsInt("node.max_local_storage_nodes", 50);
|
int maxLocalStorageNodes = MAX_LOCAL_STORAGE_NODES_SETTING.get(settings);
|
||||||
for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
|
for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
|
||||||
for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
|
for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
|
||||||
Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(NODES_FOLDER).resolve(Integer.toString(possibleLockId));
|
Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(NODES_FOLDER).resolve(Integer.toString(possibleLockId));
|
||||||
|
@ -210,9 +222,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
maybeLogPathDetails();
|
maybeLogPathDetails();
|
||||||
maybeLogHeapDetails();
|
maybeLogHeapDetails();
|
||||||
|
|
||||||
if (settings.getAsBoolean(SETTING_ENABLE_LUCENE_SEGMENT_INFOS_TRACE, false)) {
|
applySegmentInfosTrace(settings);
|
||||||
SegmentInfos.setInfoStream(System.out);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void releaseAndNullLocks(Lock[] locks) {
|
private static void releaseAndNullLocks(Lock[] locks) {
|
||||||
|
@ -303,6 +313,13 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
logger.info("heap size [{}], compressed ordinary object pointers [{}]", maxHeapSize, useCompressedOops);
|
logger.info("heap size [{}], compressed ordinary object pointers [{}]", maxHeapSize, useCompressedOops);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressForbidden(reason = "System.out.*")
|
||||||
|
static void applySegmentInfosTrace(Settings settings) {
|
||||||
|
if (ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.get(settings)) {
|
||||||
|
SegmentInfos.setInfoStream(System.out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static String toString(Collection<String> items) {
|
private static String toString(Collection<String> items) {
|
||||||
StringBuilder b = new StringBuilder();
|
StringBuilder b = new StringBuilder();
|
||||||
for(String item : items) {
|
for(String item : items) {
|
||||||
|
|
|
@ -18,10 +18,12 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.env;
|
package org.elasticsearch.env;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.SegmentInfos;
|
||||||
import org.apache.lucene.store.LockObtainFailedException;
|
import org.apache.lucene.store.LockObtainFailedException;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.elasticsearch.common.io.PathUtils;
|
import org.elasticsearch.common.io.PathUtils;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
|
@ -41,35 +43,75 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.arrayWithSize;
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
|
||||||
@LuceneTestCase.SuppressFileSystems("ExtrasFS") // TODO: fix test to allow extras
|
@LuceneTestCase.SuppressFileSystems("ExtrasFS") // TODO: fix test to allow extras
|
||||||
public class NodeEnvironmentTests extends ESTestCase {
|
public class NodeEnvironmentTests extends ESTestCase {
|
||||||
private final IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", Settings.EMPTY);
|
private final IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", Settings.EMPTY);
|
||||||
|
|
||||||
|
public void testNodeLockSillySettings() {
|
||||||
|
try {
|
||||||
|
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.get(Settings.builder()
|
||||||
|
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), between(Integer.MIN_VALUE, 0)).build());
|
||||||
|
fail("expected failure");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
assertThat(e.getMessage(), containsString("must be >= 1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Even though its silly MAXINT nodes is a-ok!
|
||||||
|
int value = between(1, Integer.MAX_VALUE);
|
||||||
|
int max = NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.get(
|
||||||
|
Settings.builder().put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), value).build());
|
||||||
|
assertEquals(value, max);
|
||||||
|
}
|
||||||
|
|
||||||
public void testNodeLockSingleEnvironment() throws IOException {
|
public void testNodeLockSingleEnvironment() throws IOException {
|
||||||
NodeEnvironment env = newNodeEnvironment(Settings.builder()
|
NodeEnvironment env = newNodeEnvironment(Settings.builder()
|
||||||
.put("node.max_local_storage_nodes", 1).build());
|
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 1).build());
|
||||||
Settings settings = env.getSettings();
|
Settings settings = env.getSettings();
|
||||||
List<String> dataPaths = Environment.PATH_DATA_SETTING.get(env.getSettings());
|
List<String> dataPaths = Environment.PATH_DATA_SETTING.get(env.getSettings());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Reuse the same location and attempt to lock again
|
||||||
new NodeEnvironment(settings, new Environment(settings));
|
new NodeEnvironment(settings, new Environment(settings));
|
||||||
fail("env is already locked");
|
fail("env has already locked all the data directories it is allowed");
|
||||||
} catch (IllegalStateException ex) {
|
} catch (IllegalStateException ex) {
|
||||||
|
assertThat(ex.getMessage(), containsString("Failed to obtain node lock"));
|
||||||
}
|
}
|
||||||
env.close();
|
|
||||||
|
|
||||||
// now can recreate and lock it
|
// Close the environment that holds the lock and make sure we can get the lock after release
|
||||||
|
env.close();
|
||||||
env = new NodeEnvironment(settings, new Environment(settings));
|
env = new NodeEnvironment(settings, new Environment(settings));
|
||||||
assertEquals(env.nodeDataPaths().length, dataPaths.size());
|
assertThat(env.nodeDataPaths(), arrayWithSize(dataPaths.size()));
|
||||||
|
|
||||||
for (int i = 0; i < dataPaths.size(); i++) {
|
for (int i = 0; i < dataPaths.size(); i++) {
|
||||||
assertTrue(env.nodeDataPaths()[i].startsWith(PathUtils.get(dataPaths.get(i))));
|
assertTrue(env.nodeDataPaths()[i].startsWith(PathUtils.get(dataPaths.get(i))));
|
||||||
}
|
}
|
||||||
env.close();
|
env.close();
|
||||||
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
|
assertThat(env.lockedShards(), empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressForbidden(reason = "System.out.*")
|
||||||
|
public void testSegmentInfosTracing() {
|
||||||
|
// Defaults to not hooking up std out
|
||||||
|
assertNull(SegmentInfos.getInfoStream());
|
||||||
|
|
||||||
|
try {
|
||||||
|
// False means don't hook up std out
|
||||||
|
NodeEnvironment.applySegmentInfosTrace(
|
||||||
|
Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), false).build());
|
||||||
|
assertNull(SegmentInfos.getInfoStream());
|
||||||
|
|
||||||
|
// But true means hook std out up statically
|
||||||
|
NodeEnvironment.applySegmentInfosTrace(
|
||||||
|
Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), true).build());
|
||||||
|
assertEquals(System.out, SegmentInfos.getInfoStream());
|
||||||
|
} finally {
|
||||||
|
// Clean up after ourselves
|
||||||
|
SegmentInfos.setInfoStream(null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testNodeLockMultipleEnvironment() throws IOException {
|
public void testNodeLockMultipleEnvironment() throws IOException {
|
||||||
|
@ -312,7 +354,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
|
|
||||||
env.close();
|
env.close();
|
||||||
NodeEnvironment env2 = newNodeEnvironment(dataPaths, "/tmp",
|
NodeEnvironment env2 = newNodeEnvironment(dataPaths, "/tmp",
|
||||||
Settings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, false).build());
|
Settings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH.getKey(), false).build());
|
||||||
|
|
||||||
assertThat(env2.availableShardPaths(sid), equalTo(env2.availableShardPaths(sid)));
|
assertThat(env2.availableShardPaths(sid), equalTo(env2.availableShardPaths(sid)));
|
||||||
assertThat(env2.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/myindex/0")));
|
assertThat(env2.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/myindex/0")));
|
||||||
|
|
|
@ -120,7 +120,7 @@ public class ShardPathTests extends ESTestCase {
|
||||||
final boolean includeNodeId = randomBoolean();
|
final boolean includeNodeId = randomBoolean();
|
||||||
indexSetttings = indexSettingsBuilder.put(IndexMetaData.SETTING_DATA_PATH, "custom").build();
|
indexSetttings = indexSettingsBuilder.put(IndexMetaData.SETTING_DATA_PATH, "custom").build();
|
||||||
nodeSettings = settingsBuilder().put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath())
|
nodeSettings = settingsBuilder().put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath())
|
||||||
.put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, includeNodeId).build();
|
.put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH.getKey(), includeNodeId).build();
|
||||||
if (includeNodeId) {
|
if (includeNodeId) {
|
||||||
customPath = path.resolve("custom").resolve("0");
|
customPath = path.resolve("custom").resolve("0");
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue