Use set once for systemd extender (#55497)

When Elasticsearch is starting up, we schedule a thread to repeatedly
let systemd know that we are still in the process of starting up. Today
we use a non-final field for this. This commit changes this to be a set
once so we can mark the field as final, and get stronger guarantees when
reasoning about the state of execution here.
This commit is contained in:
Jason Tedor 2020-04-20 21:07:12 -04:00
parent 93a2e9b0f9
commit 80f18ad31a
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
2 changed files with 30 additions and 26 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.systemd;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build; import org.elasticsearch.Build;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -79,7 +80,7 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
enabled = Boolean.TRUE.toString().equals(esSDNotify); enabled = Boolean.TRUE.toString().equals(esSDNotify);
} }
Scheduler.Cancellable extender; final SetOnce<Scheduler.Cancellable> extender = new SetOnce<>();
@Override @Override
public Collection<Object> createComponents( public Collection<Object> createComponents(
@ -94,15 +95,18 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
final NamedWriteableRegistry namedWriteableRegistry, final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver expressionResolver, final IndexNameExpressionResolver expressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier) { final Supplier<RepositoriesService> repositoriesServiceSupplier) {
if (enabled) { if (enabled == false) {
extender.set(null);
return Collections.emptyList();
}
/* /*
* Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the * Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the
* READY=1 status via sd_notify. Since our startup can take longer than that (e.g., if we are upgrading on-disk metadata) then * READY=1 status via sd_notify. Since our startup can take longer than that (e.g., if we are upgrading on-disk metadata) then we
* we need to repeatedly notify systemd that we are still starting up by sending EXTEND_TIMEOUT_USEC with an extension to the * need to repeatedly notify systemd that we are still starting up by sending EXTEND_TIMEOUT_USEC with an extension to the timeout.
* timeout. Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds. * Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds. We will cancel
* We will cancel this scheduled task after we successfully notify systemd that we are ready. * this scheduled task after we successfully notify systemd that we are ready.
*/ */
extender = threadPool.scheduleWithFixedDelay( extender.set(threadPool.scheduleWithFixedDelay(
() -> { () -> {
final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000"); final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000");
if (rc < 0) { if (rc < 0) {
@ -110,8 +114,7 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
} }
}, },
TimeValue.timeValueSeconds(15), TimeValue.timeValueSeconds(15),
ThreadPool.Names.SAME); ThreadPool.Names.SAME));
}
return Collections.emptyList(); return Collections.emptyList();
} }
@ -124,6 +127,7 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
@Override @Override
public void onNodeStarted() { public void onNodeStarted() {
if (enabled == false) { if (enabled == false) {
assert extender.get() == null;
return; return;
} }
final int rc = sd_notify(0, "READY=1"); final int rc = sd_notify(0, "READY=1");
@ -131,8 +135,8 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
// treat failure to notify systemd of readiness as a startup failure // treat failure to notify systemd of readiness as a startup failure
throw new RuntimeException("sd_notify returned error [" + rc + "]"); throw new RuntimeException("sd_notify returned error [" + rc + "]");
} }
assert extender != null; assert extender.get() != null;
final boolean cancelled = extender.cancel(); final boolean cancelled = extender.get().cancel();
assert cancelled; assert cancelled;
} }

View File

@ -63,28 +63,28 @@ public class SystemdPluginTests extends ESTestCase {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString()); final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null); plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null);
assertTrue(plugin.isEnabled()); assertTrue(plugin.isEnabled());
assertNotNull(plugin.extender); assertNotNull(plugin.extender.get());
} }
public void testIsNotPackageDistribution() { public void testIsNotPackageDistribution() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString()); final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null); plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null);
assertFalse(plugin.isEnabled()); assertFalse(plugin.isEnabled());
assertNull(plugin.extender); assertNull(plugin.extender.get());
} }
public void testIsImplicitlyNotEnabled() { public void testIsImplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null); final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null);
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null); plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null);
assertFalse(plugin.isEnabled()); assertFalse(plugin.isEnabled());
assertNull(plugin.extender); assertNull(plugin.extender.get());
} }
public void testIsExplicitlyNotEnabled() { public void testIsExplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString()); final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null); plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null, null);
assertFalse(plugin.isEnabled()); assertFalse(plugin.isEnabled());
assertNull(plugin.extender); assertNull(plugin.extender.get());
} }
public void testInvalid() { public void testInvalid() {
@ -102,7 +102,7 @@ public class SystemdPluginTests extends ESTestCase {
randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE),
(maybe, plugin) -> { (maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isEmpty()); assertThat(maybe, OptionalMatchers.isEmpty());
verify(plugin.extender).cancel(); verify(plugin.extender.get()).cancel();
}); });
} }
@ -185,7 +185,7 @@ public class SystemdPluginTests extends ESTestCase {
if (Boolean.TRUE.toString().equals(esSDNotify)) { if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertNotNull(plugin.extender); assertNotNull(plugin.extender);
} else { } else {
assertNull(plugin.extender); assertNull(plugin.extender.get());
} }
boolean success = false; boolean success = false;