Do not block transport thread on startup (#44939)

We currently block the transport thread on startup, which has caused test failures. I think this is
some kind of deadlock situation. I don't think we should even block a transport thread, and
there's also no need to do so. We can just reject requests as long as we're not fully set up. Note
that the HTTP layer is only started much later (after we've completed full startup of the
transport layer), so that one should be completely unaffected by this.

Closes #41745
This commit is contained in:
Yannick Welsch 2019-07-29 11:34:48 +02:00
parent f5efafd4d6
commit 24873dd3e3
3 changed files with 8 additions and 11 deletions

View File

@ -158,9 +158,9 @@ public class InboundHandler {
final long requestId = message.getRequestId();
final StreamInput stream = message.getStreamInput();
final Version version = message.getVersion();
messageListener.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
messageListener.onRequestReceived(requestId, action);
if (message.isHandshake()) {
handshaker.handleHandshake(version, features, channel, requestId, stream);
} else {

View File

@ -65,8 +65,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -77,7 +77,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
public static final String DIRECT_RESPONSE_PROFILE = ".direct";
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";
private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1);
private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
protected final Transport transport;
protected final ConnectionManager connectionManager;
@ -294,7 +294,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
* this method is called
*/
public final void acceptIncomingRequests() {
blockIncomingRequestsLatch.countDown();
handleIncomingRequests.set(true);
}
public TransportInfo info() {
@ -887,11 +887,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
*/
@Override
public void onRequestReceived(long requestId, String action) {
try {
blockIncomingRequestsLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("interrupted while waiting for incoming requests block to be removed");
if (handleIncomingRequests.get() == false) {
throw new IllegalStateException("transport not ready yet to handle incoming requests");
}
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] received request", requestId, action);

View File

@ -1995,8 +1995,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
DiscoveryNode node =
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
ConnectTransportException exception = expectThrows(ConnectTransportException.class, () -> serviceA.connectToNode(node));
assertTrue(exception.getCause() instanceof TransportException);
assertEquals("handshake failed because connection reset", exception.getCause().getMessage());
assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
assertEquals("handshake failed", exception.getCause().getMessage());
}
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);