mirror of https://github.com/apache/druid.git
This commit contains the cleanup needed for the new integration test framework. Changes:
- Fix log lines, misspellings, docs, etc.
- Allow the use of some of Druid's "JSON config" objects in tests (see the sketch below)
- Fix minor bug in `BaseNodeRoleWatcher`
parent 3d9e3dbad9 · commit ffcb996468
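For context, a minimal sketch (not part of the diff itself) of how the new `create()` factory methods added below can build these JSON config objects directly in test code; the connection URI, credentials, and dbcp value are made-up placeholders:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.druid.curator.CuratorConfig;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;

public class JsonConfigFactoryExample
{
  public static void main(String[] args)
  {
    // Metadata-store config built directly, without Guice/JSON property binding.
    // URI, credentials, and dbcp values are placeholders.
    Map<String, Object> dbcpProps = ImmutableMap.of("maxConnLifetimeMillis", "1200000");
    MetadataStorageConnectorConfig connectorConfig = MetadataStorageConnectorConfig.create(
        "jdbc:mysql://localhost:3306/druid",
        "druid",
        "diurd",
        dbcpProps
    );

    // ZooKeeper config pointing at a test cluster.
    CuratorConfig curatorConfig = CuratorConfig.create("localhost:2181");

    System.out.println(connectorConfig.getConnectURI() + " " + curatorConfig.getZkHosts());
  }
}
```

The same pattern applies to `ExhibitorConfig.create(...)` and `MySQLConnectorDriverConfig.create(...)` further down in the diff.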
@@ -13,7 +13,7 @@ target
.settings/
.vscode
*.log
*.DS_Store
.DS_Store
_site
dependency-reduced-pom.xml
LICENSE.BINARY
@@ -21,7 +21,8 @@ NOTICE.BINARY
README.BINARY
README
*.lock
**/.pmd
**/.pmdruleset.xml
.pmd
.pmdruleset.xml
.java-version
integration-tests/gen-scripts/
bin/
@@ -34,8 +34,8 @@ env:
- DOCKER_IP=127.0.0.1 # for integration tests
- MVN="mvn -B"
- > # Various options to make execution of maven goals faster (e.g., mvn install)
MAVEN_SKIP="-Pskip-static-checks -Ddruid.console.skip=true -Dmaven.javadoc.skip=true"
- MAVEN_SKIP_TESTS="-Pskip-tests"
MAVEN_SKIP="-P skip-static-checks -Ddruid.console.skip=true -Dmaven.javadoc.skip=true"
- MAVEN_SKIP_TESTS="-P skip-tests"

addons:
apt:
@@ -46,7 +46,7 @@ addons:
# Add various options to make 'mvn install' fast and skip javascript compile (-Ddruid.console.skip=true) since it is not
# needed. Depending on network speeds, "mvn -q install" may take longer than the default 10 minute timeout to print any
# output. To compensate, use travis_wait to extend the timeout.
install: ./check_test_suite.py && travis_terminate 0 || echo 'Running maven install...' && MAVEN_OPTS='-Xmx3000m' travis_wait 15 ${MVN} clean install -q -ff -pl '!distribution' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} -T1C && ${MVN} install -q -ff -pl 'distribution' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS}
install: ./check_test_suite.py && travis_terminate 0 || echo 'Running Maven install...' && MAVEN_OPTS='-Xmx3000m' travis_wait 15 ${MVN} clean install -q -ff -pl '!distribution' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} -T1C && ${MVN} install -q -ff -pl 'distribution' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS}

# There are 3 stages of tests
# 1. Tests - phase 1
@@ -61,7 +61,7 @@ public class LifecycleModule implements Module
* is materialized and injected, meaning that objects are not actually instantiated in dependency order.
* Registering with the LifecyceModule, on the other hand, will instantiate the objects after the normal object
* graph has already been instantiated, meaning that objects will be created in dependency order and this will
* only actually instantiate something that wasn't actually dependend upon.
* only actually instantiate something that wasn't actually depended upon.
*
* @param clazz the class to instantiate
* @return this, for chaining.
@@ -22,6 +22,7 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;

import java.util.Map;
import java.util.Properties;

/**
@@ -49,6 +50,30 @@ public class MetadataStorageConnectorConfig
@JsonProperty("dbcp")
private Properties dbcpProperties;

public static MetadataStorageConnectorConfig create(
String connectUri,
String user,
String password,
Map<String, Object> properties
)
{
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
if (connectUri != null) {
config.connectURI = connectUri;
}
if (user != null) {
config.user = user;
}
if (password != null) {
config.passwordProvider = () -> password;
}
if (properties != null) {
config.dbcpProperties = new Properties();
config.dbcpProperties.putAll(properties);
}
return config;
}

@JsonProperty
public boolean isCreateTables()
{
@@ -20,10 +20,12 @@
package org.apache.druid.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class MetadataStorageConnectorConfigTest
@@ -187,4 +189,19 @@ public class MetadataStorageConnectorConfigTest
Assert.assertEquals(dbcpProperties.getProperty("maxConnLifetimeMillis"), "1200000");
Assert.assertEquals(dbcpProperties.getProperty("defaultQueryTimeout"), "30000");
}

@Test
public void testCreate()
{
Map<String, Object> props = ImmutableMap.of("key", "value");
MetadataStorageConnectorConfig config = MetadataStorageConnectorConfig.create(
"connectURI",
"user",
"pwd",
props);
Assert.assertEquals("connectURI", config.getConnectURI());
Assert.assertEquals("user", config.getUser());
Assert.assertEquals("pwd", config.getPassword());
Assert.assertEquals(1, config.getDbcpProperties().size());
}
}
@@ -20,11 +20,11 @@
#

# NOTE: this is a 'run' script for the stock tarball
# It takes 1 required argument (the name of the service,
# It takes one required argument (the name of the service,
# e.g. 'broker', 'historical' etc). Any additional arguments
# are passed to that service.
#
# It accepts 'JAVA_OPTS' as an environment variable
# This script accepts JAVA_OPTS as an environment variable
#
# Additional env vars:
# - DRUID_LOG4J -- set the entire log4j.xml verbatim
@@ -61,19 +61,24 @@ public class MySQLConnector extends SQLMetadataConnector
)
{
super(config, dbTables);
log.info("Loading \"MySQL\" metadata connector driver %s", driverConfig.getDriverClassName());
tryLoadDriverClass(driverConfig.getDriverClassName(), true);
this.dbi = createDBI(config.get(), driverConfig, connectorSslConfig, getValidationQuery());

if (driverConfig.getDriverClassName().contains("mysql")) {
myTransientExceptionClass = tryLoadDriverClass(MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME, false);
} else {
myTransientExceptionClass = null;
}
}

final BasicDataSource datasource = getDatasource();
public static DBI createDBI(MetadataStorageConnectorConfig config, MySQLConnectorDriverConfig driverConfig, MySQLConnectorSslConfig connectorSslConfig, String validationQuery)
{
log.info("Loading \"MySQL\" metadata connector driver %s", driverConfig.getDriverClassName());
tryLoadDriverClass(driverConfig.getDriverClassName(), true);

final BasicDataSource datasource = makeDatasource(config, validationQuery);
// MySQL driver is classloader isolated as part of the extension
// so we need to help JDBC find the driver
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassLoader(MySQLConnector.class.getClassLoader());
datasource.setDriverClassName(driverConfig.getDriverClassName());
datasource.addConnectionProperty("useSSL", String.valueOf(connectorSslConfig.isUseSSL()));
if (connectorSslConfig.isUseSSL()) {
@@ -141,9 +146,10 @@ public class MySQLConnector extends SQLMetadataConnector
// use double-quotes for quoting columns, so we can write SQL that works with most databases
datasource.setConnectionInitSqls(ImmutableList.of("SET sql_mode='ANSI_QUOTES'"));

this.dbi = new DBI(datasource);
DBI dbi = new DBI(datasource);

log.info("Configured MySQL as metadata storage");
return dbi;
}

@Override
@@ -252,10 +258,10 @@ public class MySQLConnector extends SQLMetadataConnector
}

@Nullable
private Class<?> tryLoadDriverClass(String className, boolean failIfNotFound)
private static Class<?> tryLoadDriverClass(String className, boolean failIfNotFound)
{
try {
return Class.forName(className, false, getClass().getClassLoader());
return Class.forName(className, false, MySQLConnector.class.getClassLoader());
}
catch (ClassNotFoundException e) {
if (failIfNotFound) {
@@ -20,13 +20,25 @@
package org.apache.druid.metadata.storage.mysql;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Strings;

import java.util.Objects;

public class MySQLConnectorDriverConfig
{
public static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";

@JsonProperty
private String driverClassName = "com.mysql.jdbc.Driver";
private String driverClassName = MYSQL_DRIVER;

public static MySQLConnectorDriverConfig create(String driverClassName)
{
MySQLConnectorDriverConfig config = new MySQLConnectorDriverConfig();
if (!Strings.isNullOrEmpty(driverClassName)) {
config.driverClassName = driverClassName;
}
return config;
}

@JsonProperty
public String getDriverClassName()
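A sketch of how a test might use the new static `createDBI` together with `MySQLConnectorDriverConfig.create` without constructing a full `MySQLConnector`. The SSL config's no-arg constructor and the `SELECT 1` validation query are assumptions for illustration, not something this diff shows:

```java
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.storage.mysql.MySQLConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnectorDriverConfig;
import org.apache.druid.metadata.storage.mysql.MySQLConnectorSslConfig;
import org.skife.jdbi.v2.DBI;

public class MySqlDbiExample
{
  // Build a DBI against a local MySQL instance; connection details are placeholders.
  static DBI buildTestDbi()
  {
    MetadataStorageConnectorConfig config = MetadataStorageConnectorConfig.create(
        "jdbc:mysql://localhost:3306/druid",  // placeholder URI
        "druid",                              // placeholder user
        "diurd",                              // placeholder password
        null                                  // no extra dbcp properties
    );
    return MySQLConnector.createDBI(
        config,
        MySQLConnectorDriverConfig.create(null),  // falls back to com.mysql.jdbc.Driver
        new MySQLConnectorSslConfig(),            // assumed default constructor
        "SELECT 1"                                // assumed validation query
    );
  }
}
```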
@@ -22,6 +22,8 @@ package org.apache.druid.metadata.storage.mysql;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class MySQLConnectorDriverConfigTest
{
@Test
@@ -33,4 +35,13 @@ public class MySQLConnectorDriverConfigTest
.withNonnullFields("driverClassName")
.verify();
}

@Test
public void testCreate()
{
MySQLConnectorDriverConfig config = MySQLConnectorDriverConfig.create(null);
assertEquals(MySQLConnectorDriverConfig.MYSQL_DRIVER, config.getDriverClassName());
config = MySQLConnectorDriverConfig.create("myDriver");
assertEquals("myDriver", config.getDriverClassName());
}
}
@@ -36,20 +36,20 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
@JacksonInject
@NotNull
@EscalatedClient
HttpClient httpClient = null;
HttpClient httpClient;

@JacksonInject
@NotNull
@Smile
ObjectMapper smileMapper = null;
ObjectMapper smileMapper;

@JacksonInject
@NotNull
HttpServerInventoryViewConfig config = null;
HttpServerInventoryViewConfig config;

@JacksonInject
@NotNull
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = null;
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;

@Override
public HttpServerInventoryView get()
@@ -28,6 +28,8 @@ import javax.validation.constraints.Min;

public class CuratorConfig
{
public static final String CONFIG_PREFIX = "druid.zk.service";

static final String HOST = "host";
@JsonProperty(HOST)
private String zkHosts = "localhost";
@@ -56,6 +58,13 @@ public class CuratorConfig
@JsonProperty("authScheme")
private String authScheme = "digest";

public static CuratorConfig create(String hosts)
{
CuratorConfig config = new CuratorConfig();
config.zkHosts = hosts;
return config;
}

public String getZkHosts()
{
return zkHosts;
@@ -22,7 +22,6 @@ package org.apache.druid.curator;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import io.netty.util.SuppressForbidden;
import org.apache.curator.RetryPolicy;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
@@ -48,35 +47,26 @@ import java.util.List;

public class CuratorModule implements Module
{
static final String CURATOR_CONFIG_PREFIX = "druid.zk.service";

static final String EXHIBITOR_CONFIG_PREFIX = "druid.exhibitor.service";
private static final Logger log = new Logger(CuratorModule.class);

private static final int BASE_SLEEP_TIME_MS = 1000;

private static final int MAX_SLEEP_TIME_MS = 45000;

private static final int MAX_RETRIES = 29;

private static final Logger log = new Logger(CuratorModule.class);

@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, ZkEnablementConfig.class);
JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, CuratorConfig.class);
JsonConfigProvider.bind(binder, EXHIBITOR_CONFIG_PREFIX, ExhibitorConfig.class);
JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, ZkEnablementConfig.class);
JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, CuratorConfig.class);
JsonConfigProvider.bind(binder, ExhibitorConfig.CONFIG_PREFIX, ExhibitorConfig.class);
}

@Provides
@LazySingleton
@SuppressForbidden(reason = "System#err")
public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
/**
* Create the Curator framework outside of Guice given the ZK config.
* Primarily for tests.
*/
public static CuratorFramework createCurator(CuratorConfig config, EnsembleProvider ensembleProvider)
{
if (!zkEnablementConfig.isEnabled()) {
throw new RuntimeException("Zookeeper is disabled, Can't create CuratorFramework.");
}

final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
if (!Strings.isNullOrEmpty(config.getZkUser()) && !Strings.isNullOrEmpty(config.getZkPwd())) {
builder.authorization(
@@ -87,7 +77,7 @@ public class CuratorModule implements Module

RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);

final CuratorFramework framework = builder
return builder
.ensembleProvider(ensembleProvider)
.sessionTimeoutMs(config.getZkSessionTimeoutMs())
.connectionTimeoutMs(config.getZkConnectionTimeoutMs())
@@ -95,6 +85,20 @@ public class CuratorModule implements Module
.compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
.aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider())
.build();
}

/**
* Provide the Curator framework via Guice, integrated with the Druid lifecycle.
*/
@Provides
@LazySingleton
public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
{
if (!zkEnablementConfig.isEnabled()) {
throw new RuntimeException("Zookeeper is disabled, cannot create CuratorFramework.");
}

final CuratorFramework framework = createCurator(config, ensembleProvider);

framework.getUnhandledErrorListenable().addListener((message, e) -> {
log.error(e, "Unhandled error in Curator, stopping server.");
@@ -123,9 +127,11 @@ public class CuratorModule implements Module
return framework;
}

@Provides
@LazySingleton
public EnsembleProvider makeEnsembleProvider(CuratorConfig config, ExhibitorConfig exConfig)
/**
* Create an EnsembleProvider given the related configurations. Primarily for tests
* which do not use Guice to do the work.
*/
public static EnsembleProvider createEnsembleProvider(CuratorConfig config, ExhibitorConfig exConfig)
{
if (exConfig.getHosts().isEmpty()) {
return new FixedEnsembleProvider(config.getZkHosts());
@@ -155,7 +161,17 @@ public class CuratorModule implements Module
};
}

private Exhibitors.BackupConnectionStringProvider newBackupProvider(final String zkHosts)
/**
* Provide an EnsembleProvider via Guice configuration.
*/
@Provides
@LazySingleton
public EnsembleProvider makeEnsembleProvider(CuratorConfig config, ExhibitorConfig exConfig)
{
return createEnsembleProvider(config, exConfig);
}

private static Exhibitors.BackupConnectionStringProvider newBackupProvider(final String zkHosts)
{
return () -> zkHosts;
}
@@ -30,6 +30,8 @@ import java.util.List;
*/
public class ExhibitorConfig
{
static final String CONFIG_PREFIX = "druid.exhibitor.service";

@JsonProperty
private List<String> hosts = new ArrayList<>();

@@ -48,6 +50,15 @@ public class ExhibitorConfig
@Min(0)
private int pollingMs = 10000;

public static ExhibitorConfig create(List<String> hosts)
{
ExhibitorConfig config = new ExhibitorConfig();
if (hosts != null) {
config.hosts = hosts;
}
return config;
}

public List<String> getHosts()
{
return hosts;
@@ -72,5 +83,4 @@ public class ExhibitorConfig
{
return pollingMs;
}

}
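Per the javadoc added above, `createCurator` and `createEnsembleProvider` exist mainly so tests can build a Curator client without Guice. A minimal sketch of that wiring, assuming the caller starts and later closes the framework itself:

```java
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.CuratorConfig;
import org.apache.druid.curator.CuratorModule;
import org.apache.druid.curator.ExhibitorConfig;

public class CuratorTestWiringExample
{
  // Build a CuratorFramework against a local test ZooKeeper without going through Guice.
  static CuratorFramework buildTestCurator()
  {
    CuratorConfig curatorConfig = CuratorConfig.create("localhost:2181");
    // No Exhibitor hosts, so createEnsembleProvider falls back to a fixed ensemble.
    EnsembleProvider ensembleProvider =
        CuratorModule.createEnsembleProvider(curatorConfig, ExhibitorConfig.create(null));
    CuratorFramework curator = CuratorModule.createCurator(curatorConfig, ensembleProvider);
    curator.start();  // the caller is responsible for start/close here (assumption)
    return curator;
  }
}
```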
@@ -27,7 +27,7 @@ import java.util.Properties;

public class ZkEnablementConfig
{
private static final String PROP_KEY_ENABLED = StringUtils.format("%s.enabled", CuratorModule.CURATOR_CONFIG_PREFIX);
private static final String PROP_KEY_ENABLED = StringUtils.format("%s.enabled", CuratorConfig.CONFIG_PREFIX);

public static final ZkEnablementConfig ENABLED = new ZkEnablementConfig(true);
@@ -65,18 +65,10 @@ public class PathChildrenCacheFactory
{
private static final ThreadFactory DEFAULT_THREAD_FACTORY = ThreadUtils.newThreadFactory("PathChildrenCache");

private boolean cacheData;
private boolean compressed;
private boolean cacheData = true;
private boolean compressed = false;
private ExecutorService exec;
private boolean shutdownExecutorOnClose;

public Builder()
{
cacheData = true;
compressed = false;
exec = null;
shutdownExecutorOnClose = true;
}
private boolean shutdownExecutorOnClose = true;

public Builder withCacheData(boolean cacheData)
{
@@ -246,12 +246,12 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
break;
}
default: {
log.warn("Ignored event type[%s] for node watcher of role[%s].", event.getType(), nodeRole.getJsonName());
log.warn("Ignored event type [%s] for node watcher of role [%s].", event.getType(), nodeRole.getJsonName());
}
}
}
catch (Exception ex) {
log.error(ex, "Unknown error in node watcher of role[%s].", nodeRole.getJsonName());
log.error(ex, "Unknown error in node watcher of role [%s].", nodeRole.getJsonName());
}
}
}
@@ -120,7 +120,7 @@ public class BaseNodeRoleWatcher
synchronized (lock) {
if (!nodeRole.equals(druidNode.getNodeRole())) {
LOGGER.error(
"Node[%s] of role[%s] addition ignored due to mismatched role (expected role[%s]).",
"Node [%s] of role [%s] addition ignored due to mismatched role (expected role [%s]).",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
nodeRole.getJsonName()
@@ -128,7 +128,7 @@ public class BaseNodeRoleWatcher
return;
}

LOGGER.info("Node[%s] of role[%s] detected.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
LOGGER.info("Node [%s] of role [%s] detected.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());

addNode(druidNode);
}
@@ -153,7 +153,7 @@ public class BaseNodeRoleWatcher
}
} else {
LOGGER.error(
"Node[%s] of role[%s] discovered but existed already [%s].",
"Node [%s] of role [%s] discovered but existed already [%s].",
druidNode.getDruidNode().getUriToUse(),
nodeRole.getJsonName(),
prev
@@ -166,7 +166,7 @@ public class BaseNodeRoleWatcher
synchronized (lock) {
if (!nodeRole.equals(druidNode.getNodeRole())) {
LOGGER.error(
"Node[%s] of role[%s] removal ignored due to mismatched role (expected role[%s]).",
"Node [%s] of role [%s] removal ignored due to mismatched role (expected role [%s]).",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
nodeRole.getJsonName()
@@ -174,7 +174,7 @@ public class BaseNodeRoleWatcher
return;
}

LOGGER.info("Node[%s] of role[%s] went offline.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
LOGGER.info("Node [%s] of role [%s] went offline.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());

removeNode(druidNode);
}
@@ -187,7 +187,7 @@ public class BaseNodeRoleWatcher

if (prev == null) {
LOGGER.error(
"Noticed disappearance of unknown druid node [%s] of role[%s].",
"Noticed disappearance of unknown druid node [%s] of role [%s].",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName()
);
@@ -200,7 +200,7 @@ public class BaseNodeRoleWatcher
for (DruidNodeDiscovery.Listener listener : nodeListeners) {
safeSchedule(
() -> listener.nodesRemoved(nodeRemoved),
"Exception occured in nodeRemoved(node[%s] of role[%s]) in listener [%s].",
"Exception occured in nodeRemoved(node [%s] of role [%s]) in listener [%s].",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
listener
@@ -219,12 +219,15 @@ public class BaseNodeRoleWatcher
return;
}

LOGGER.info("Node watcher of role[%s] is now initialized.", nodeRole.getJsonName());
// It is important to take a snapshot here as list of nodes might change by the time listeners process
// the changes.
List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
LOGGER.info(
"Node watcher of role [%s] is now initialized with %d nodes.",
nodeRole.getJsonName(),
currNodes.size());

for (DruidNodeDiscovery.Listener listener : nodeListeners) {
// It is important to take a snapshot here as list of nodes might change by the time listeners process
// the changes.
List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
@@ -51,7 +51,7 @@ public class DiscoveryDruidNode

/**
* Map of service name -> DruidServices.
* This map has only the DruidServices that is understandable.
* This map has only the DruidServices that are understandable.
* It means, if there is some DruidService not understandable found while converting rawServices to services,
* that DruidService will be ignored and not stored in this map.
*
@@ -449,7 +449,7 @@ public class Initialization
return Guice.createInjector(Modules.override(intermediateModules).with(extensionModules.getModules()));
}

private static class ModuleList
public static class ModuleList
{
private final Injector baseInjector;
private final Set<NodeRole> nodeRoles;
@@ -468,7 +468,7 @@ public class Initialization
this.modules = new ArrayList<>();
}

private List<Module> getModules()
public List<Module> getModules()
{
return Collections.unmodifiableList(modules);
}
@@ -193,14 +193,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
public Void withHandle(Handle handle)
{
if (!tableExists(handle, tableName)) {
log.info("Creating table[%s]", tableName);
log.info("Creating table [%s]", tableName);
final Batch batch = handle.createBatch();
for (String s : sql) {
batch.add(s);
}
batch.execute();
} else {
log.info("Table[%s] already exists", tableName);
log.info("Table [%s] already exists", tableName);
}
return null;
}
@@ -702,10 +702,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
return config.get();
}

protected BasicDataSource getDatasource()
protected static BasicDataSource makeDatasource(MetadataStorageConnectorConfig connectorConfig, String validationQuery)
{
MetadataStorageConnectorConfig connectorConfig = getConfig();

BasicDataSource dataSource;

try {
@@ -725,12 +723,17 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
String uri = connectorConfig.getConnectURI();
dataSource.setUrl(uri);

dataSource.setValidationQuery(getValidationQuery());
dataSource.setValidationQuery(validationQuery);
dataSource.setTestOnBorrow(true);

return dataSource;
}

protected BasicDataSource getDatasource()
{
return makeDatasource(getConfig(), getValidationQuery());
}

protected final <T> T inReadOnlyTransaction(
final TransactionCallback<T> callback
)
@@ -26,6 +26,8 @@ import org.joda.time.Period;
*/
public class SegmentsMetadataManagerConfig
{
public static final String CONFIG_PREFIX = "druid.manager.segments";

@JsonProperty
private Period pollDuration = new Period("PT1M");
@@ -23,6 +23,8 @@ import org.apache.druid.guice.JsonConfigTesterBase;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

public class CuratorConfigTest extends JsonConfigTesterBase<CuratorConfig>
{
@Test
@@ -42,4 +44,21 @@ public class CuratorConfigTest extends JsonConfigTesterBase<CuratorConfig>
Assert.assertEquals("test-zk-pwd", config.getZkPwd());
Assert.assertEquals("auth", config.getAuthScheme());
}

@Test
public void testCreate()
{
CuratorConfig config = CuratorConfig.create("foo:2181,bar:2181");
Assert.assertEquals("foo:2181,bar:2181", config.getZkHosts());
Assert.assertEquals(false, config.getEnableAcl());
Assert.assertNull(config.getZkUser());
Assert.assertEquals("digest", config.getAuthScheme());
}

@Test
public void testExhibitorCreate()
{
ExhibitorConfig config = ExhibitorConfig.create(Arrays.asList("foo:2181", "bar:2181"));
Assert.assertEquals(2, config.getHosts().size());
}
}
@@ -48,10 +48,10 @@ import java.util.Properties;

public final class CuratorModuleTest
{
private static final String CURATOR_HOST_KEY = CuratorModule.CURATOR_CONFIG_PREFIX + "." + CuratorConfig.HOST;
private static final String CURATOR_HOST_KEY = CuratorConfig.CONFIG_PREFIX + "." + CuratorConfig.HOST;
private static final String CURATOR_CONNECTION_TIMEOUT_MS_KEY =
CuratorModule.CURATOR_CONFIG_PREFIX + "." + CuratorConfig.CONNECTION_TIMEOUT_MS;
private static final String EXHIBITOR_HOSTS_KEY = CuratorModule.EXHIBITOR_CONFIG_PREFIX + ".hosts";
CuratorConfig.CONFIG_PREFIX + "." + CuratorConfig.CONNECTION_TIMEOUT_MS;
private static final String EXHIBITOR_HOSTS_KEY = ExhibitorConfig.CONFIG_PREFIX + ".hosts";

@Rule
public final ExpectedSystemExit exit = ExpectedSystemExit.none();
@@ -164,7 +164,7 @@ public final class CuratorModuleTest
public void ignoresDeprecatedCuratorConfigProperties()
{
Properties props = new Properties();
String deprecatedPropName = CuratorModule.CURATOR_CONFIG_PREFIX + ".terminateDruidProcessOnConnectFail";
String deprecatedPropName = CuratorConfig.CONFIG_PREFIX + ".terminateDruidProcessOnConnectFail";
props.setProperty(deprecatedPropName, "true");
Injector injector = newInjector(props);
@@ -59,7 +59,6 @@ public class CuratorTestBase
.retryPolicy(new RetryOneTime(1))
.compressionProvider(new PotentiallyGzippedCompressionProvider(true))
.build();

}

protected void setupZNodeForServer(DruidServer server, ZkPathsConfig zkPathsConfig, ObjectMapper jsonMapper)
@@ -186,7 +186,7 @@ public class CliCoordinator extends ServerRunnable

binder.bind(MetadataStorage.class).toProvider(MetadataStorageProvider.class);

JsonConfigProvider.bind(binder, "druid.manager.segments", SegmentsMetadataManagerConfig.class);
JsonConfigProvider.bind(binder, SegmentsMetadataManagerConfig.CONFIG_PREFIX, SegmentsMetadataManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.lookups", LookupCoordinatorManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class);
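With the binding keyed off `SegmentsMetadataManagerConfig.CONFIG_PREFIX`, test code can derive property keys from the constant instead of repeating the `druid.manager.segments` literal; a small sketch (the `PT30S` value is arbitrary):

```java
import java.util.Properties;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;

public class SegmentsPollConfigExample
{
  // Build the property key from the new constant rather than a hard-coded prefix string.
  static Properties fastPollProperties()
  {
    Properties props = new Properties();
    props.setProperty(SegmentsMetadataManagerConfig.CONFIG_PREFIX + ".pollDuration", "PT30S");
    return props;
  }
}
```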
@@ -124,6 +124,11 @@ public abstract class GuiceRunnable implements Runnable
}

public Lifecycle initLifecycle(Injector injector)
{
return initLifecycle(injector, log);
}

public static Lifecycle initLifecycle(Injector injector, Logger log)
{
try {
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
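A sketch of how the new static `initLifecycle(Injector, Logger)` overload might be used from a test harness that builds its own injector; the harness class and the explicit `stop()` call are assumptions for illustration:

```java
import com.google.inject.Injector;
import org.apache.druid.cli.GuiceRunnable;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;

public class LifecycleHarnessExample
{
  private static final Logger LOG = new Logger(LifecycleHarnessExample.class);

  // Start the lifecycle for an already-built injector, run the test body, then stop it.
  static void runWithLifecycle(Injector injector, Runnable testBody)
  {
    final Lifecycle lifecycle = GuiceRunnable.initLifecycle(injector, LOG);
    try {
      testBody.run();
    }
    finally {
      lifecycle.stop();
    }
  }
}
```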
@@ -1074,7 +1074,7 @@ public class SystemSchema extends AbstractSchema

if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
throw new RE(
"Failed to talk to leader node at [%s]. Error code[%d], description[%s].",
"Failed to talk to leader node at [%s]. Error code [%d], description [%s].",
query,
responseHolder.getStatus().getCode(),
responseHolder.getStatus().getReasonPhrase()