Close connection manager on current thread in RemoteClusterConnection (#44805)

The problem is that RemoteClusterConnection closes the connection manager asynchronously, which races with the threadpool being shutdown at the end of the test.

Closes #44339
Closes #44610
This commit is contained in:
Yannick Welsch 2019-07-24 17:43:18 +02:00
parent 9944e193f9
commit e0d4544ef6
4 changed files with 24 additions and 21 deletions

View File

@ -58,6 +58,15 @@ public class ConnectionManager implements Closeable {
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") { private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
@Override @Override
protected void closeInternal() { protected void closeInternal() {
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
closeLatch.countDown(); closeLatch.countDown();
} }
}; };
@ -249,23 +258,24 @@ public class ConnectionManager implements Closeable {
@Override @Override
public void close() { public void close() {
internalClose(true);
}
public void closeNoBlock() {
internalClose(false);
}
private void internalClose(boolean waitForPendingConnections) {
assert Transports.assertNotTransportThread("Closing ConnectionManager"); assert Transports.assertNotTransportThread("Closing ConnectionManager");
if (closing.compareAndSet(false, true)) { if (closing.compareAndSet(false, true)) {
connectingRefCounter.decRef(); connectingRefCounter.decRef();
if (waitForPendingConnections) {
try { try {
closeLatch.await(); closeLatch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
} }
} }
} }

View File

@ -319,8 +319,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
@Override @Override
public void close() throws IOException { public void close() throws IOException {
IOUtils.close(connectHandler); IOUtils.close(connectHandler);
// In the ConnectionManager we wait on connections being closed. connectionManager.closeNoBlock();
threadPool.generic().execute(connectionManager::close);
} }
public boolean isClosed() { public boolean isClosed() {

View File

@ -94,6 +94,7 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.iterableWithSize;
@ -101,7 +102,6 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.Matchers.endsWith;
public class RemoteClusterConnectionTests extends ESTestCase { public class RemoteClusterConnectionTests extends ESTestCase {

View File

@ -1,5 +1,3 @@
import org.elasticsearch.gradle.OS
evaluationDependsOn(xpackModule('core')) evaluationDependsOn(xpackModule('core'))
apply plugin: 'elasticsearch.esplugin' apply plugin: 'elasticsearch.esplugin'
@ -24,8 +22,6 @@ task internalClusterTestNoSecurityManager(type: Test) {
include noSecurityManagerITClasses include noSecurityManagerITClasses
systemProperty 'es.set.netty.runtime.available.processors', 'false' systemProperty 'es.set.netty.runtime.available.processors', 'false'
systemProperty 'tests.security.manager', 'false' systemProperty 'tests.security.manager', 'false'
// Disable tests on windows https://github.com/elastic/elasticsearch/issues/44610
onlyIf { OS.WINDOWS.equals(OS.current()) == false }
} }
// Instead we create a separate task to run the // Instead we create a separate task to run the
@ -38,8 +34,6 @@ task internalClusterTest(type: Test) {
include '**/*IT.class' include '**/*IT.class'
exclude noSecurityManagerITClasses exclude noSecurityManagerITClasses
systemProperty 'es.set.netty.runtime.available.processors', 'false' systemProperty 'es.set.netty.runtime.available.processors', 'false'
// Disable tests on windows https://github.com/elastic/elasticsearch/issues/44610
onlyIf { OS.WINDOWS.equals(OS.current()) == false }
} }
check.dependsOn internalClusterTest check.dependsOn internalClusterTest