Merge pull request #16295 from s1monw/convert_multicast_settings
Convert multcast plugin settings to the new infra
This commit is contained in:
commit
7f01771021
|
@ -328,6 +328,10 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
}, dynamic, scope);
|
||||
}
|
||||
|
||||
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, int maxValue, boolean dynamic, Scope scope) {
|
||||
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, maxValue, key), dynamic, scope);
|
||||
}
|
||||
|
||||
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, boolean dynamic, Scope scope) {
|
||||
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), dynamic, scope);
|
||||
}
|
||||
|
@ -341,10 +345,17 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
}
|
||||
|
||||
public static int parseInt(String s, int minValue, String key) {
|
||||
return parseInt(s, minValue, Integer.MAX_VALUE, key);
|
||||
}
|
||||
|
||||
public static int parseInt(String s, int minValue, int maxValue, String key) {
|
||||
int value = Integer.parseInt(s);
|
||||
if (value < minValue) {
|
||||
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
|
||||
}
|
||||
if (value > maxValue) {
|
||||
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be =< " + maxValue);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class SettingTests extends ESTestCase {
|
||||
|
@ -364,4 +365,24 @@ public class SettingTests extends ESTestCase {
|
|||
assertEquals("key must match setting but didn't [foo]", ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testMinMaxInt() {
|
||||
Setting<Integer> integerSetting = Setting.intSetting("foo.bar", 1, 0, 10, false, Setting.Scope.CLUSTER);
|
||||
try {
|
||||
integerSetting.get(Settings.builder().put("foo.bar", 11).build());
|
||||
fail();
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("Failed to parse value [11] for setting [foo.bar] must be =< 10", ex.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
integerSetting.get(Settings.builder().put("foo.bar", -1).build());
|
||||
fail();
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("Failed to parse value [-1] for setting [foo.bar] must be >= 0", ex.getMessage());
|
||||
}
|
||||
|
||||
assertEquals(5, integerSetting.get(Settings.builder().put("foo.bar", 5).build()).intValue());
|
||||
assertEquals(1, integerSetting.get(Settings.EMPTY).intValue());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.plugin.discovery.multicast;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
|
@ -46,4 +47,15 @@ public class MulticastDiscoveryPlugin extends Plugin {
|
|||
module.addZenPing(MulticastZenPing.class);
|
||||
}
|
||||
}
|
||||
|
||||
public void onModule(SettingsModule module) {
|
||||
module.registerSetting(MulticastZenPing.ADDRESS_SETTING);
|
||||
module.registerSetting(MulticastZenPing.GROUP_SETTING);
|
||||
module.registerSetting(MulticastZenPing.PORT_SETTING);
|
||||
module.registerSetting(MulticastZenPing.SHARED_SETTING);
|
||||
module.registerSetting(MulticastZenPing.TTL_SETTING);
|
||||
module.registerSetting(MulticastZenPing.BUFFER_SIZE_SETTING);
|
||||
module.registerSetting(MulticastZenPing.PING_ENABLED_SETTING);
|
||||
module.registerSetting(MulticastZenPing.DEFERE_TO_INTERFACE_SETTING);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
|
@ -47,7 +48,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
|
@ -102,6 +106,14 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
|
||||
private final AtomicInteger pingIdGenerator = new AtomicInteger();
|
||||
private final Map<Integer, PingCollection> receivedResponses = newConcurrentMap();
|
||||
public static final Setting<String> ADDRESS_SETTING = Setting.simpleString("discovery.zen.ping.multicast.address", false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> PORT_SETTING = Setting.intSetting("discovery.zen.ping.multicast.port", 54328, 0, (1<<16)-1, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<String> GROUP_SETTING = new Setting<>("discovery.zen.ping.multicast.group", "224.2.2.4", Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("discovery.zen.ping.multicast.buffer_size", new ByteSizeValue(2048, ByteSizeUnit.BYTES), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> TTL_SETTING = Setting.intSetting("discovery.zen.ping.multicast.ttl", 3, 0, 255, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> PING_ENABLED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.ping.enabled", true, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> SHARED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.shared", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> DEFERE_TO_INTERFACE_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.defer_group_to_set_interface", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER);
|
||||
|
||||
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
|
||||
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
|
||||
|
@ -116,13 +128,12 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
this.networkService = networkService;
|
||||
this.version = version;
|
||||
|
||||
this.address = this.settings.get("discovery.zen.ping.multicast.address");
|
||||
this.port = this.settings.getAsInt("discovery.zen.ping.multicast.port", 54328);
|
||||
this.group = this.settings.get("discovery.zen.ping.multicast.group", "224.2.2.4");
|
||||
this.bufferSize = this.settings.getAsInt("discovery.zen.ping.multicast.buffer_size", 2048);
|
||||
this.ttl = this.settings.getAsInt("discovery.zen.ping.multicast.ttl", 3);
|
||||
|
||||
this.pingEnabled = this.settings.getAsBoolean("discovery.zen.ping.multicast.ping.enabled", true);
|
||||
this.address = ADDRESS_SETTING.exists(settings) ? ADDRESS_SETTING.get(settings) : null;
|
||||
this.port = PORT_SETTING.get(settings);
|
||||
this.group = GROUP_SETTING.get(settings);
|
||||
this.bufferSize = BUFFER_SIZE_SETTING.get(settings).bytesAsInt();
|
||||
this.ttl = TTL_SETTING.get(settings);
|
||||
this.pingEnabled = PING_ENABLED_SETTING.get(settings);
|
||||
|
||||
logger.debug("using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address);
|
||||
|
||||
|
@ -142,9 +153,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
try {
|
||||
// we know OSX has bugs in the JVM when creating multiple instances of multicast sockets
|
||||
// causing for "socket close" exceptions when receive and/or crashes
|
||||
boolean shared = settings.getAsBoolean("discovery.zen.ping.multicast.shared", Constants.MAC_OS_X);
|
||||
boolean shared = SHARED_SETTING.get(settings);
|
||||
// OSX does not correctly send multicasts FROM the right interface
|
||||
boolean deferToInterface = settings.getAsBoolean("discovery.zen.ping.multicast.defer_group_to_set_interface", Constants.MAC_OS_X);
|
||||
boolean deferToInterface = DEFERE_TO_INTERFACE_SETTING.get(settings);
|
||||
|
||||
final MulticastChannel.Config config = new MulticastChannel.Config(port, group, bufferSize, ttl,
|
||||
getMulticastInterface(), deferToInterface);
|
||||
|
|
Loading…
Reference in New Issue