Remove internal channel tracking in transports (#27711)

This commit attempts to continue unifying the logic between different
transport implementations. As transports call a `TcpTransport` callback
when a new channel is accepted, there is no need to internally track
channels accepted. Instead there is a set of accepted channels in
`TcpTransport`. This set is used for metrics and shutting down channels.
This commit is contained in:
Tim Brooks 2017-12-08 16:56:53 -07:00 committed by GitHub
parent f50f99ef11
commit d1acb7697b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 52 additions and 177 deletions

View File

@ -195,22 +195,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;
protected volatile TransportService transportService;
// node id to actual channel
protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private volatile TransportService transportService;
protected final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
// node id to actual channel
private final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private final Map<String, List<TcpChannel>> serverChannels = newConcurrentMap();
private final Set<TcpChannel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected final KeyedLock<String> connectionLock = new KeyedLock<>();
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final NamedWriteableRegistry namedWriteableRegistry;
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
protected final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
protected final boolean compress;
protected volatile BoundTransportAddress boundAddress;
private volatile BoundTransportAddress boundAddress;
private final String transportName;
protected final ConnectionProfile defaultConnectionProfile;
@ -438,7 +438,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
@Override
public void close() throws IOException {
public void close() {
if (closed.compareAndSet(false, true)) {
try {
if (lifecycle.stopped()) {
@ -582,7 +582,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
@ -602,6 +602,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
connectionFutures.add(connectFuture);
TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture);
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
channels.add(channel);
} catch (Exception e) {
// If there was an exception when attempting to instantiate the raw channels, we close all of the channels
@ -1041,6 +1042,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
boolean addedOnThisCall = acceptedChannels.add(channel);
assert addedOnThisCall : "Channel should only be added to accept channel set once";
channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
}
/**
@ -1738,15 +1740,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
/**
* Returns count of currently open connections
*/
protected abstract long getNumOpenServerConnections();
@Override
public final TransportStats getStats() {
return new TransportStats(
getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
return new TransportStats(acceptedChannels.size(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
transmittedBytesMetric.sum());
}

View File

@ -191,11 +191,6 @@ public class TcpTransportTests extends ESTestCase {
return new FakeChannel(messageCaptor);
}
@Override
public long getNumOpenServerConnections() {
return 0;
}
@Override
public NodeChannels getConnection(DiscoveryNode node) {
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();

View File

@ -104,8 +104,6 @@ public class Netty4Transport extends TcpTransport {
protected final int workerCount;
protected final ByteSizeValue receivePredictorMin;
protected final ByteSizeValue receivePredictorMax;
// package private for testing
volatile Netty4OpenChannelsHandler serverOpenChannels;
protected volatile Bootstrap bootstrap;
protected final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
@ -132,8 +130,6 @@ public class Netty4Transport extends TcpTransport {
try {
bootstrap = createBootstrap();
if (NetworkService.NETWORK_SERVER.get(settings)) {
final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
for (ProfileSettings profileSettings : profileSettings) {
createServerBootstrap(profileSettings);
bindServer(profileSettings);
@ -242,12 +238,6 @@ public class Netty4Transport extends TcpTransport {
onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
}
@Override
public long getNumOpenServerConnections() {
Netty4OpenChannelsHandler channels = serverOpenChannels;
return channels == null ? 0 : channels.numberOfOpenChannels();
}
@Override
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> listener)
throws IOException {
@ -294,7 +284,7 @@ public class Netty4Transport extends TcpTransport {
@Override
@SuppressForbidden(reason = "debug")
protected void stopInternal() {
Releasables.close(serverOpenChannels, () -> {
Releasables.close(() -> {
final List<Tuple<String, Future<?>>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size());
for (final Map.Entry<String, ServerBootstrap> entry : serverBootstraps.entrySet()) {
serverBootstrapCloseFutures.add(
@ -349,7 +339,6 @@ public class Netty4Transport extends TcpTransport {
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
serverAcceptedChannel(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
}

View File

@ -96,7 +96,7 @@ public class NettyTcpChannel implements TcpChannel {
}
});
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);
if (channel.eventLoop().isShutdown()) {
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
}
@ -105,4 +105,12 @@ public class NettyTcpChannel implements TcpChannel {
public Channel getLowLevelChannel() {
return channel;
}
@Override
public String toString() {
return "NettyTcpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + channel.remoteAddress() +
'}';
}
}

View File

@ -109,6 +109,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);
protected int channelsPerNodeConnection() {
return 13;
}
@Override
@Before
public void setUp() throws Exception {
@ -2345,6 +2349,24 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
}
public void testAcceptedChannelCount() throws Exception {
assertBusy(() -> {
TransportStats transportStats = serviceA.transport.getStats();
assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
});
assertBusy(() -> {
TransportStats transportStats = serviceB.transport.getStats();
assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
});
serviceA.close();
assertBusy(() -> {
TransportStats transportStats = serviceB.transport.getStats();
assertEquals(0, transportStats.getServerOpen());
});
}
public void testTransportStatsWithException() throws Exception {
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
CountDownLatch receivedLatch = new CountDownLatch(1);

View File

@ -217,11 +217,6 @@ public class MockTcpTransport extends TcpTransport {
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
}
@Override
public long getNumOpenServerConnections() {
return 1;
}
public final class MockChannel implements Closeable, TcpChannel {
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final InetSocketAddress localAddress;

View File

@ -34,17 +34,12 @@ public class NioShutdown {
this.logger = logger;
}
void orderlyShutdown(OpenChannels openChannels, ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {
// Start by closing the server channels. Once these are closed, we are guaranteed to no accept new connections
openChannels.closeServerChannels();
void orderlyShutdown(ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {
for (AcceptingSelector acceptor : acceptors) {
shutdownSelector(acceptor);
}
openChannels.close();
for (SocketSelector selector : socketSelectors) {
shutdownSelector(selector);
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpChannelFactory;
@ -70,7 +69,6 @@ public class NioTransport extends TcpTransport {
public static final Setting<Integer> NIO_ACCEPTOR_COUNT =
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
private final OpenChannels openChannels = new OpenChannels(logger);
private final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
@ -86,27 +84,17 @@ public class NioTransport extends TcpTransport {
this.pageCacheRecycler = pageCacheRecycler;
}
@Override
public long getNumOpenServerConnections() {
return openChannels.serverChannelsCount();
}
@Override
protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
TcpNioServerSocketChannel serverChannel = channelFactory.openNioServerSocketChannel(address, selector);
openChannels.serverChannelOpened(serverChannel);
serverChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(serverChannel)));
return serverChannel;
return channelFactory.openNioServerSocketChannel(address, selector);
}
@Override
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException {
TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
openChannels.clientChannelOpened(channel);
channel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
channel.addConnectListener(connectListener);
return channel;
}
@ -175,7 +163,7 @@ public class NioTransport extends TcpTransport {
@Override
protected void stopInternal() {
NioShutdown nioShutdown = new NioShutdown(logger);
nioShutdown.orderlyShutdown(openChannels, acceptors, socketSelectors);
nioShutdown.orderlyShutdown(acceptors, socketSelectors);
profileToChannelFactory.clear();
socketSelectors.clear();
@ -202,8 +190,6 @@ public class NioTransport extends TcpTransport {
private void acceptChannel(NioSocketChannel channel) {
TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel;
openChannels.acceptedChannelOpened(tcpChannel);
tcpChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
serverAcceptedChannel(tcpChannel);
}

View File

@ -1,116 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpNioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
public class OpenChannels implements Releasable {
// TODO: Maybe set concurrency levels?
private final ConcurrentMap<TcpNioSocketChannel, Long> openClientChannels = newConcurrentMap();
private final ConcurrentMap<TcpNioSocketChannel, Long> openAcceptedChannels = newConcurrentMap();
private final ConcurrentMap<TcpNioServerSocketChannel, Long> openServerChannels = newConcurrentMap();
private final Logger logger;
public OpenChannels(Logger logger) {
this.logger = logger;
}
public void serverChannelOpened(TcpNioServerSocketChannel channel) {
boolean added = openServerChannels.putIfAbsent(channel, System.nanoTime()) == null;
if (added && logger.isTraceEnabled()) {
logger.trace("server channel opened: {}", channel);
}
}
public long serverChannelsCount() {
return openServerChannels.size();
}
public void acceptedChannelOpened(TcpNioSocketChannel channel) {
boolean added = openAcceptedChannels.putIfAbsent(channel, System.nanoTime()) == null;
if (added && logger.isTraceEnabled()) {
logger.trace("accepted channel opened: {}", channel);
}
}
public HashSet<NioSocketChannel> getAcceptedChannels() {
return new HashSet<>(openAcceptedChannels.keySet());
}
public void clientChannelOpened(TcpNioSocketChannel channel) {
boolean added = openClientChannels.putIfAbsent(channel, System.nanoTime()) == null;
if (added && logger.isTraceEnabled()) {
logger.trace("client channel opened: {}", channel);
}
}
public Map<TcpNioSocketChannel, Long> getClientChannels() {
return openClientChannels;
}
public void channelClosed(NioChannel channel) {
boolean removed;
if (channel instanceof NioServerSocketChannel) {
removed = openServerChannels.remove(channel) != null;
} else {
NioSocketChannel socketChannel = (NioSocketChannel) channel;
removed = openClientChannels.remove(socketChannel) != null;
if (removed == false) {
removed = openAcceptedChannels.remove(socketChannel) != null;
}
}
if (removed && logger.isTraceEnabled()) {
logger.trace("channel closed: {}", channel);
}
}
public void closeServerChannels() {
TcpChannel.closeChannels(new ArrayList<>(openServerChannels.keySet()), true);
openServerChannels.clear();
}
@Override
public void close() {
Stream<TcpChannel> channels = Stream.concat(openClientChannels.keySet().stream(), openAcceptedChannels.keySet().stream());
TcpChannel.closeChannels(channels.collect(Collectors.toList()), true);
openClientChannels.clear();
openAcceptedChannels.clear();
}
}

View File

@ -55,6 +55,11 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
return mockTransportService;
}
@Override
public int channelsPerNodeConnection() {
return 1;
}
@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
final MockTcpTransport t = (MockTcpTransport) transport;