From 0b50a670a413575fc6bf6661fc046becee354993 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 15 Mar 2019 11:47:29 -0600 Subject: [PATCH] Remove transport name from tcp channel (#40074) Currently, we maintain a transport name ("mock-nio", "nio", "netty") that is passed to a `TcpTransportChannel` when a request is received. The value of this name is to associate with the task when we register a task with the task manager. However, it is only possible to run ES with one transport, so having an implementation specific name is unnecessary. This commit removes the name and replaces it with the generic "transport". --- .../transport/netty4/Netty4Transport.java | 2 +- .../elasticsearch/transport/nio/NioTransport.java | 2 +- .../org/elasticsearch/transport/TcpTransport.java | 12 +++++------- .../elasticsearch/transport/TcpTransportChannel.java | 8 +++----- .../elasticsearch/transport/TcpTransportTests.java | 2 +- .../transport/nio/MockNioTransport.java | 2 +- 6 files changed, 12 insertions(+), 16 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 4823bbbc3d7..248aabf148a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -108,7 +108,7 @@ public class Netty4Transport extends TcpTransport { public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings)); this.workerCount = WORKER_COUNT.get(settings); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index a9a1c716ef8..2f30143f189 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -64,7 +64,7 @@ public class NioTransport extends TcpTransport { protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, NioGroupFactory groupFactory) { - super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); this.groupFactory = groupFactory; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 170b20cba0b..9b0a1dafd14 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -129,7 +129,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // connections while no connect operations is going on private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private volatile BoundTransportAddress boundAddress; - private final String transportName; private final MeanMetric readBytesMetric = new MeanMetric(); private volatile Map> requestHandlers = Collections.emptyMap(); @@ -141,9 +140,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final OutboundHandler outboundHandler; private final String nodeName; - public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, - PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { + public TcpTransport(Settings settings, Version version, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, + CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, + NetworkService networkService) { this.settings = settings; this.profileSettings = getProfileSettings(settings); this.version = version; @@ -152,7 +151,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements this.pageCacheRecycler = pageCacheRecycler; this.circuitBreakerService = circuitBreakerService; this.networkService = networkService; - this.transportName = transportName; this.transportLogger = new TransportLogger(); this.outboundHandler = new OutboundHandler(threadPool, bigArrays, transportLogger); this.handshaker = new TransportHandshaker(version, threadPool, @@ -1023,7 +1021,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } else { getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes); } - transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, + transportChannel = new TcpTransportChannel(this, channel, action, requestId, version, features, profileName, messageLengthBytes, message.isCompress()); final TransportRequest request = reg.newRequest(stream); request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); @@ -1034,7 +1032,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } catch (Exception e) { // the circuit breaker tripped if (transportChannel == null) { - transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, + transportChannel = new TcpTransportChannel(this, channel, action, requestId, version, features, profileName, 0, message.isCompress()); } try { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java index c2a704428d3..b45fc19c762 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java @@ -35,12 +35,11 @@ public final class TcpTransportChannel implements TransportChannel { private final long requestId; private final String profileName; private final long reservedBytes; - private final String channelType; private final TcpChannel channel; private final boolean compressResponse; - TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version, - Set features, String profileName, long reservedBytes, boolean compressResponse) { + TcpTransportChannel(TcpTransport transport, TcpChannel channel, String action, long requestId, Version version, Set features, + String profileName, long reservedBytes, boolean compressResponse) { this.version = version; this.features = features; this.channel = channel; @@ -49,7 +48,6 @@ public final class TcpTransportChannel implements TransportChannel { this.requestId = requestId; this.profileName = profileName; this.reservedBytes = reservedBytes; - this.channelType = channelType; this.compressResponse = compressResponse; } @@ -91,7 +89,7 @@ public final class TcpTransportChannel implements TransportChannel { @Override public String getChannelType() { - return channelType; + return "transport"; } @Override diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index a25ac2a551a..9d038032443 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -164,7 +164,7 @@ public class TcpTransportTests extends ESTestCase { ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName()); AtomicReference messageCaptor = new AtomicReference<>(); try { - TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool, + TcpTransport transport = new TcpTransport(Settings.EMPTY, Version.CURRENT, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) { @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 0282e3dcc6a..537bfd3aefd 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -75,7 +75,7 @@ public class MockNioTransport extends TcpTransport { public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("mock-nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); } @Override