diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 3a37cafe9bb..6d2dc45ec91 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -92,7 +92,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private final TimeValue initialConnectionTimeout; - private SetOnce remoteClusterName = new SetOnce<>(); + private final SetOnce remoteClusterName = new SetOnce<>(); /** * Creates a new {@link RemoteClusterConnection} diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index e81fb9c380e..f89692caa73 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -22,13 +22,12 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; + import java.io.Closeable; import java.io.IOException; import java.net.UnknownHostException; @@ -74,10 +73,6 @@ public interface Transport extends LifecycleComponent { */ List getDefaultSeedAddresses(); - default CircuitBreaker getInFlightRequestBreaker() { - return new NoopCircuitBreaker("in-flight-noop"); - } - /** * Opens a new connection to the given node. When the connection is fully connected, the listener is called. * The ActionListener will be called on the calling thread or the generic thread pool. diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java deleted file mode 100644 index 6b45feec948..00000000000 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; - -/** - * Base class for delegating transport response to a transport channel - */ -public class TransportChannelResponseHandler implements TransportResponseHandler { - - private final Logger logger; - private final TransportChannel channel; - private final String extraInfoOnError; - private final Writeable.Reader reader; - - public TransportChannelResponseHandler(Logger logger, TransportChannel channel, String extraInfoOnError, - Writeable.Reader reader) { - this.logger = logger; - this.channel = channel; - this.extraInfoOnError = extraInfoOnError; - this.reader = reader; - } - - @Override - public T read(StreamInput in) throws IOException { - return reader.read(in); - } - - @Override - public void handleResponse(T response) { - try { - channel.sendResponse(response); - } catch (IOException e) { - handleException(new TransportException(e)); - } - } - - @Override - public void handleException(TransportException exp) { - try { - channel.sendResponse(exp); - } catch (IOException e) { - logger.debug(() -> new ParameterizedMessage( - "failed to send failure {}", extraInfoOnError == null ? "" : "(" + extraInfoOnError + ")"), e); - } - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } -} diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 585a9339881..18d22f47c5c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -732,7 +732,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran } private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { - final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, this, threadPool); + final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool); try { onRequestSent(localNode, requestId, action, request, options); onRequestReceived(requestId, action); @@ -1141,16 +1141,13 @@ public class TransportService extends AbstractLifecycleComponent implements Tran } static class DirectResponseChannel implements TransportChannel { - final Logger logger; final DiscoveryNode localNode; private final String action; private final long requestId; final TransportService service; final ThreadPool threadPool; - DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId, TransportService service, - ThreadPool threadPool) { - this.logger = logger; + DirectResponseChannel(DiscoveryNode localNode, String action, long requestId, TransportService service, ThreadPool threadPool) { this.localNode = localNode; this.action = action; this.requestId = requestId;