diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index c50825d00a0..b204d818c68 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -158,9 +158,9 @@ public class InboundHandler { final long requestId = message.getRequestId(); final StreamInput stream = message.getStreamInput(); final Version version = message.getVersion(); - messageListener.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { + messageListener.onRequestReceived(requestId, action); if (message.isHandshake()) { handshaker.handleHandshake(version, features, channel, requestId, stream); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 18d22f47c5c..f4eed061c7f 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -65,8 +65,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -77,7 +77,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran public static final String DIRECT_RESPONSE_PROFILE = ".direct"; public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; - private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1); + private final AtomicBoolean handleIncomingRequests = new AtomicBoolean(); private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); protected final Transport transport; protected final ConnectionManager connectionManager; @@ -294,7 +294,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran * this method is called */ public final void acceptIncomingRequests() { - blockIncomingRequestsLatch.countDown(); + handleIncomingRequests.set(true); } public TransportInfo info() { @@ -887,11 +887,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran */ @Override public void onRequestReceived(long requestId, String action) { - try { - blockIncomingRequestsLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("interrupted while waiting for incoming requests block to be removed"); + if (handleIncomingRequests.get() == false) { + throw new IllegalStateException("transport not ready yet to handle incoming requests"); } if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 360f87d13d6..d5cc128f116 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1995,8 +1995,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); ConnectTransportException exception = expectThrows(ConnectTransportException.class, () -> serviceA.connectToNode(node)); - assertTrue(exception.getCause() instanceof TransportException); - assertEquals("handshake failed because connection reset", exception.getCause().getMessage()); + assertThat(exception.getCause(), instanceOf(IllegalStateException.class)); + assertEquals("handshake failed", exception.getCause().getMessage()); } ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);