From 24873dd3e382c7046f31e5f0f0fcb3cc752bc096 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 29 Jul 2019 11:34:48 +0200 Subject: [PATCH] Do not block transport thread on startup (#44939) We currently block the transport thread on startup, which has caused test failures. I think this is some kind of deadlock situation. I don't think we should even block a transport thread, and there's also no need to do so. We can just reject requests as long we're not fully set up. Note that the HTTP layer is only started much later (after we've completed full start up of the transport layer), so that one should be completely unaffected by this. Closes #41745 --- .../org/elasticsearch/transport/InboundHandler.java | 2 +- .../elasticsearch/transport/TransportService.java | 13 +++++-------- .../transport/AbstractSimpleTransportTestCase.java | 4 ++-- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index c50825d00a0..b204d818c68 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -158,9 +158,9 @@ public class InboundHandler { final long requestId = message.getRequestId(); final StreamInput stream = message.getStreamInput(); final Version version = message.getVersion(); - messageListener.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { + messageListener.onRequestReceived(requestId, action); if (message.isHandshake()) { handshaker.handleHandshake(version, features, channel, requestId, stream); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 18d22f47c5c..f4eed061c7f 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -65,8 +65,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -77,7 +77,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran public static final String DIRECT_RESPONSE_PROFILE = ".direct"; public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; - private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1); + private final AtomicBoolean handleIncomingRequests = new AtomicBoolean(); private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); protected final Transport transport; protected final ConnectionManager connectionManager; @@ -294,7 +294,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran * this method is called */ public final void acceptIncomingRequests() { - blockIncomingRequestsLatch.countDown(); + handleIncomingRequests.set(true); } public TransportInfo info() { @@ -887,11 +887,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran */ @Override public void onRequestReceived(long requestId, String action) { - try { - blockIncomingRequestsLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("interrupted while waiting for incoming requests block to be removed"); + if (handleIncomingRequests.get() == false) { + throw new IllegalStateException("transport not ready yet to handle incoming requests"); } if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 360f87d13d6..d5cc128f116 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1995,8 +1995,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); ConnectTransportException exception = expectThrows(ConnectTransportException.class, () -> serviceA.connectToNode(node)); - assertTrue(exception.getCause() instanceof TransportException); - assertEquals("handshake failed because connection reset", exception.getCause().getMessage()); + assertThat(exception.getCause(), instanceOf(IllegalStateException.class)); + assertEquals("handshake failed", exception.getCause().getMessage()); } ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);