mirror of https://github.com/apache/druid.git
Exit JVM on curator unhandled errors (#8458)
* Exit JVM on curator unhandled errors If an unhandled error occurs when curator is talking to ZooKeeper, exit the JVM in addition to stopping the lifecycle to prevent the process from being left in a zombie state. With this change, BoundedExponentialBackoffRetryWithQuit is no longer needed as when curator exceeds the configured retries, it triggers its unhandled error listeners. A new "connectionTimeoutMs" CuratorConfig setting is added mostly to facilitate testing curator unhandled errors, but it may be useful for users as well. * Address review comments
This commit is contained in:
parent
9fa3407596
commit
14a8613d69
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.junit;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.apache.logging.log4j.core.LoggerContext;
|
||||
import org.apache.logging.log4j.core.appender.AbstractAppender;
|
||||
import org.apache.logging.log4j.core.config.Configuration;
|
||||
import org.apache.logging.log4j.core.config.LoggerConfig;
|
||||
import org.junit.rules.ExternalResource;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* JUnit rule to capture a class's logger output to an in-memory buffer to allow verification of log messages in tests.
|
||||
*/
|
||||
public class LoggerCaptureRule extends ExternalResource
|
||||
{
|
||||
private final Class<?> targetClass;
|
||||
|
||||
private InMemoryAppender inMemoryAppender;
|
||||
private LoggerConfig targetClassLoggerConfig;
|
||||
|
||||
public LoggerCaptureRule(Class<?> targetClass)
|
||||
{
|
||||
this.targetClass = targetClass;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void before()
|
||||
{
|
||||
inMemoryAppender = new InMemoryAppender();
|
||||
LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
|
||||
Configuration configuration = loggerContext.getConfiguration();
|
||||
targetClassLoggerConfig = configuration.getLoggerConfig(targetClass.getName());
|
||||
targetClassLoggerConfig.addAppender(inMemoryAppender, Level.ALL, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void after()
|
||||
{
|
||||
clearLogEvents();
|
||||
targetClassLoggerConfig.removeAppender(InMemoryAppender.NAME);
|
||||
}
|
||||
|
||||
public List<LogEvent> getLogEvents()
|
||||
{
|
||||
return inMemoryAppender.getLogEvents();
|
||||
}
|
||||
|
||||
public void clearLogEvents()
|
||||
{
|
||||
inMemoryAppender.clearLogEvents();
|
||||
}
|
||||
|
||||
private static class InMemoryAppender extends AbstractAppender
|
||||
{
|
||||
static final String NAME = InMemoryAppender.class.getName();
|
||||
|
||||
private final List<LogEvent> logEvents;
|
||||
|
||||
InMemoryAppender()
|
||||
{
|
||||
super(NAME, null, null);
|
||||
logEvents = new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(LogEvent logEvent)
|
||||
{
|
||||
logEvents.add(logEvent);
|
||||
}
|
||||
|
||||
List<LogEvent> getLogEvents()
|
||||
{
|
||||
return Collections.unmodifiableList(logEvents);
|
||||
}
|
||||
|
||||
void clearLogEvents()
|
||||
{
|
||||
logEvents.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -107,13 +107,13 @@ We recommend just setting the base ZK path and the ZK service host, but all ZK p
|
|||
|`druid.zk.service.user`|The username to authenticate with ZooKeeper. This is an optional property.|none|
|
||||
|`druid.zk.service.pwd`|The [Password Provider](../operations/password-provider.md) or the string password to authenticate with ZooKeeper. This is an optional property.|none|
|
||||
|`druid.zk.service.authScheme`|digest is the only authentication scheme supported. |digest|
|
||||
|`druid.zk.service.terminateDruidProcessOnConnectFail`|If set to 'true' and the connection to ZooKeeper fails (after exhausting all potential backoff retires), Druid process terminates itself with exit code 1.|false|
|
||||
|
||||
#### Zookeeper Behavior
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.zk.service.sessionTimeoutMs`|ZooKeeper session timeout, in milliseconds.|`30000`|
|
||||
|`druid.zk.service.connectionTimeoutMs`|ZooKeeper connection timeout, in milliseconds.|`15000`|
|
||||
|`druid.zk.service.compress`|Boolean flag for whether or not created Znodes should be compressed.|`true`|
|
||||
|`druid.zk.service.acl`|Boolean flag for whether or not to enable ACL security for ZooKeeper. If ACL is enabled, zNode creators will have all permissions.|`false`|
|
||||
|
||||
|
|
6
pom.xml
6
pom.xml
|
@ -967,6 +967,12 @@
|
|||
<version>${guava.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stefanbirkner</groupId>
|
||||
<artifactId>system-rules</artifactId>
|
||||
<version>1.19.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
|
|
|
@ -265,6 +265,11 @@
|
|||
<artifactId>JUnitParams</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stefanbirkner</groupId>
|
||||
<artifactId>system-rules</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.curator;
|
||||
|
||||
import org.apache.curator.RetrySleeper;
|
||||
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
||||
/**
|
||||
* BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry for simplicity. It's not actually a
|
||||
* BoundedExponentialBackoffRetry from the Liskov substitution principle point of view,
|
||||
* but it doesn't matter in this code.
|
||||
*
|
||||
*/
|
||||
public class BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry
|
||||
{
|
||||
|
||||
private static final Logger log = new Logger(BoundedExponentialBackoffRetryWithQuit.class);
|
||||
|
||||
private final Runnable exitRunner;
|
||||
|
||||
public BoundedExponentialBackoffRetryWithQuit(
|
||||
Runnable exitRunner,
|
||||
int baseSleepTimeMs,
|
||||
int maxSleepTimeMs,
|
||||
int maxRetries
|
||||
)
|
||||
{
|
||||
super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
|
||||
this.exitRunner = exitRunner;
|
||||
log.info("BoundedExponentialBackoffRetryWithQuit Retry Policy selected.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
|
||||
{
|
||||
log.warn("Zookeeper can't be reached, retrying (retryCount = %s out of %s)...", retryCount, this.getN());
|
||||
boolean shouldRetry = super.allowRetry(retryCount, elapsedTimeMs, sleeper);
|
||||
if (!shouldRetry) {
|
||||
log.warn("Since Zookeeper can't be reached after retries exhausted, calling exit function...");
|
||||
exitRunner.run();
|
||||
}
|
||||
return shouldRetry;
|
||||
}
|
||||
|
||||
}
|
|
@ -26,16 +26,20 @@ import org.apache.druid.metadata.PasswordProvider;
|
|||
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class CuratorConfig
|
||||
{
|
||||
@JsonProperty("host")
|
||||
static final String HOST = "host";
|
||||
@JsonProperty(HOST)
|
||||
private String zkHosts = "localhost";
|
||||
|
||||
@JsonProperty("sessionTimeoutMs")
|
||||
@Min(0)
|
||||
private int zkSessionTimeoutMs = 30000;
|
||||
private int zkSessionTimeoutMs = 30_000;
|
||||
|
||||
static final String CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
|
||||
@JsonProperty(CONNECTION_TIMEOUT_MS)
|
||||
@Min(0)
|
||||
private int zkConnectionTimeoutMs = 15_000; // same as Curator default: https://git.io/fjhhr
|
||||
|
||||
@JsonProperty("compress")
|
||||
private boolean enableCompression = true;
|
||||
|
@ -52,10 +56,6 @@ public class CuratorConfig
|
|||
@JsonProperty("authScheme")
|
||||
private String authScheme = "digest";
|
||||
|
||||
@JsonProperty("terminateDruidProcessOnConnectFail")
|
||||
private boolean terminateDruidProcessOnConnectFail = false;
|
||||
|
||||
|
||||
public String getZkHosts()
|
||||
{
|
||||
return zkHosts;
|
||||
|
@ -66,7 +66,7 @@ public class CuratorConfig
|
|||
this.zkHosts = zkHosts;
|
||||
}
|
||||
|
||||
public Integer getZkSessionTimeoutMs()
|
||||
public int getZkSessionTimeoutMs()
|
||||
{
|
||||
return zkSessionTimeoutMs;
|
||||
}
|
||||
|
@ -76,6 +76,16 @@ public class CuratorConfig
|
|||
this.zkSessionTimeoutMs = zkSessionTimeoutMs;
|
||||
}
|
||||
|
||||
public int getZkConnectionTimeoutMs()
|
||||
{
|
||||
return zkConnectionTimeoutMs;
|
||||
}
|
||||
|
||||
public void setZkConnectionTimeoutMs(Integer zkConnectionTimeoutMs)
|
||||
{
|
||||
this.zkConnectionTimeoutMs = zkConnectionTimeoutMs;
|
||||
}
|
||||
|
||||
public boolean getEnableCompression()
|
||||
{
|
||||
return enableCompression;
|
||||
|
@ -112,19 +122,4 @@ public class CuratorConfig
|
|||
{
|
||||
return authScheme;
|
||||
}
|
||||
|
||||
public boolean getTerminateDruidProcessOnConnectFail()
|
||||
{
|
||||
return terminateDruidProcessOnConnectFail;
|
||||
}
|
||||
|
||||
public void setTerminateDruidProcessOnConnectFail(Boolean terminateDruidProcessOnConnectFail)
|
||||
{
|
||||
if (terminateDruidProcessOnConnectFail == null) {
|
||||
this.terminateDruidProcessOnConnectFail = false;
|
||||
} else {
|
||||
this.terminateDruidProcessOnConnectFail = terminateDruidProcessOnConnectFail;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.curator.ensemble.exhibitor.Exhibitors;
|
|||
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
|
||||
import org.apache.curator.framework.api.ACLProvider;
|
||||
import org.apache.curator.framework.imps.DefaultACLProvider;
|
||||
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
|
||||
|
@ -47,8 +46,6 @@ import org.apache.zookeeper.data.ACL;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class CuratorModule implements Module
|
||||
{
|
||||
static final String CURATOR_CONFIG_PREFIX = "druid.zk.service";
|
||||
|
@ -75,7 +72,7 @@ public class CuratorModule implements Module
|
|||
@SuppressForbidden(reason = "System#err")
|
||||
public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
|
||||
{
|
||||
final Builder builder = CuratorFrameworkFactory.builder();
|
||||
final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
|
||||
if (!Strings.isNullOrEmpty(config.getZkUser()) && !Strings.isNullOrEmpty(config.getZkPwd())) {
|
||||
builder.authorization(
|
||||
config.getAuthScheme(),
|
||||
|
@ -83,31 +80,12 @@ public class CuratorModule implements Module
|
|||
);
|
||||
}
|
||||
|
||||
RetryPolicy retryPolicy;
|
||||
if (config.getTerminateDruidProcessOnConnectFail()) {
|
||||
final Runnable exitRunner = () -> {
|
||||
try {
|
||||
log.error("Zookeeper can't be reached, forcefully stopping lifecycle...");
|
||||
lifecycle.stop();
|
||||
System.err.println("Zookeeper can't be reached, forcefully stopping virtual machine...");
|
||||
}
|
||||
finally {
|
||||
System.exit(1);
|
||||
}
|
||||
};
|
||||
retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
|
||||
exitRunner,
|
||||
BASE_SLEEP_TIME_MS,
|
||||
MAX_SLEEP_TIME_MS,
|
||||
MAX_RETRIES
|
||||
);
|
||||
} else {
|
||||
retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
|
||||
}
|
||||
RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
|
||||
|
||||
final CuratorFramework framework = builder
|
||||
.ensembleProvider(ensembleProvider)
|
||||
.sessionTimeoutMs(config.getZkSessionTimeoutMs())
|
||||
.connectionTimeoutMs(config.getZkConnectionTimeoutMs())
|
||||
.retryPolicy(retryPolicy)
|
||||
.compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
|
||||
.aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider())
|
||||
|
@ -115,12 +93,7 @@ public class CuratorModule implements Module
|
|||
|
||||
framework.getUnhandledErrorListenable().addListener((message, e) -> {
|
||||
log.error(e, "Unhandled error in Curator Framework");
|
||||
try {
|
||||
lifecycle.stop();
|
||||
}
|
||||
catch (Throwable t) {
|
||||
log.warn(t, "Exception when stopping druid lifecycle");
|
||||
}
|
||||
shutdown(lifecycle);
|
||||
});
|
||||
|
||||
lifecycle.addHandler(
|
||||
|
@ -153,29 +126,7 @@ public class CuratorModule implements Module
|
|||
return new FixedEnsembleProvider(config.getZkHosts());
|
||||
}
|
||||
|
||||
RetryPolicy retryPolicy;
|
||||
if (config.getTerminateDruidProcessOnConnectFail()) {
|
||||
// It's unknown whether or not this precaution is needed. Tests revealed that this path was never taken.
|
||||
// see discussions in https://github.com/apache/incubator-druid/pull/6740
|
||||
|
||||
final Runnable exitRunner = () -> {
|
||||
try {
|
||||
log.error("Zookeeper can't be reached, forcefully stopping virtual machine...");
|
||||
}
|
||||
finally {
|
||||
System.exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
|
||||
exitRunner,
|
||||
BASE_SLEEP_TIME_MS,
|
||||
MAX_SLEEP_TIME_MS,
|
||||
MAX_RETRIES
|
||||
);
|
||||
} else {
|
||||
retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
|
||||
}
|
||||
RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
|
||||
|
||||
return new ExhibitorEnsembleProvider(
|
||||
new Exhibitors(
|
||||
|
@ -201,14 +152,7 @@ public class CuratorModule implements Module
|
|||
|
||||
private Exhibitors.BackupConnectionStringProvider newBackupProvider(final String zkHosts)
|
||||
{
|
||||
return new Exhibitors.BackupConnectionStringProvider()
|
||||
{
|
||||
@Override
|
||||
public String getBackupConnectionString()
|
||||
{
|
||||
return zkHosts;
|
||||
}
|
||||
};
|
||||
return () -> zkHosts;
|
||||
}
|
||||
|
||||
static class SecuredACLProvider implements ACLProvider
|
||||
|
@ -225,4 +169,18 @@ public class CuratorModule implements Module
|
|||
return ZooDefs.Ids.CREATOR_ALL_ACL;
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdown(Lifecycle lifecycle)
|
||||
{
|
||||
//noinspection finally (not completing the 'finally' block normally is intentional)
|
||||
try {
|
||||
lifecycle.stop();
|
||||
}
|
||||
catch (Throwable t) {
|
||||
log.error(t, "Exception when stopping druid lifecycle");
|
||||
}
|
||||
finally {
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,111 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.curator;
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public final class BoundedExponentialBackoffRetryWithQuitTest
|
||||
{
|
||||
|
||||
private static final Logger log = new Logger(BoundedExponentialBackoffRetryWithQuitTest.class);
|
||||
|
||||
/*
|
||||
Methodology (order is important!):
|
||||
1. Zookeeper Server Service started
|
||||
2. Lifecycle started
|
||||
3. Curator invokes connection to service
|
||||
4. Service is stopped
|
||||
5. Curator attempts to do something, which invokes the retries policy
|
||||
6. Retries exceed limit, call function which simulates an exit (since mocking System.exit() is hard to do without
|
||||
changing a lot of dependencies)
|
||||
*/
|
||||
@Test
|
||||
public void testExitWithLifecycle() throws Exception
|
||||
{
|
||||
final Lifecycle actualNoop = new Lifecycle() {
|
||||
@Override
|
||||
public void start() throws Exception
|
||||
{
|
||||
super.start();
|
||||
log.info("Starting lifecycle...");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
super.stop();
|
||||
log.info("Stopping lifecycle...");
|
||||
}
|
||||
};
|
||||
Lifecycle noop = EasyMock.mock(Lifecycle.class);
|
||||
|
||||
noop.start();
|
||||
EasyMock.expectLastCall().andDelegateTo(actualNoop);
|
||||
noop.stop();
|
||||
EasyMock.expectLastCall().andDelegateTo(actualNoop);
|
||||
EasyMock.replay(noop);
|
||||
|
||||
Runnable exitFunction = () -> {
|
||||
log.info("Zookeeper retries exhausted, exiting...");
|
||||
noop.stop();
|
||||
throw new RuntimeException("Simulated exit");
|
||||
};
|
||||
|
||||
TestingServer server = new TestingServer();
|
||||
BoundedExponentialBackoffRetryWithQuit retry = new BoundedExponentialBackoffRetryWithQuit(exitFunction, 1, 1, 2);
|
||||
CuratorFramework curator = CuratorFrameworkFactory
|
||||
.builder()
|
||||
.connectString(server.getConnectString())
|
||||
.sessionTimeoutMs(1000)
|
||||
.connectionTimeoutMs(1)
|
||||
.retryPolicy(retry)
|
||||
.build();
|
||||
server.start();
|
||||
System.out.println("Server started.");
|
||||
curator.start();
|
||||
noop.start();
|
||||
curator.checkExists().forPath("/tmp");
|
||||
log.info("Connected.");
|
||||
boolean failed = false;
|
||||
try {
|
||||
server.stop();
|
||||
log.info("Stopped.");
|
||||
curator.checkExists().forPath("/tmp");
|
||||
Thread.sleep(10);
|
||||
curator.checkExists().forPath("/tmp");
|
||||
}
|
||||
catch (Exception e) {
|
||||
Assert.assertTrue("Correct exception type", e instanceof RuntimeException);
|
||||
EasyMock.verify(noop);
|
||||
curator.close();
|
||||
failed = true;
|
||||
}
|
||||
Assert.assertTrue("Must be marked in failure state", failed);
|
||||
log.info("Lifecycle stopped.");
|
||||
}
|
||||
|
||||
}
|
|
@ -20,32 +20,45 @@
|
|||
package org.apache.druid.curator;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.util.Modules;
|
||||
import org.apache.curator.RetryPolicy;
|
||||
import org.apache.curator.ensemble.EnsembleProvider;
|
||||
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
|
||||
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.apache.druid.guice.GuiceInjectors;
|
||||
import org.apache.druid.guice.LifecycleModule;
|
||||
import org.apache.druid.testing.junit.LoggerCaptureRule;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
*/
|
||||
public final class CuratorModuleTest
|
||||
{
|
||||
|
||||
private static final String CURATOR_HOST_KEY = CuratorModule.CURATOR_CONFIG_PREFIX + ".host";
|
||||
|
||||
private static final String CURATOR_HOST_KEY = CuratorModule.CURATOR_CONFIG_PREFIX + "." + CuratorConfig.HOST;
|
||||
private static final String CURATOR_CONNECTION_TIMEOUT_MS_KEY =
|
||||
CuratorModule.CURATOR_CONFIG_PREFIX + "." + CuratorConfig.CONNECTION_TIMEOUT_MS;
|
||||
private static final String EXHIBITOR_HOSTS_KEY = CuratorModule.EXHIBITOR_CONFIG_PREFIX + ".hosts";
|
||||
|
||||
@Rule
|
||||
public final ExpectedSystemExit exit = ExpectedSystemExit.none();
|
||||
|
||||
@Rule
|
||||
public final LoggerCaptureRule logger = new LoggerCaptureRule(CuratorModule.class);
|
||||
|
||||
@Test
|
||||
public void defaultEnsembleProvider()
|
||||
{
|
||||
|
@ -66,7 +79,7 @@ public final class CuratorModuleTest
|
|||
public void fixedZkHosts()
|
||||
{
|
||||
Properties props = new Properties();
|
||||
props.put(CURATOR_HOST_KEY, "hostA");
|
||||
props.setProperty(CURATOR_HOST_KEY, "hostA");
|
||||
Injector injector = newInjector(props);
|
||||
|
||||
injector.getInstance(CuratorFramework.class); // initialize related components
|
||||
|
@ -85,8 +98,8 @@ public final class CuratorModuleTest
|
|||
public void exhibitorEnsembleProvider()
|
||||
{
|
||||
Properties props = new Properties();
|
||||
props.put(CURATOR_HOST_KEY, "hostA");
|
||||
props.put(EXHIBITOR_HOSTS_KEY, "[\"hostB\"]");
|
||||
props.setProperty(CURATOR_HOST_KEY, "hostA");
|
||||
props.setProperty(EXHIBITOR_HOSTS_KEY, "[\"hostB\"]");
|
||||
Injector injector = newInjector(props);
|
||||
|
||||
injector.getInstance(CuratorFramework.class); // initialize related components
|
||||
|
@ -101,8 +114,8 @@ public final class CuratorModuleTest
|
|||
public void emptyExhibitorHosts()
|
||||
{
|
||||
Properties props = new Properties();
|
||||
props.put(CURATOR_HOST_KEY, "hostB");
|
||||
props.put(EXHIBITOR_HOSTS_KEY, "[]");
|
||||
props.setProperty(CURATOR_HOST_KEY, "hostB");
|
||||
props.setProperty(EXHIBITOR_HOSTS_KEY, "[]");
|
||||
Injector injector = newInjector(props);
|
||||
|
||||
injector.getInstance(CuratorFramework.class); // initialize related components
|
||||
|
@ -117,21 +130,74 @@ public final class CuratorModuleTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void exitsJvmWhenMaxRetriesExceeded() throws Exception
|
||||
{
|
||||
Properties props = new Properties();
|
||||
props.setProperty(CURATOR_CONNECTION_TIMEOUT_MS_KEY, "0");
|
||||
Injector injector = newInjector(props);
|
||||
CuratorFramework curatorFramework = createCuratorFramework(injector, 0);
|
||||
curatorFramework.start();
|
||||
|
||||
exit.expectSystemExitWithStatus(1);
|
||||
logger.clearLogEvents();
|
||||
|
||||
// This will result in a curator unhandled error since the connection timeout is 0 and retries are disabled
|
||||
curatorFramework.create().inBackground().forPath("/foo");
|
||||
|
||||
// org.apache.curator.framework.impl.CuratorFrameworkImpl logs "Background retry gave up" unhandled error twice
|
||||
List<LogEvent> loggingEvents = logger.getLogEvents();
|
||||
Assert.assertFalse(loggingEvents.isEmpty());
|
||||
LogEvent logEvent = loggingEvents.get(0);
|
||||
Assert.assertEquals(Level.ERROR, logEvent.getLevel());
|
||||
Assert.assertEquals("Unhandled error in Curator Framework", logEvent.getMessage().getFormattedMessage());
|
||||
}
|
||||
|
||||
@Ignore("Verifies changes in https://github.com/apache/incubator-druid/pull/8458, but overkill for regular testing")
|
||||
@Test
|
||||
public void ignoresDeprecatedCuratorConfigProperties()
|
||||
{
|
||||
Properties props = new Properties();
|
||||
String deprecatedPropName = CuratorModule.CURATOR_CONFIG_PREFIX + ".terminateDruidProcessOnConnectFail";
|
||||
props.setProperty(deprecatedPropName, "true");
|
||||
Injector injector = newInjector(props);
|
||||
|
||||
try {
|
||||
injector.getInstance(CuratorFramework.class);
|
||||
}
|
||||
catch (Exception e) {
|
||||
Assert.fail("Deprecated curator config was not ignored:\n" + e);
|
||||
}
|
||||
}
|
||||
|
||||
private Injector newInjector(final Properties props)
|
||||
{
|
||||
List<Module> modules = ImmutableList.<Module>builder()
|
||||
.addAll(GuiceInjectors.makeDefaultStartupModules())
|
||||
.add(new LifecycleModule()).add(new CuratorModule()).build();
|
||||
.add(new LifecycleModule())
|
||||
.add(new CuratorModule())
|
||||
.build();
|
||||
return Guice.createInjector(
|
||||
Modules.override(modules).with(new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.bind(Properties.class).toInstance(props);
|
||||
}
|
||||
})
|
||||
Modules.override(modules).with(binder -> binder.bind(Properties.class).toInstance(props))
|
||||
);
|
||||
}
|
||||
|
||||
private static CuratorFramework createCuratorFramework(Injector injector, int maxRetries)
|
||||
{
|
||||
CuratorFramework curatorFramework = injector.getInstance(CuratorFramework.class);
|
||||
RetryPolicy retryPolicy = curatorFramework.getZookeeperClient().getRetryPolicy();
|
||||
Assert.assertThat(retryPolicy, CoreMatchers.instanceOf(ExponentialBackoffRetry.class));
|
||||
RetryPolicy adjustedRetryPolicy = adjustRetryPolicy((BoundedExponentialBackoffRetry) retryPolicy, 0);
|
||||
curatorFramework.getZookeeperClient().setRetryPolicy(adjustedRetryPolicy);
|
||||
return curatorFramework;
|
||||
}
|
||||
|
||||
private static RetryPolicy adjustRetryPolicy(BoundedExponentialBackoffRetry origRetryPolicy, int maxRetries)
|
||||
{
|
||||
return new BoundedExponentialBackoffRetry(
|
||||
origRetryPolicy.getBaseSleepTimeMs(),
|
||||
origRetryPolicy.getMaxSleepTimeMs(),
|
||||
maxRetries
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue