Exhibitor Support (#3664)

* allow JsonConfigTesterBase to treat the fields of collections

* [Feature] Exhibitor Support (#3664)

This patch provides the integration of Druid & Netflix Exhibitor. Druid
currently use Apache Curator as ZooKeeper client. Curator can be
integrated with Exhibitor to achieve a live/updating list of the
ZooKeeper ensemble. This patch enables Druid to use this features.
This commit is contained in:
Yuusaku Taniguchi 2017-01-03 02:15:36 +09:00 committed by Charles Allen
parent 49d71e9b38
commit 02519d5b64
7 changed files with 520 additions and 22 deletions

View File

@ -75,6 +75,23 @@ The following path is used for service discovery. It is **not** affected by `dru
|--------|-----------|-------| |--------|-----------|-------|
|`druid.discovery.curator.path`|Services announce themselves under this ZooKeeper path.|`/druid/discovery`| |`druid.discovery.curator.path`|Services announce themselves under this ZooKeeper path.|`/druid/discovery`|
### Exhibitor
[Exhibitor](https://github.com/Netflix/exhibitor/wiki) is a supervisor system for ZooKeeper.
Exhibitor can dynamically scale-up/down the cluster of ZooKeeper servers.
Druid can update self-owned list of ZooKeeper servers through Exhibitor without restarting.
That is, it allows Druid to keep the connections of Exhibitor-supervised ZooKeeper servers.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.exhibitor.service.hosts`|A JSON array which contains the hostnames of Exhibitor instances. Please specify this property if you want to use Exhibitor-supervised cluster.|none|
|`druid.exhibitor.service.port`|The REST port used to connect to Exhibitor.|`8080`|
|`druid.exhibitor.service.restUriPath`|The path of the REST call used to get the server set.|`/exhibitor/v1/cluster/list`|
|`druid.exhibitor.service.useSsl`|Boolean flag for whether or not to use https protocol.|`false`|
|`druid.exhibitor.service.pollingMs`|How ofter to poll the exhibitors for the list|`10000`|
Note that `druid.zk.service.host` is used as a backup in case an Exhibitor instance can't be contacted and therefore should still be set.
### Startup Logging ### Startup Logging
All nodes can log debugging information on startup. All nodes can log debugging information on startup.

View File

@ -23,48 +23,62 @@ import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
import org.apache.curator.ensemble.exhibitor.Exhibitors;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.util.List;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
/** /**
*/ */
public class CuratorModule implements Module public class CuratorModule implements Module
{ {
static final String CURATOR_CONFIG_PREFIX = "druid.zk.service";
static final String EXHIBITOR_CONFIG_PREFIX = "druid.exhibitor.service";
private static final int BASE_SLEEP_TIME_MS = 1000;
private static final int MAX_SLEEP_TIME_MS = 45000;
private static final int MAX_RETRIES = 30;
private static final Logger log = new Logger(CuratorModule.class); private static final Logger log = new Logger(CuratorModule.class);
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
{ {
JsonConfigProvider.bind( JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, CuratorConfig.class);
binder, "druid.zk.service", JsonConfigProvider.bind(binder, EXHIBITOR_CONFIG_PREFIX, ExhibitorConfig.class);
CuratorConfig.class
);
} }
@Provides @Provides
@LazySingleton @LazySingleton
public CuratorFramework makeCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException public CuratorFramework makeCurator(
CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle
) throws IOException
{ {
final CuratorFramework framework = final CuratorFramework framework =
CuratorFrameworkFactory.builder() CuratorFrameworkFactory.builder()
.connectString(config.getZkHosts()) .ensembleProvider(ensembleProvider)
.sessionTimeoutMs(config.getZkSessionTimeoutMs()) .sessionTimeoutMs(config.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30)) .retryPolicy(new BoundedExponentialBackoffRetry(
BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES))
.compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression())) .compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
.aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider()) .aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider())
.build(); .build();
@ -91,6 +105,48 @@ public class CuratorModule implements Module
return framework; return framework;
} }
@Provides
@LazySingleton
public EnsembleProvider makeEnsembleProvider(CuratorConfig config, ExhibitorConfig exConfig)
{
if (exConfig.getHosts().isEmpty()) {
return new FixedEnsembleProvider(config.getZkHosts());
}
return new ExhibitorEnsembleProvider(
new Exhibitors(
exConfig.getHosts(),
exConfig.getRestPort(),
newBackupProvider(config.getZkHosts())
),
new DefaultExhibitorRestClient(exConfig.getUseSsl()),
exConfig.getRestUriPath(),
exConfig.getPollingMs(),
new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES)
)
{
@Override
public void start() throws Exception
{
log.info("Poll the list of zookeeper servers for initial ensemble");
this.pollForInitialEnsemble();
super.start();
}
};
}
private Exhibitors.BackupConnectionStringProvider newBackupProvider(final String zkHosts)
{
return new Exhibitors.BackupConnectionStringProvider()
{
@Override
public String getBackupConnectionString() throws Exception
{
return zkHosts;
}
};
}
class SecuredACLProvider implements ACLProvider class SecuredACLProvider implements ACLProvider
{ {
@Override @Override

View File

@ -0,0 +1,77 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
/**
*/
public class ExhibitorConfig
{
@JsonProperty
private List<String> hosts = new ArrayList<>();
@JsonProperty("port")
@Min(0)
@Max(0xffff)
private int restPort = 8080;
@JsonProperty
private String restUriPath = "/exhibitor/v1/cluster/list";
@JsonProperty
private boolean useSsl = false;
@JsonProperty
@Min(0)
private int pollingMs = 10000;
public List<String> getHosts()
{
return hosts;
}
public int getRestPort()
{
return restPort;
}
public String getRestUriPath()
{
return restUriPath;
}
public boolean getUseSsl()
{
return useSsl;
}
public int getPollingMs()
{
return pollingMs;
}
}

View File

@ -0,0 +1,139 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Properties;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.LifecycleModule;
/**
*/
public final class CuratorModuleTest
{
private static final String curatorHostKey = CuratorModule.CURATOR_CONFIG_PREFIX + ".host";
private static final String exhibitorHostsKey = CuratorModule.EXHIBITOR_CONFIG_PREFIX + ".hosts";
@Test
public void defaultEnsembleProvider() throws NoSuchFieldException, IllegalAccessException
{
Injector injector = newInjector(new Properties());
injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be FixedEnsembleProvider",
ensembleProvider instanceof FixedEnsembleProvider
);
Assert.assertEquals(
"The connectionString should be 'localhost'",
"localhost", ensembleProvider.getConnectionString()
);
}
@Test
public void fixedZkHosts()
{
Properties props = new Properties();
props.put(curatorHostKey, "hostA");
Injector injector = newInjector(props);
injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be FixedEnsembleProvider",
ensembleProvider instanceof FixedEnsembleProvider
);
Assert.assertEquals(
"The connectionString should be 'hostA'",
"hostA", ensembleProvider.getConnectionString()
);
}
@Test
public void exhibitorEnsembleProvider()
{
Properties props = new Properties();
props.put(curatorHostKey, "hostA");
props.put(exhibitorHostsKey, "[\"hostB\"]");
Injector injector = newInjector(props);
injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be ExhibitorEnsembleProvider",
ensembleProvider instanceof ExhibitorEnsembleProvider
);
}
@Test
public void emptyExhibitorHosts()
{
Properties props = new Properties();
props.put(curatorHostKey, "hostB");
props.put(exhibitorHostsKey, "[]");
Injector injector = newInjector(props);
injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be FixedEnsembleProvider",
ensembleProvider instanceof FixedEnsembleProvider
);
Assert.assertEquals(
"The connectionString should be 'hostB'",
"hostB", ensembleProvider.getConnectionString()
);
}
private Injector newInjector(final Properties props)
{
List<Module> modules = ImmutableList.<Module>builder()
.addAll(GuiceInjectors.makeDefaultStartupModules())
.add(new LifecycleModule()).add(new CuratorModule()).build();
return Guice.createInjector(
Modules.override(modules).with(new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(Properties.class).toInstance(props);
}
})
);
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator;
import org.junit.Assert;
import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Properties;
import io.druid.guice.JsonConfigTesterBase;
public class ExhibitorConfigTest extends JsonConfigTesterBase<ExhibitorConfig>
{
@Test
public void testSerde()
throws IllegalAccessException, NoSuchMethodException, InvocationTargetException
{
propertyValues.put(getPropertyKey("hosts"), "[\"hostA\",\"hostB\"]");
propertyValues.put(getPropertyKey("port"), "80");
propertyValues.put(getPropertyKey("restUriPath"), "/list");
propertyValues.put(getPropertyKey("useSsl"), "true");
propertyValues.put(getPropertyKey("pollingMs"), "1000");
testProperties.putAll(propertyValues);
configProvider.inject(testProperties, configurator);
ExhibitorConfig config = configProvider.get().get();
List<String> hosts = config.getHosts();
Assert.assertEquals(2, hosts.size());
Assert.assertTrue(hosts.contains("hostA"));
Assert.assertTrue(hosts.contains("hostB"));
Assert.assertEquals(80, config.getRestPort());
Assert.assertEquals("/list", config.getRestUriPath());
Assert.assertTrue(config.getUseSsl());
Assert.assertEquals(1000, config.getPollingMs());
}
@Test
public void defaultValues()
{
configProvider.inject(new Properties(), configurator);
ExhibitorConfig config = configProvider.get().get();
List<String> hosts = config.getHosts();
Assert.assertTrue(hosts.isEmpty());
Assert.assertEquals(8080, config.getRestPort());
Assert.assertEquals("/exhibitor/v1/cluster/list", config.getRestUriPath());
Assert.assertFalse(config.getUseSsl());
Assert.assertEquals(10000, config.getPollingMs());
}
}

View File

@ -35,6 +35,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -125,10 +126,15 @@ public abstract class JsonConfigTesterBase<T>
final String propertyKey = getPropertyKey(field); final String propertyKey = getPropertyKey(field);
if (null != propertyKey) { if (null != propertyKey) {
field.setAccessible(true); field.setAccessible(true);
if (field.getType().isAssignableFrom(String.class)) { Class<?> fieldType = field.getType();
if (String.class.isAssignableFrom(fieldType)) {
propertyValues.put(propertyKey, UUID.randomUUID().toString()); propertyValues.put(propertyKey, UUID.randomUUID().toString());
} else if (Collection.class.isAssignableFrom(fieldType)) {
propertyValues.put(propertyKey, "[]");
} else if (Map.class.isAssignableFrom(fieldType)) {
propertyValues.put(propertyKey, "{}");
} else { } else {
propertyValues.put(propertyKey, field.get(fakeValues).toString()); propertyValues.put(propertyKey, String.valueOf(field.get(fakeValues)));
} }
} }
} }

View File

@ -0,0 +1,134 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*
*/
public final class JsonConfigTesterBaseTest
extends JsonConfigTesterBase<JsonConfigTesterBaseTest.Sample>
{
@Test
public void defaultTestPropertiesForSample()
{
Assert.assertEquals("0", testProperties.getProperty("druid.test.prefix.primitiveInt"));
Assert.assertEquals("false", testProperties.getProperty("druid.test.prefix.primitiveBoolean"));
Assert.assertTrue(!testProperties.getProperty("druid.test.prefix.text").isEmpty());
Assert.assertEquals("[]", testProperties.getProperty("druid.test.prefix.list"));
Assert.assertEquals("[]", testProperties.getProperty("druid.test.prefix.set"));
Assert.assertEquals("{}", testProperties.getProperty("druid.test.prefix.map"));
for (Map.Entry entry : System.getProperties().entrySet()) {
Assert.assertEquals(entry.getValue(), testProperties.get(entry.getKey()));
}
}
@Test
public void injectFieldValues()
{
propertyValues.put(getPropertyKey("primitiveInt"), "1");
propertyValues.put(getPropertyKey("primitiveBoolean"), "true");
propertyValues.put(getPropertyKey("text"), "foo");
propertyValues.put(getPropertyKey("list"), "[\"one\",\"two\"]");
propertyValues.put(getPropertyKey("set"), "[\"three\",\"four\"]");
propertyValues.put(getPropertyKey("map"), "{\"k1\": \"v1\", \"k2\": \"v2\"}");
testProperties.putAll(propertyValues);
configProvider.inject(testProperties, configurator);
Sample results = configProvider.get().get();
Assert.assertEquals(1, results.getPrimitiveInt());
Assert.assertTrue(results.getPrimitiveBoolean());
Assert.assertEquals("foo", results.getText());
List<String> list = results.getList();
Assert.assertEquals(2, list.size());
Assert.assertEquals("one", list.get(0));
Assert.assertEquals("two", list.get(1));
Set<String> set = results.getSet();
Assert.assertEquals(2, set.size());
Assert.assertTrue(set.contains("three"));
Assert.assertTrue(set.contains("four"));
Map<String, String> map = results.getMap();
Assert.assertEquals(2, map.size());
Assert.assertEquals("v1", map.get("k1"));
Assert.assertEquals("v2", map.get("k2"));
}
public static class Sample
{
@JsonProperty
private int primitiveInt;
@JsonProperty
private boolean primitiveBoolean;
@JsonProperty
private String text;
@JsonProperty
private List<String> list;
@JsonProperty
private Set<String> set;
@JsonProperty
private Map<String, String> map;
public int getPrimitiveInt()
{
return primitiveInt;
}
public boolean getPrimitiveBoolean()
{
return primitiveBoolean;
}
public String getText()
{
return text;
}
public List<String> getList()
{
return list;
}
public Set<String> getSet()
{
return set;
}
public Map<String, String> getMap()
{
return map;
}
}
}