Delete unneeded nio client (#27408)

This is a follow up to #27132. As that PR greatly simplified the
connection logic inside a low level transport implementation, much of
the functionality provided by the NioClient class is no longer
necessary. This commit removes that class.
This commit is contained in:
Tim Brooks 2017-11-16 09:22:40 -07:00 committed by GitHub
parent 3b963dcfe5
commit 35a5922927
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 8 additions and 163 deletions

View File

@ -1,62 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
public class NioClient {
private final OpenChannels openChannels;
private final Supplier<SocketSelector> selectorSupplier;
private final ChannelFactory channelFactory;
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
NioClient(OpenChannels openChannels, Supplier<SocketSelector> selectorSupplier, ChannelFactory channelFactory) {
this.openChannels = openChannels;
this.selectorSupplier = selectorSupplier;
this.channelFactory = channelFactory;
}
public void close() {
semaphore.acquireUninterruptibly(Integer.MAX_VALUE);
}
NioSocketChannel initiateConnection(InetSocketAddress address) throws IOException {
boolean allowedToConnect = semaphore.tryAcquire();
if (allowedToConnect == false) {
return null;
}
try {
SocketSelector selector = selectorSupplier.get();
NioSocketChannel nioSocketChannel = channelFactory.openNioChannel(address, selector);
openChannels.clientChannelOpened(nioSocketChannel);
return nioSocketChannel;
} finally {
semaphore.release();
}
}
}

View File

@ -34,13 +34,7 @@ public class NioShutdown {
this.logger = logger; this.logger = logger;
} }
void orderlyShutdown(OpenChannels openChannels, NioClient client, ArrayList<AcceptingSelector> acceptors, void orderlyShutdown(OpenChannels openChannels, ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {
ArrayList<SocketSelector> socketSelectors) {
// Close the client. This ensures that no new send connections will be opened. Client could be null if exception was
// throw on start up
if (client != null) {
client.close();
}
// Start by closing the server channels. Once these are closed, we are guaranteed to no accept new connections // Start by closing the server channels. Once these are closed, we are guaranteed to no accept new connections
openChannels.closeServerChannels(); openChannels.closeServerChannels();

View File

@ -72,7 +72,8 @@ public class NioTransport extends TcpTransport<NioChannel> {
private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap(); private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap();
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>(); private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>(); private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>();
private NioClient client; private RoundRobinSelectorSupplier clientSelectorSupplier;
private ChannelFactory clientChannelFactory;
private int acceptorNumber; private int acceptorNumber;
public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
@ -111,10 +112,8 @@ public class NioTransport extends TcpTransport<NioChannel> {
@Override @Override
protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<NioChannel> connectListener) protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<NioChannel> connectListener)
throws IOException { throws IOException {
NioSocketChannel channel = client.initiateConnection(node.getAddress().address()); NioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
if (channel == null) { openChannels.clientChannelOpened(channel);
throw new ElasticsearchException("client is shutdown");
}
channel.addConnectListener(connectListener); channel.addConnectListener(connectListener);
return channel; return channel;
} }
@ -137,7 +136,8 @@ public class NioTransport extends TcpTransport<NioChannel> {
} }
} }
client = createClient(); clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), contextSetter);
if (NetworkService.NETWORK_SERVER.get(settings)) { if (NetworkService.NETWORK_SERVER.get(settings)) {
int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
@ -178,7 +178,7 @@ public class NioTransport extends TcpTransport<NioChannel> {
@Override @Override
protected void stopInternal() { protected void stopInternal() {
NioShutdown nioShutdown = new NioShutdown(logger); NioShutdown nioShutdown = new NioShutdown(logger);
nioShutdown.orderlyShutdown(openChannels, client, acceptors, socketSelectors); nioShutdown.orderlyShutdown(openChannels, acceptors, socketSelectors);
profileToChannelFactory.clear(); profileToChannelFactory.clear();
socketSelectors.clear(); socketSelectors.clear();
@ -193,10 +193,4 @@ public class NioTransport extends TcpTransport<NioChannel> {
final Throwable t = unwrapped != null ? unwrapped : cause; final Throwable t = unwrapped != null ? unwrapped : cause;
onException(channel, t instanceof Exception ? (Exception) t : new ElasticsearchException(t)); onException(channel, t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
} }
private NioClient createClient() {
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), contextSetter);
return new NioClient(openChannels, selectorSupplier, channelFactory);
}
} }

View File

@ -1,81 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.junit.Before;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.function.Supplier;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class NioClientTests extends ESTestCase {
private NioClient client;
private SocketSelector selector;
private ChannelFactory channelFactory;
private OpenChannels openChannels = new OpenChannels(logger);
private InetSocketAddress address;
@Before
@SuppressWarnings("unchecked")
public void setUpClient() {
channelFactory = mock(ChannelFactory.class);
selector = mock(SocketSelector.class);
ArrayList<SocketSelector> selectors = new ArrayList<>();
selectors.add(selector);
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(selectors);
client = new NioClient(openChannels, selectorSupplier, channelFactory);
address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
}
public void testCreateConnection() throws IOException, InterruptedException {
NioSocketChannel channel1 = mock(NioSocketChannel.class);
when(channelFactory.openNioChannel(eq(address), eq(selector))).thenReturn(channel1);
NioSocketChannel nioSocketChannel = client.initiateConnection(address);
assertEquals(channel1, nioSocketChannel);
}
public void testConnectionException() throws IOException, InterruptedException {
IOException ioException = new IOException();
when(channelFactory.openNioChannel(eq(address), eq(selector))).thenThrow(ioException);
expectThrows(IOException.class, () -> client.initiateConnection(address));
}
public void testCloseDoesNotAllowConnections() throws IOException {
client.close();
assertNull(client.initiateConnection(address));
}
}