Remove transport name from tcp channel (#40074)

Currently, we maintain a transport name ("mock-nio", "nio", "netty")
that is passed to a `TcpTransportChannel` when a request is received.
The value of this name is to associate with the task when we register a
task with the task manager. However, it is only possible to run ES with
one transport, so having an implementation specific name is unnecessary.
This commit removes the name and replaces it with the generic
"transport".
This commit is contained in:
Tim Brooks 2019-03-15 11:47:29 -06:00
parent 6ffa8a040d
commit 0b50a670a4
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
6 changed files with 12 additions and 16 deletions

View File

@ -108,7 +108,7 @@ public class Netty4Transport extends TcpTransport {
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) { CircuitBreakerService circuitBreakerService) {
super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings)); Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
this.workerCount = WORKER_COUNT.get(settings); this.workerCount = WORKER_COUNT.get(settings);

View File

@ -64,7 +64,7 @@ public class NioTransport extends TcpTransport {
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService, NioGroupFactory groupFactory) { CircuitBreakerService circuitBreakerService, NioGroupFactory groupFactory) {
super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
this.groupFactory = groupFactory; this.groupFactory = groupFactory;
} }

View File

@ -129,7 +129,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// connections while no connect operations is going on // connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private volatile BoundTransportAddress boundAddress; private volatile BoundTransportAddress boundAddress;
private final String transportName;
private final MeanMetric readBytesMetric = new MeanMetric(); private final MeanMetric readBytesMetric = new MeanMetric();
private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap(); private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap();
@ -141,9 +140,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final OutboundHandler outboundHandler; private final OutboundHandler outboundHandler;
private final String nodeName; private final String nodeName;
public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, public TcpTransport(Settings settings, Version version, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { NetworkService networkService) {
this.settings = settings; this.settings = settings;
this.profileSettings = getProfileSettings(settings); this.profileSettings = getProfileSettings(settings);
this.version = version; this.version = version;
@ -152,7 +151,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.pageCacheRecycler = pageCacheRecycler; this.pageCacheRecycler = pageCacheRecycler;
this.circuitBreakerService = circuitBreakerService; this.circuitBreakerService = circuitBreakerService;
this.networkService = networkService; this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger(); this.transportLogger = new TransportLogger();
this.outboundHandler = new OutboundHandler(threadPool, bigArrays, transportLogger); this.outboundHandler = new OutboundHandler(threadPool, bigArrays, transportLogger);
this.handshaker = new TransportHandshaker(version, threadPool, this.handshaker = new TransportHandshaker(version, threadPool,
@ -1023,7 +1021,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} else { } else {
getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes); getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
} }
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, transportChannel = new TcpTransportChannel(this, channel, action, requestId, version, features, profileName,
messageLengthBytes, message.isCompress()); messageLengthBytes, message.isCompress());
final TransportRequest request = reg.newRequest(stream); final TransportRequest request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
@ -1034,7 +1032,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} catch (Exception e) { } catch (Exception e) {
// the circuit breaker tripped // the circuit breaker tripped
if (transportChannel == null) { if (transportChannel == null) {
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, transportChannel = new TcpTransportChannel(this, channel, action, requestId, version, features,
profileName, 0, message.isCompress()); profileName, 0, message.isCompress());
} }
try { try {

View File

@ -35,12 +35,11 @@ public final class TcpTransportChannel implements TransportChannel {
private final long requestId; private final long requestId;
private final String profileName; private final String profileName;
private final long reservedBytes; private final long reservedBytes;
private final String channelType;
private final TcpChannel channel; private final TcpChannel channel;
private final boolean compressResponse; private final boolean compressResponse;
TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version, TcpTransportChannel(TcpTransport transport, TcpChannel channel, String action, long requestId, Version version, Set<String> features,
Set<String> features, String profileName, long reservedBytes, boolean compressResponse) { String profileName, long reservedBytes, boolean compressResponse) {
this.version = version; this.version = version;
this.features = features; this.features = features;
this.channel = channel; this.channel = channel;
@ -49,7 +48,6 @@ public final class TcpTransportChannel implements TransportChannel {
this.requestId = requestId; this.requestId = requestId;
this.profileName = profileName; this.profileName = profileName;
this.reservedBytes = reservedBytes; this.reservedBytes = reservedBytes;
this.channelType = channelType;
this.compressResponse = compressResponse; this.compressResponse = compressResponse;
} }
@ -91,7 +89,7 @@ public final class TcpTransportChannel implements TransportChannel {
@Override @Override
public String getChannelType() { public String getChannelType() {
return channelType; return "transport";
} }
@Override @Override

View File

@ -164,7 +164,7 @@ public class TcpTransportTests extends ESTestCase {
ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName()); ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>(); AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
try { try {
TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool, TcpTransport transport = new TcpTransport(Settings.EMPTY, Version.CURRENT, threadPool,
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) { PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) {
@Override @Override

View File

@ -75,7 +75,7 @@ public class MockNioTransport extends TcpTransport {
public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) { CircuitBreakerService circuitBreakerService) {
super("mock-nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
} }
@Override @Override