Wait for Netty threads after Security tests (elastic/x-pack-elasticsearch#4390)

This commit adds waits for two Netty threads that run in the background
and require explicit calls to await inactivity in them. This shows up
as an issue in security tests since we always use netty and do not have
these waits so these threads can trip the thread leak detector.

relates elastic/x-pack-elasticsearch#4367

Original commit: elastic/x-pack-elasticsearch@76d84553ba
This commit is contained in:
Jay Modi 2018-04-19 09:54:25 -06:00 committed by GitHub
parent 502bc6c572
commit 61542a5543
3 changed files with 65 additions and 0 deletions

View File

@ -5,12 +5,17 @@
*/
package org.elasticsearch.xpack.core;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.security.SecurityField;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ESTestCase.getTestTransportPlugin;
@ -28,4 +33,22 @@ public class TestXPackTransportClient extends TransportClient {
public TestXPackTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
super(settings, Settings.EMPTY, addPlugins(plugins, getTestTransportPlugin()), null);
}
@Override
public void close() {
super.close();
if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings) == false
|| NetworkModule.TRANSPORT_TYPE_SETTING.get(settings).equals(SecurityField.NAME4)) {
try {
GlobalEventExecutor.INSTANCE.awaitInactivity(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
ThreadDeathWatcher.awaitInactivity(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.test;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -160,6 +162,24 @@ public abstract class SecurityIntegTestCase extends ESIntegTestCase {
public static void destroyDefaultSettings() {
SECURITY_DEFAULT_SETTINGS = null;
customSecuritySettingsSource = null;
// Wait for the network threads to finish otherwise there is the possibility that one of
// the threads lingers and trips the thread leak detector
try {
GlobalEventExecutor.INSTANCE.awaitInactivity(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IllegalStateException e) {
if (e.getMessage().equals("thread was not started") == false) {
throw e;
}
// ignore since the thread was never started
}
try {
ThreadDeathWatcher.awaitInactivity(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Rule

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.test;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -31,6 +33,7 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;
@ -93,6 +96,25 @@ public abstract class SecuritySingleNodeTestCase extends ESSingleNodeTestCase {
IOUtils.closeWhileHandlingException(restClient);
restClient = null;
}
// Wait for the network threads to finish otherwise there is the possibility that one of
// the threads lingers and trips the thread leak detector
try {
GlobalEventExecutor.INSTANCE.awaitInactivity(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IllegalStateException e) {
if (e.getMessage().equals("thread was not started") == false) {
throw e;
}
// ignore since the thread was never started
}
try {
ThreadDeathWatcher.awaitInactivity(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Rule