Notify systemd when Elasticsearch is ready (#44673)

Today our systemd service defaults to a service type of simple. This
means that systemd assumes Elasticsearch is ready as soon as the
ExecStart (bin/elasticsearch) process is forked off. This means that the
service appears ready long before it actually is, so before it is ready
to receive requests. It also means that services that want to depend on
Elasticsearch being ready to start can not as there is not a reliable
mechanism to determine this. This commit changes the service type to
notify. This requires that Elasticsearch sends a notification message
via libsystemd sd_notify method. This commit does that by using JNA to
invoke this native method. Additionally, we use this integration to also
notify systemd when we are stopping.
This commit is contained in:
Jason Tedor 2019-07-23 22:03:48 -07:00
parent 27440b7692
commit 659ebf6cfb
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
8 changed files with 384 additions and 0 deletions

View File

@ -68,6 +68,7 @@ task buildOssNoJdkNotice(type: NoticeTask) {
*****************************************************************************/
String ossOutputs = 'build/outputs/oss'
String defaultOutputs = 'build/outputs/default'
String systemdOutputs = 'build/outputs/systemd'
String transportOutputs = 'build/outputs/transport-only'
task processOssOutputs(type: Sync) {
@ -79,6 +80,10 @@ task processDefaultOutputs(type: Sync) {
from processOssOutputs
}
task processSystemdOutputs(type: Sync) {
into systemdOutputs
}
// Integ tests work over the rest http layer, so we need a transport included with the integ test zip.
// All transport modules are included so that they may be randomized for testing
task processTransportOutputs(type: Sync) {
@ -110,6 +115,10 @@ task buildDefaultConfig {
dependsOn processDefaultOutputs
outputs.dir "${defaultOutputs}/config"
}
task buildSystemdModule {
dependsOn processSystemdOutputs
outputs.dir "${systemdOutputs}/modules"
}
task buildTransportModules {
dependsOn processTransportOutputs
outputs.dir "${transportOutputs}/modules"
@ -186,6 +195,10 @@ ext.restTestExpansions = [
// we create the buildOssModules task above but fill it here so we can do a single
// loop over modules to also setup cross task dependencies and increment our modules counter
project.rootProject.subprojects.findAll { it.parent.path == ':modules' }.each { Project module ->
if (module.name == 'systemd') {
// the systemd module is only included in the package distributions
return
}
File licenses = new File(module.projectDir, 'licenses')
if (licenses.exists()) {
buildDefaultNotice.licensesDir licenses
@ -218,6 +231,8 @@ xpack.subprojects.findAll { it.parent == xpack }.each { Project xpackModule ->
copyLog4jProperties(buildDefaultLog4jConfig, xpackModule)
}
copyModule(processSystemdOutputs, project(':modules:systemd'))
// make sure we have a clean task since we aren't a java project, but we have tasks that
// put stuff in the build dir
task clean(type: Delete) {
@ -285,6 +300,9 @@ configure(subprojects.findAll { ['archives', 'packages'].contains(it.name) }) {
exclude "**/platform/${excludePlatform}-x86_64/**"
}
}
if (project.path.startsWith(':distribution:packages')) {
from(project(':distribution').buildSystemdModule)
}
}
}

View File

@ -5,11 +5,13 @@ Wants=network-online.target
After=network-online.target
[Service]
Type=notify
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/usr/share/elasticsearch
Environment=ES_PATH_CONF=${path.conf}
Environment=PID_DIR=/var/run/elasticsearch
Environment=ES_SD_NOTIFY=true
EnvironmentFile=-${path.env}
WorkingDirectory=/usr/share/elasticsearch

View File

@ -0,0 +1,25 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
esplugin {
description 'Integrates Elasticsearch with systemd'
classname 'org.elasticsearch.systemd.SystemdPlugin'
}
integTest.enabled = false

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.systemd;
import com.sun.jna.Native;
import java.security.AccessController;
import java.security.PrivilegedAction;
/**
* Provides access to the native method sd_notify from libsystemd.
*/
class Libsystemd {
static {
AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
Native.register(Libsystemd.class, "libsystemd.so.0");
return null;
});
}
/**
* Notify systemd of state changes.
*
* @param unset_environment if non-zero, the NOTIFY_SOCKET environment variable will be unset before returning and further calls to
* sd_notify will fail
* @param state a new-line separated list of variable assignments; some assignments are understood directly by systemd
* @return a negative error code on failure, and positive if status was successfully sent
*/
static native int sd_notify(int unset_environment, String state);
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.systemd;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Assertions;
import org.elasticsearch.Build;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
public class SystemdPlugin extends Plugin implements ClusterPlugin {
private static final Logger logger = LogManager.getLogger(SystemdPlugin.class);
private final boolean enabled;
final boolean isEnabled() {
return enabled;
}
@SuppressWarnings("unused")
public SystemdPlugin() {
this(true, Constants.LINUX, System.getenv("ES_SD_NOTIFY"));
}
SystemdPlugin(final boolean assertIsPackageDistribution, final boolean isLinux, final String esSDNotify) {
if (Assertions.ENABLED && assertIsPackageDistribution) {
// our build is configured to only include this module in the package distributions
assert Build.CURRENT.type() == Build.Type.DEB || Build.CURRENT.type() == Build.Type.RPM : Build.CURRENT.type();
}
if (isLinux == false || esSDNotify == null) {
enabled = false;
return;
}
if (Boolean.TRUE.toString().equals(esSDNotify) == false && Boolean.FALSE.toString().equals(esSDNotify) == false) {
throw new RuntimeException("ES_SD_NOTIFY set to unexpected value [" + esSDNotify + "]");
}
enabled = Boolean.TRUE.toString().equals(esSDNotify);
}
int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) {
return Libsystemd.sd_notify(unset_environment, state);
}
@Override
public void onNodeStarted() {
if (enabled == false) {
return;
}
final int rc = sd_notify(0, "READY=1");
logger.trace("sd_notify returned [{}]", rc);
if (rc < 0) {
// treat failure to notify systemd of readiness as a startup failure
throw new RuntimeException("sd_notify returned error [" + rc + "]");
}
}
@Override
public void close() {
if (enabled == false) {
return;
}
final int rc = sd_notify(0, "STOPPING=1");
logger.trace("sd_notify returned [{}]", rc);
if (rc < 0) {
// do not treat failure to notify systemd of stopping as a failure
logger.warn("sd_notify returned error [{}]", rc);
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
grant codeBase "${codebase.systemd}" {
// for registering native methods
permission java.lang.RuntimePermission "accessDeclaredMembers";
};

View File

@ -0,0 +1,171 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.systemd;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.hamcrest.OptionalMatchers;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
public class SystemdPluginTests extends ESTestCase {
public void testIsEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, true, Boolean.TRUE.toString());
assertTrue(plugin.isEnabled());
}
public void testIsNotLinux() {
final SystemdPlugin plugin = new SystemdPlugin(false, false, Boolean.TRUE.toString());
assertFalse(plugin.isEnabled());
}
public void testIsImplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, true, null);
assertFalse(plugin.isEnabled());
}
public void testIsExplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, true, Boolean.FALSE.toString());
assertFalse(plugin.isEnabled());
}
public void testInvalid() {
final String esSDNotify = randomValueOtherThanMany(
s -> Boolean.TRUE.toString().equals(s) || Boolean.FALSE.toString().equals(s),
() -> randomAlphaOfLength(4));
final RuntimeException e = expectThrows(RuntimeException.class,
() -> new SystemdPlugin(false, true, esSDNotify));
assertThat(e, hasToString(containsString("ES_SD_NOTIFY set to unexpected value [" + esSDNotify + "]")));
}
public void testOnNodeStartedSuccess() {
runTestOnNodeStarted(
Boolean.TRUE.toString(),
randomIntBetween(0, Integer.MAX_VALUE),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
public void testOnNodeStartedFailure() {
final int rc = randomIntBetween(Integer.MIN_VALUE, -1);
runTestOnNodeStarted(
Boolean.TRUE.toString(),
rc,
maybe -> {
assertThat(maybe, OptionalMatchers.isPresent());
// noinspection OptionalGetWithoutIsPresent
assertThat(maybe.get(), instanceOf(RuntimeException.class));
assertThat(maybe.get(), hasToString(containsString("sd_notify returned error [" + rc + "]")));
});
}
public void testOnNodeStartedNotEnabled() {
runTestOnNodeStarted(
Boolean.FALSE.toString(),
randomInt(),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
private void runTestOnNodeStarted(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1");
}
public void testCloseSuccess() {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(1, Integer.MAX_VALUE),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
public void testCloseFailure() {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(Integer.MIN_VALUE, -1),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
public void testCloseNotEnabled() {
runTestClose(
Boolean.FALSE.toString(),
randomInt(),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
private void runTestClose(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1");
}
private void runTest(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions,
final CheckedConsumer<SystemdPlugin, IOException> invocation,
final String expectedState) {
final AtomicBoolean invoked = new AtomicBoolean();
final AtomicInteger invokedUnsetEnvironment = new AtomicInteger();
final AtomicReference<String> invokedState = new AtomicReference<>();
final SystemdPlugin plugin = new SystemdPlugin(false, true, esSDNotify) {
@Override
int sd_notify(final int unset_environment, final String state) {
invoked.set(true);
invokedUnsetEnvironment.set(unset_environment);
invokedState.set(state);
return rc;
}
};
boolean success = false;
try {
invocation.accept(plugin);
success = true;
} catch (final Exception e) {
assertions.accept(Optional.of(e));
}
if (success) {
assertions.accept(Optional.empty());
}
if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertTrue(invoked.get());
assertThat(invokedUnsetEnvironment.get(), equalTo(0));
assertThat(invokedState.get(), equalTo(expectedState));
} else {
assertFalse(invoked.get());
}
}
}

View File

@ -52,6 +52,11 @@ grant codeBase "${codebase.elasticsearch-plugin-classloader}" {
permission java.lang.RuntimePermission "createClassLoader";
};
grant codeBase "${codebase.jna}" {
// for registering native methods
permission java.lang.RuntimePermission "accessDeclaredMembers";
};
//// Everything else:
grant {
@ -143,4 +148,5 @@ grant {
permission java.io.FilePermission "/sys/fs/cgroup/cpuacct/-", "read";
permission java.io.FilePermission "/sys/fs/cgroup/memory", "read";
permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read";
};