Extend systemd timeout during startup (#49784)

When we are notifying systemd that we are fully started up, it can be
that we do not notify systemd before its default timeout of sixty
seconds elapses (e.g., if we are upgrading on-disk metadata). In this
case, we need to notify systemd to extend this timeout so that we are
not abruptly terminated. We do this by repeatedly sending
EXTEND_TIMEOUT_USEC to extend the timeout by thirty seconds; we do this
every fifteen seconds. This will prevent systemd from abruptly
terminating us during a long startup. We cancel the scheduled execution
of this notification after we have successfully started up.
This commit is contained in:
Jason Tedor 2019-12-03 14:20:32 -05:00
parent d5eb9379c9
commit 0f27c0b702
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
2 changed files with 100 additions and 15 deletions

View File

@ -22,8 +22,22 @@ package org.elasticsearch.systemd;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Build;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.util.Collection;
import java.util.Collections;
public class SystemdPlugin extends Plugin implements ClusterPlugin {
@ -62,8 +76,44 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
enabled = Boolean.TRUE.toString().equals(esSDNotify);
}
Scheduler.Cancellable extender;
@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
if (enabled) {
/*
* Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the
* READY=1 status via sd_notify. Since our startup can take longer than that (e.g., if we are upgrading on-disk metadata) then
* we need to repeatedly notify systemd that we are still starting up by sending EXTEND_TIMEOUT_USEC with an extension to the
* timeout. Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds.
* We will cancel this scheduled task after we successfully notify systemd that we are ready.
*/
extender = threadPool.scheduleWithFixedDelay(
() -> {
final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000");
if (rc < 0) {
logger.warn("extending startup timeout via sd_notify failed with [{}]", rc);
}
},
TimeValue.timeValueSeconds(15),
ThreadPool.Names.SAME);
}
return Collections.emptyList();
}
int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) {
return Libsystemd.sd_notify(unset_environment, state);
final int rc = Libsystemd.sd_notify(unset_environment, state);
logger.trace("sd_notify({}, {}) returned [{}]", unset_environment, state, rc);
return rc;
}
@Override
@ -72,11 +122,13 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
return;
}
final int rc = sd_notify(0, "READY=1");
logger.trace("sd_notify returned [{}]", rc);
if (rc < 0) {
// treat failure to notify systemd of readiness as a startup failure
throw new RuntimeException("sd_notify returned error [" + rc + "]");
}
assert extender != null;
final boolean cancelled = extender.cancel();
assert cancelled;
}
@Override
@ -85,7 +137,6 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
return;
}
final int rc = sd_notify(0, "STOPPING=1");
logger.trace("sd_notify returned [{}]", rc);
if (rc < 0) {
// do not treat failure to notify systemd of stopping as a failure
logger.warn("sd_notify returned error [{}]", rc);

View File

@ -21,20 +21,28 @@ package org.elasticsearch.systemd;
import org.elasticsearch.Build;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.hamcrest.OptionalMatchers;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class SystemdPluginTests extends ESTestCase {
@ -42,24 +50,41 @@ public class SystemdPluginTests extends ESTestCase {
private final Build.Type randomNonPackageBuildType =
randomValueOtherThanMany(t -> t == Build.Type.DEB || t == Build.Type.RPM, () -> randomFrom(Build.Type.values()));
final Scheduler.Cancellable extender = mock(Scheduler.Cancellable.class);
final ThreadPool threadPool = mock(ThreadPool.class);
{
when(extender.cancel()).thenReturn(true);
when(threadPool.scheduleWithFixedDelay(any(Runnable.class), eq(TimeValue.timeValueSeconds(15)), eq(ThreadPool.Names.SAME)))
.thenReturn(extender);
}
public void testIsEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertTrue(plugin.isEnabled());
assertNotNull(plugin.extender);
}
public void testIsNotPackageDistribution() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertFalse(plugin.isEnabled());
assertNull(plugin.extender);
}
public void testIsImplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null);
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertFalse(plugin.isEnabled());
assertNull(plugin.extender);
}
public void testIsExplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertFalse(plugin.isEnabled());
assertNull(plugin.extender);
}
public void testInvalid() {
@ -75,7 +100,10 @@ public class SystemdPluginTests extends ESTestCase {
runTestOnNodeStarted(
Boolean.TRUE.toString(),
randomIntBetween(0, Integer.MAX_VALUE),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isEmpty());
verify(plugin.extender).cancel();
});
}
public void testOnNodeStartedFailure() {
@ -83,7 +111,7 @@ public class SystemdPluginTests extends ESTestCase {
runTestOnNodeStarted(
Boolean.TRUE.toString(),
rc,
maybe -> {
(maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isPresent());
// noinspection OptionalGetWithoutIsPresent
assertThat(maybe.get(), instanceOf(RuntimeException.class));
@ -95,13 +123,13 @@ public class SystemdPluginTests extends ESTestCase {
runTestOnNodeStarted(
Boolean.FALSE.toString(),
randomInt(),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
private void runTestOnNodeStarted(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions) {
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1");
}
@ -109,34 +137,34 @@ public class SystemdPluginTests extends ESTestCase {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(1, Integer.MAX_VALUE),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
public void testCloseFailure() {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(Integer.MIN_VALUE, -1),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
public void testCloseNotEnabled() {
runTestClose(
Boolean.FALSE.toString(),
randomInt(),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
private void runTestClose(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions) {
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1");
}
private void runTest(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions,
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions,
final CheckedConsumer<SystemdPlugin, IOException> invocation,
final String expectedState) {
final AtomicBoolean invoked = new AtomicBoolean();
@ -153,16 +181,22 @@ public class SystemdPluginTests extends ESTestCase {
}
};
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertNotNull(plugin.extender);
} else {
assertNull(plugin.extender);
}
boolean success = false;
try {
invocation.accept(plugin);
success = true;
} catch (final Exception e) {
assertions.accept(Optional.of(e));
assertions.accept(Optional.of(e), plugin);
}
if (success) {
assertions.accept(Optional.empty());
assertions.accept(Optional.empty(), plugin);
}
if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertTrue(invoked.get());