From 02519d5b64f3e09f2c4909fd63e6909df1924b80 Mon Sep 17 00:00:00 2001 From: Yuusaku Taniguchi Date: Tue, 3 Jan 2017 02:15:36 +0900 Subject: [PATCH] Exhibitor Support (#3664) * allow JsonConfigTesterBase to treat the fields of collections * [Feature] Exhibitor Support (#3664) This patch provides the integration of Druid & Netflix Exhibitor. Druid currently use Apache Curator as ZooKeeper client. Curator can be integrated with Exhibitor to achieve a live/updating list of the ZooKeeper ensemble. This patch enables Druid to use this features. --- docs/content/configuration/index.md | 17 +++ .../java/io/druid/curator/CuratorModule.java | 96 +++++++++--- .../io/druid/curator/ExhibitorConfig.java | 77 ++++++++++ .../io/druid/curator/CuratorModuleTest.java | 139 ++++++++++++++++++ .../io/druid/curator/ExhibitorConfigTest.java | 69 +++++++++ .../io/druid/guice/JsonConfigTesterBase.java | 10 +- .../druid/guice/JsonConfigTesterBaseTest.java | 134 +++++++++++++++++ 7 files changed, 520 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/io/druid/curator/ExhibitorConfig.java create mode 100644 server/src/test/java/io/druid/curator/CuratorModuleTest.java create mode 100644 server/src/test/java/io/druid/curator/ExhibitorConfigTest.java create mode 100644 server/src/test/java/io/druid/guice/JsonConfigTesterBaseTest.java diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 08091bf5076..0ec2c1b99ad 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -75,6 +75,23 @@ The following path is used for service discovery. It is **not** affected by `dru |--------|-----------|-------| |`druid.discovery.curator.path`|Services announce themselves under this ZooKeeper path.|`/druid/discovery`| +### Exhibitor + +[Exhibitor](https://github.com/Netflix/exhibitor/wiki) is a supervisor system for ZooKeeper. +Exhibitor can dynamically scale-up/down the cluster of ZooKeeper servers. +Druid can update self-owned list of ZooKeeper servers through Exhibitor without restarting. +That is, it allows Druid to keep the connections of Exhibitor-supervised ZooKeeper servers. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.exhibitor.service.hosts`|A JSON array which contains the hostnames of Exhibitor instances. Please specify this property if you want to use Exhibitor-supervised cluster.|none| +|`druid.exhibitor.service.port`|The REST port used to connect to Exhibitor.|`8080`| +|`druid.exhibitor.service.restUriPath`|The path of the REST call used to get the server set.|`/exhibitor/v1/cluster/list`| +|`druid.exhibitor.service.useSsl`|Boolean flag for whether or not to use https protocol.|`false`| +|`druid.exhibitor.service.pollingMs`|How ofter to poll the exhibitors for the list|`10000`| + +Note that `druid.zk.service.host` is used as a backup in case an Exhibitor instance can't be contacted and therefore should still be set. + ### Startup Logging All nodes can log debugging information on startup. diff --git a/server/src/main/java/io/druid/curator/CuratorModule.java b/server/src/main/java/io/druid/curator/CuratorModule.java index 8f836cbc9ae..19db9df6dfd 100644 --- a/server/src/main/java/io/druid/curator/CuratorModule.java +++ b/server/src/main/java/io/druid/curator/CuratorModule.java @@ -23,48 +23,62 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.curator.ensemble.exhibitor.Exhibitors; +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.DefaultACLProvider; +import org.apache.curator.retry.BoundedExponentialBackoffRetry; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import java.io.IOException; +import java.util.List; + import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.java.util.common.logger.Logger; -import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.imps.DefaultACLProvider; -import org.apache.curator.retry.BoundedExponentialBackoffRetry; - -import java.io.IOException; - -import java.util.List; - -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; - /** */ public class CuratorModule implements Module { + static final String CURATOR_CONFIG_PREFIX = "druid.zk.service"; + + static final String EXHIBITOR_CONFIG_PREFIX = "druid.exhibitor.service"; + + private static final int BASE_SLEEP_TIME_MS = 1000; + + private static final int MAX_SLEEP_TIME_MS = 45000; + + private static final int MAX_RETRIES = 30; + private static final Logger log = new Logger(CuratorModule.class); @Override public void configure(Binder binder) { - JsonConfigProvider.bind( - binder, "druid.zk.service", - CuratorConfig.class - ); + JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, CuratorConfig.class); + JsonConfigProvider.bind(binder, EXHIBITOR_CONFIG_PREFIX, ExhibitorConfig.class); } @Provides @LazySingleton - public CuratorFramework makeCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException + public CuratorFramework makeCurator( + CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle + ) throws IOException { final CuratorFramework framework = CuratorFrameworkFactory.builder() - .connectString(config.getZkHosts()) + .ensembleProvider(ensembleProvider) .sessionTimeoutMs(config.getZkSessionTimeoutMs()) - .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30)) + .retryPolicy(new BoundedExponentialBackoffRetry( + BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES)) .compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression())) .aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider()) .build(); @@ -91,6 +105,48 @@ public class CuratorModule implements Module return framework; } + @Provides + @LazySingleton + public EnsembleProvider makeEnsembleProvider(CuratorConfig config, ExhibitorConfig exConfig) + { + if (exConfig.getHosts().isEmpty()) { + return new FixedEnsembleProvider(config.getZkHosts()); + } + + return new ExhibitorEnsembleProvider( + new Exhibitors( + exConfig.getHosts(), + exConfig.getRestPort(), + newBackupProvider(config.getZkHosts()) + ), + new DefaultExhibitorRestClient(exConfig.getUseSsl()), + exConfig.getRestUriPath(), + exConfig.getPollingMs(), + new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES) + ) + { + @Override + public void start() throws Exception + { + log.info("Poll the list of zookeeper servers for initial ensemble"); + this.pollForInitialEnsemble(); + super.start(); + } + }; + } + + private Exhibitors.BackupConnectionStringProvider newBackupProvider(final String zkHosts) + { + return new Exhibitors.BackupConnectionStringProvider() + { + @Override + public String getBackupConnectionString() throws Exception + { + return zkHosts; + } + }; + } + class SecuredACLProvider implements ACLProvider { @Override diff --git a/server/src/main/java/io/druid/curator/ExhibitorConfig.java b/server/src/main/java/io/druid/curator/ExhibitorConfig.java new file mode 100644 index 00000000000..f82910b5cc3 --- /dev/null +++ b/server/src/main/java/io/druid/curator/ExhibitorConfig.java @@ -0,0 +1,77 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.List; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +/** + */ +public class ExhibitorConfig +{ + @JsonProperty + private List hosts = new ArrayList<>(); + + @JsonProperty("port") + @Min(0) + @Max(0xffff) + private int restPort = 8080; + + @JsonProperty + private String restUriPath = "/exhibitor/v1/cluster/list"; + + @JsonProperty + private boolean useSsl = false; + + @JsonProperty + @Min(0) + private int pollingMs = 10000; + + public List getHosts() + { + return hosts; + } + + public int getRestPort() + { + return restPort; + } + + public String getRestUriPath() + { + return restUriPath; + } + + public boolean getUseSsl() + { + return useSsl; + } + + public int getPollingMs() + { + return pollingMs; + } + +} diff --git a/server/src/test/java/io/druid/curator/CuratorModuleTest.java b/server/src/test/java/io/druid/curator/CuratorModuleTest.java new file mode 100644 index 00000000000..be7c531d9e2 --- /dev/null +++ b/server/src/test/java/io/druid/curator/CuratorModuleTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.util.Modules; + +import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Properties; + +import io.druid.guice.GuiceInjectors; +import io.druid.guice.LifecycleModule; + +/** + */ +public final class CuratorModuleTest +{ + + private static final String curatorHostKey = CuratorModule.CURATOR_CONFIG_PREFIX + ".host"; + + private static final String exhibitorHostsKey = CuratorModule.EXHIBITOR_CONFIG_PREFIX + ".hosts"; + + @Test + public void defaultEnsembleProvider() throws NoSuchFieldException, IllegalAccessException + { + Injector injector = newInjector(new Properties()); + injector.getInstance(CuratorFramework.class); // initialize related components + EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class); + Assert.assertTrue( + "EnsembleProvider should be FixedEnsembleProvider", + ensembleProvider instanceof FixedEnsembleProvider + ); + Assert.assertEquals( + "The connectionString should be 'localhost'", + "localhost", ensembleProvider.getConnectionString() + ); + } + + @Test + public void fixedZkHosts() + { + Properties props = new Properties(); + props.put(curatorHostKey, "hostA"); + Injector injector = newInjector(props); + + injector.getInstance(CuratorFramework.class); // initialize related components + EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class); + Assert.assertTrue( + "EnsembleProvider should be FixedEnsembleProvider", + ensembleProvider instanceof FixedEnsembleProvider + ); + Assert.assertEquals( + "The connectionString should be 'hostA'", + "hostA", ensembleProvider.getConnectionString() + ); + } + + @Test + public void exhibitorEnsembleProvider() + { + Properties props = new Properties(); + props.put(curatorHostKey, "hostA"); + props.put(exhibitorHostsKey, "[\"hostB\"]"); + Injector injector = newInjector(props); + + injector.getInstance(CuratorFramework.class); // initialize related components + EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class); + Assert.assertTrue( + "EnsembleProvider should be ExhibitorEnsembleProvider", + ensembleProvider instanceof ExhibitorEnsembleProvider + ); + } + + @Test + public void emptyExhibitorHosts() + { + Properties props = new Properties(); + props.put(curatorHostKey, "hostB"); + props.put(exhibitorHostsKey, "[]"); + Injector injector = newInjector(props); + + injector.getInstance(CuratorFramework.class); // initialize related components + EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class); + Assert.assertTrue( + "EnsembleProvider should be FixedEnsembleProvider", + ensembleProvider instanceof FixedEnsembleProvider + ); + Assert.assertEquals( + "The connectionString should be 'hostB'", + "hostB", ensembleProvider.getConnectionString() + ); + } + + private Injector newInjector(final Properties props) + { + List modules = ImmutableList.builder() + .addAll(GuiceInjectors.makeDefaultStartupModules()) + .add(new LifecycleModule()).add(new CuratorModule()).build(); + return Guice.createInjector( + Modules.override(modules).with(new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(Properties.class).toInstance(props); + } + }) + ); + } + +} diff --git a/server/src/test/java/io/druid/curator/ExhibitorConfigTest.java b/server/src/test/java/io/druid/curator/ExhibitorConfigTest.java new file mode 100644 index 00000000000..ae729ccc216 --- /dev/null +++ b/server/src/test/java/io/druid/curator/ExhibitorConfigTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Properties; + +import io.druid.guice.JsonConfigTesterBase; + +public class ExhibitorConfigTest extends JsonConfigTesterBase +{ + @Test + public void testSerde() + throws IllegalAccessException, NoSuchMethodException, InvocationTargetException + { + propertyValues.put(getPropertyKey("hosts"), "[\"hostA\",\"hostB\"]"); + propertyValues.put(getPropertyKey("port"), "80"); + propertyValues.put(getPropertyKey("restUriPath"), "/list"); + propertyValues.put(getPropertyKey("useSsl"), "true"); + propertyValues.put(getPropertyKey("pollingMs"), "1000"); + testProperties.putAll(propertyValues); + configProvider.inject(testProperties, configurator); + ExhibitorConfig config = configProvider.get().get(); + + List hosts = config.getHosts(); + Assert.assertEquals(2, hosts.size()); + Assert.assertTrue(hosts.contains("hostA")); + Assert.assertTrue(hosts.contains("hostB")); + Assert.assertEquals(80, config.getRestPort()); + Assert.assertEquals("/list", config.getRestUriPath()); + Assert.assertTrue(config.getUseSsl()); + Assert.assertEquals(1000, config.getPollingMs()); + } + + @Test + public void defaultValues() + { + configProvider.inject(new Properties(), configurator); + ExhibitorConfig config = configProvider.get().get(); + + List hosts = config.getHosts(); + Assert.assertTrue(hosts.isEmpty()); + Assert.assertEquals(8080, config.getRestPort()); + Assert.assertEquals("/exhibitor/v1/cluster/list", config.getRestUriPath()); + Assert.assertFalse(config.getUseSsl()); + Assert.assertEquals(10000, config.getPollingMs()); + } +} diff --git a/server/src/test/java/io/druid/guice/JsonConfigTesterBase.java b/server/src/test/java/io/druid/guice/JsonConfigTesterBase.java index 583834e2d43..73583540b64 100644 --- a/server/src/test/java/io/druid/guice/JsonConfigTesterBase.java +++ b/server/src/test/java/io/druid/guice/JsonConfigTesterBase.java @@ -35,6 +35,7 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -125,10 +126,15 @@ public abstract class JsonConfigTesterBase final String propertyKey = getPropertyKey(field); if (null != propertyKey) { field.setAccessible(true); - if (field.getType().isAssignableFrom(String.class)) { + Class fieldType = field.getType(); + if (String.class.isAssignableFrom(fieldType)) { propertyValues.put(propertyKey, UUID.randomUUID().toString()); + } else if (Collection.class.isAssignableFrom(fieldType)) { + propertyValues.put(propertyKey, "[]"); + } else if (Map.class.isAssignableFrom(fieldType)) { + propertyValues.put(propertyKey, "{}"); } else { - propertyValues.put(propertyKey, field.get(fakeValues).toString()); + propertyValues.put(propertyKey, String.valueOf(field.get(fakeValues))); } } } diff --git a/server/src/test/java/io/druid/guice/JsonConfigTesterBaseTest.java b/server/src/test/java/io/druid/guice/JsonConfigTesterBaseTest.java new file mode 100644 index 00000000000..607c07e3661 --- /dev/null +++ b/server/src/test/java/io/druid/guice/JsonConfigTesterBaseTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * + */ +public final class JsonConfigTesterBaseTest + extends JsonConfigTesterBase +{ + @Test + public void defaultTestPropertiesForSample() + { + Assert.assertEquals("0", testProperties.getProperty("druid.test.prefix.primitiveInt")); + Assert.assertEquals("false", testProperties.getProperty("druid.test.prefix.primitiveBoolean")); + Assert.assertTrue(!testProperties.getProperty("druid.test.prefix.text").isEmpty()); + Assert.assertEquals("[]", testProperties.getProperty("druid.test.prefix.list")); + Assert.assertEquals("[]", testProperties.getProperty("druid.test.prefix.set")); + Assert.assertEquals("{}", testProperties.getProperty("druid.test.prefix.map")); + for (Map.Entry entry : System.getProperties().entrySet()) { + Assert.assertEquals(entry.getValue(), testProperties.get(entry.getKey())); + } + } + + @Test + public void injectFieldValues() + { + propertyValues.put(getPropertyKey("primitiveInt"), "1"); + propertyValues.put(getPropertyKey("primitiveBoolean"), "true"); + propertyValues.put(getPropertyKey("text"), "foo"); + propertyValues.put(getPropertyKey("list"), "[\"one\",\"two\"]"); + propertyValues.put(getPropertyKey("set"), "[\"three\",\"four\"]"); + propertyValues.put(getPropertyKey("map"), "{\"k1\": \"v1\", \"k2\": \"v2\"}"); + testProperties.putAll(propertyValues); + configProvider.inject(testProperties, configurator); + Sample results = configProvider.get().get(); + + Assert.assertEquals(1, results.getPrimitiveInt()); + Assert.assertTrue(results.getPrimitiveBoolean()); + Assert.assertEquals("foo", results.getText()); + + List list = results.getList(); + Assert.assertEquals(2, list.size()); + Assert.assertEquals("one", list.get(0)); + Assert.assertEquals("two", list.get(1)); + + Set set = results.getSet(); + Assert.assertEquals(2, set.size()); + Assert.assertTrue(set.contains("three")); + Assert.assertTrue(set.contains("four")); + + Map map = results.getMap(); + Assert.assertEquals(2, map.size()); + Assert.assertEquals("v1", map.get("k1")); + Assert.assertEquals("v2", map.get("k2")); + } + + public static class Sample + { + @JsonProperty + private int primitiveInt; + + @JsonProperty + private boolean primitiveBoolean; + + @JsonProperty + private String text; + + @JsonProperty + private List list; + + @JsonProperty + private Set set; + + @JsonProperty + private Map map; + + public int getPrimitiveInt() + { + return primitiveInt; + } + + public boolean getPrimitiveBoolean() + { + return primitiveBoolean; + } + + public String getText() + { + return text; + } + + public List getList() + { + return list; + } + + public Set getSet() + { + return set; + } + + public Map getMap() + { + return map; + } + } +}