mirror of https://github.com/apache/druid.git
Zookeeper loss (#6740)
* Update init Fix bin/init to source from proper directory. * Fix for Proposal #6518: Shutdown druid processes upon complete loss of ZK connectivity * Zookeeper Loss: - Add feature documentation - Cosmetic refactors - Variable extractions - Remove getter * - Change config key name and reword documentation - Switch from Function<Void,Void> to Runnable/Lambda - try { … } finally { … } * Fix line length too long * - change to formatted string for logging - use System.err.println after lifecycle stops * commenting on makeEnsembleProvider()-created Zookeeper termination * Add javadoc * added java doc reference back to apache discussion thread. * move comment to other class * favor two-slash comments instead of multiline comments
This commit is contained in:
parent
62c3e89266
commit
347779b17a
|
@ -171,6 +171,7 @@ We recommend just setting the base ZK path and the ZK service host, but all ZK p
|
||||||
|`druid.zk.service.user`|The username to authenticate with ZooKeeper. This is an optional property.|none|
|
|`druid.zk.service.user`|The username to authenticate with ZooKeeper. This is an optional property.|none|
|
||||||
|`druid.zk.service.pwd`|The [Password Provider](../operations/password-provider.html) or the string password to authenticate with ZooKeeper. This is an optional property.|none|
|
|`druid.zk.service.pwd`|The [Password Provider](../operations/password-provider.html) or the string password to authenticate with ZooKeeper. This is an optional property.|none|
|
||||||
|`druid.zk.service.authScheme`|digest is the only authentication scheme supported. |digest|
|
|`druid.zk.service.authScheme`|digest is the only authentication scheme supported. |digest|
|
||||||
|
|`druid.zk.service.terminateDruidProcessOnConnectFail`|If set to 'true' and the connection to ZooKeeper fails (after exhausting all potential backoff retires), Druid process terminates itself with exit code 1.|false|
|
||||||
|
|
||||||
#### Zookeeper Behavior
|
#### Zookeeper Behavior
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.curator;
|
||||||
|
|
||||||
|
import org.apache.curator.RetrySleeper;
|
||||||
|
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry for simplicity. It's not actually a
|
||||||
|
* BoundedExponentialBackoffRetry from the Liskov substitution principle point of view,
|
||||||
|
* but it doesn't matter in this code.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry
|
||||||
|
{
|
||||||
|
|
||||||
|
private static final Logger log = new Logger(BoundedExponentialBackoffRetryWithQuit.class);
|
||||||
|
|
||||||
|
private final Runnable exitRunner;
|
||||||
|
|
||||||
|
public BoundedExponentialBackoffRetryWithQuit(
|
||||||
|
Runnable exitRunner,
|
||||||
|
int baseSleepTimeMs,
|
||||||
|
int maxSleepTimeMs,
|
||||||
|
int maxRetries
|
||||||
|
)
|
||||||
|
{
|
||||||
|
super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
|
||||||
|
this.exitRunner = exitRunner;
|
||||||
|
log.info("BoundedExponentialBackoffRetryWithQuit Retry Policy selected.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
|
||||||
|
{
|
||||||
|
log.warn("Zookeeper can't be reached, retrying (retryCount = %s out of %s)...", retryCount, this.getN());
|
||||||
|
boolean shouldRetry = super.allowRetry(retryCount, elapsedTimeMs, sleeper);
|
||||||
|
if (!shouldRetry) {
|
||||||
|
log.warn("Since Zookeeper can't be reached after retries exhausted, calling exit function...");
|
||||||
|
exitRunner.run();
|
||||||
|
}
|
||||||
|
return shouldRetry;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -52,6 +52,10 @@ public class CuratorConfig
|
||||||
@JsonProperty("authScheme")
|
@JsonProperty("authScheme")
|
||||||
private String authScheme = "digest";
|
private String authScheme = "digest";
|
||||||
|
|
||||||
|
@JsonProperty("terminateDruidProcessOnConnectFail")
|
||||||
|
private boolean terminateDruidProcessOnConnectFail = false;
|
||||||
|
|
||||||
|
|
||||||
public String getZkHosts()
|
public String getZkHosts()
|
||||||
{
|
{
|
||||||
return zkHosts;
|
return zkHosts;
|
||||||
|
@ -109,4 +113,18 @@ public class CuratorConfig
|
||||||
return authScheme;
|
return authScheme;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getTerminateDruidProcessOnConnectFail()
|
||||||
|
{
|
||||||
|
return terminateDruidProcessOnConnectFail;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTerminateDruidProcessOnConnectFail(Boolean terminateDruidProcessOnConnectFail)
|
||||||
|
{
|
||||||
|
if (terminateDruidProcessOnConnectFail == null) {
|
||||||
|
this.terminateDruidProcessOnConnectFail = false;
|
||||||
|
} else {
|
||||||
|
this.terminateDruidProcessOnConnectFail = terminateDruidProcessOnConnectFail;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,8 @@ package org.apache.druid.curator;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
|
import io.netty.util.SuppressForbidden;
|
||||||
|
import org.apache.curator.RetryPolicy;
|
||||||
import org.apache.curator.ensemble.EnsembleProvider;
|
import org.apache.curator.ensemble.EnsembleProvider;
|
||||||
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
|
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
|
||||||
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
|
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
|
||||||
|
@ -70,6 +72,7 @@ public class CuratorModule implements Module
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@LazySingleton
|
@LazySingleton
|
||||||
|
@SuppressForbidden(reason = "System#err")
|
||||||
public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
|
public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
|
||||||
{
|
{
|
||||||
final Builder builder = CuratorFrameworkFactory.builder();
|
final Builder builder = CuratorFrameworkFactory.builder();
|
||||||
|
@ -79,10 +82,33 @@ public class CuratorModule implements Module
|
||||||
StringUtils.format("%s:%s", config.getZkUser(), config.getZkPwd()).getBytes(StandardCharsets.UTF_8)
|
StringUtils.format("%s:%s", config.getZkUser(), config.getZkPwd()).getBytes(StandardCharsets.UTF_8)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RetryPolicy retryPolicy;
|
||||||
|
if (config.getTerminateDruidProcessOnConnectFail()) {
|
||||||
|
final Runnable exitRunner = () -> {
|
||||||
|
try {
|
||||||
|
log.error("Zookeeper can't be reached, forcefully stopping lifecycle...");
|
||||||
|
lifecycle.stop();
|
||||||
|
System.err.println("Zookeeper can't be reached, forcefully stopping virtual machine...");
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
|
||||||
|
exitRunner,
|
||||||
|
BASE_SLEEP_TIME_MS,
|
||||||
|
MAX_SLEEP_TIME_MS,
|
||||||
|
MAX_RETRIES
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
|
||||||
|
}
|
||||||
|
|
||||||
final CuratorFramework framework = builder
|
final CuratorFramework framework = builder
|
||||||
.ensembleProvider(ensembleProvider)
|
.ensembleProvider(ensembleProvider)
|
||||||
.sessionTimeoutMs(config.getZkSessionTimeoutMs())
|
.sessionTimeoutMs(config.getZkSessionTimeoutMs())
|
||||||
.retryPolicy(new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES))
|
.retryPolicy(retryPolicy)
|
||||||
.compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
|
.compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
|
||||||
.aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider())
|
.aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider())
|
||||||
.build();
|
.build();
|
||||||
|
@ -127,6 +153,30 @@ public class CuratorModule implements Module
|
||||||
return new FixedEnsembleProvider(config.getZkHosts());
|
return new FixedEnsembleProvider(config.getZkHosts());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RetryPolicy retryPolicy;
|
||||||
|
if (config.getTerminateDruidProcessOnConnectFail()) {
|
||||||
|
// It's unknown whether or not this precaution is needed. Tests revealed that this path was never taken.
|
||||||
|
// see discussions in https://github.com/apache/incubator-druid/pull/6740
|
||||||
|
|
||||||
|
final Runnable exitRunner = () -> {
|
||||||
|
try {
|
||||||
|
log.error("Zookeeper can't be reached, forcefully stopping virtual machine...");
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
|
||||||
|
exitRunner,
|
||||||
|
BASE_SLEEP_TIME_MS,
|
||||||
|
MAX_SLEEP_TIME_MS,
|
||||||
|
MAX_RETRIES
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
|
||||||
|
}
|
||||||
|
|
||||||
return new ExhibitorEnsembleProvider(
|
return new ExhibitorEnsembleProvider(
|
||||||
new Exhibitors(
|
new Exhibitors(
|
||||||
exConfig.getHosts(),
|
exConfig.getHosts(),
|
||||||
|
@ -136,7 +186,7 @@ public class CuratorModule implements Module
|
||||||
new DefaultExhibitorRestClient(exConfig.getUseSsl()),
|
new DefaultExhibitorRestClient(exConfig.getUseSsl()),
|
||||||
exConfig.getRestUriPath(),
|
exConfig.getRestUriPath(),
|
||||||
exConfig.getPollingMs(),
|
exConfig.getPollingMs(),
|
||||||
new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES)
|
retryPolicy
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.curator;
|
||||||
|
|
||||||
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
|
import org.apache.curator.test.TestingServer;
|
||||||
|
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public final class BoundedExponentialBackoffRetryWithQuitTest
|
||||||
|
{
|
||||||
|
|
||||||
|
private static final Logger log = new Logger(BoundedExponentialBackoffRetryWithQuitTest.class);
|
||||||
|
|
||||||
|
/*
|
||||||
|
Methodology (order is important!):
|
||||||
|
1. Zookeeper Server Service started
|
||||||
|
2. Lifecycle started
|
||||||
|
3. Curator invokes connection to service
|
||||||
|
4. Service is stopped
|
||||||
|
5. Curator attempts to do something, which invokes the retries policy
|
||||||
|
6. Retries exceed limit, call function which simulates an exit (since mocking System.exit() is hard to do without
|
||||||
|
changing a lot of dependencies)
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testExitWithLifecycle() throws Exception
|
||||||
|
{
|
||||||
|
final Lifecycle actualNoop = new Lifecycle() {
|
||||||
|
@Override
|
||||||
|
public void start() throws Exception
|
||||||
|
{
|
||||||
|
super.start();
|
||||||
|
log.info("Starting lifecycle...");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
super.stop();
|
||||||
|
log.info("Stopping lifecycle...");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Lifecycle noop = EasyMock.mock(Lifecycle.class);
|
||||||
|
|
||||||
|
noop.start();
|
||||||
|
EasyMock.expectLastCall().andDelegateTo(actualNoop);
|
||||||
|
noop.stop();
|
||||||
|
EasyMock.expectLastCall().andDelegateTo(actualNoop);
|
||||||
|
EasyMock.replay(noop);
|
||||||
|
|
||||||
|
Runnable exitFunction = () -> {
|
||||||
|
log.info("Zookeeper retries exhausted, exiting...");
|
||||||
|
noop.stop();
|
||||||
|
throw new RuntimeException("Simulated exit");
|
||||||
|
};
|
||||||
|
|
||||||
|
TestingServer server = new TestingServer();
|
||||||
|
BoundedExponentialBackoffRetryWithQuit retry = new BoundedExponentialBackoffRetryWithQuit(exitFunction, 1, 1, 2);
|
||||||
|
CuratorFramework curator = CuratorFrameworkFactory
|
||||||
|
.builder()
|
||||||
|
.connectString(server.getConnectString())
|
||||||
|
.sessionTimeoutMs(1000)
|
||||||
|
.connectionTimeoutMs(1)
|
||||||
|
.retryPolicy(retry)
|
||||||
|
.build();
|
||||||
|
server.start();
|
||||||
|
System.out.println("Server started.");
|
||||||
|
curator.start();
|
||||||
|
noop.start();
|
||||||
|
curator.checkExists().forPath("/tmp");
|
||||||
|
log.info("Connected.");
|
||||||
|
boolean failed = false;
|
||||||
|
try {
|
||||||
|
server.stop();
|
||||||
|
log.info("Stopped.");
|
||||||
|
curator.checkExists().forPath("/tmp");
|
||||||
|
Thread.sleep(10);
|
||||||
|
curator.checkExists().forPath("/tmp");
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
Assert.assertTrue("Correct exception type", e instanceof RuntimeException);
|
||||||
|
EasyMock.verify(noop);
|
||||||
|
curator.close();
|
||||||
|
failed = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue("Must be marked in failure state", failed);
|
||||||
|
log.info("Lifecycle stopped.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue