HADOOP-10219. ipc.Client.setupIOstreams() needs to check for ClientCache.stopClient requested shutdowns.
Contributed by Kihwal Lee and Lukas Majercak.
(cherry picked from commit 9e96ac666d
)
This commit is contained in:
parent
9cf35d99b9
commit
142d878c90
|
@ -70,6 +70,7 @@ import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
|
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
|
||||||
|
@ -439,6 +440,8 @@ public class Client implements AutoCloseable {
|
||||||
|
|
||||||
private final Object sendRpcRequestLock = new Object();
|
private final Object sendRpcRequestLock = new Object();
|
||||||
|
|
||||||
|
private AtomicReference<Thread> connectingThread = new AtomicReference<>();
|
||||||
|
|
||||||
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
||||||
this.remoteId = remoteId;
|
this.remoteId = remoteId;
|
||||||
this.server = remoteId.getAddress();
|
this.server = remoteId.getAddress();
|
||||||
|
@ -777,6 +780,7 @@ public class Client implements AutoCloseable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
connectingThread.set(Thread.currentThread());
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Connecting to "+server);
|
LOG.debug("Connecting to "+server);
|
||||||
}
|
}
|
||||||
|
@ -862,6 +866,8 @@ public class Client implements AutoCloseable {
|
||||||
markClosed(new IOException("Couldn't set up IO streams: " + t, t));
|
markClosed(new IOException("Couldn't set up IO streams: " + t, t));
|
||||||
}
|
}
|
||||||
close();
|
close();
|
||||||
|
} finally {
|
||||||
|
connectingThread.set(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1216,6 +1222,13 @@ public class Client implements AutoCloseable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void interruptConnectingThread() {
|
||||||
|
Thread connThread = connectingThread.get();
|
||||||
|
if (connThread != null) {
|
||||||
|
connThread.interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Close the connection. */
|
/** Close the connection. */
|
||||||
private synchronized void close() {
|
private synchronized void close() {
|
||||||
if (!shouldCloseConnection.get()) {
|
if (!shouldCloseConnection.get()) {
|
||||||
|
@ -1321,6 +1334,7 @@ public class Client implements AutoCloseable {
|
||||||
// wake up all connections
|
// wake up all connections
|
||||||
for (Connection conn : connections.values()) {
|
for (Connection conn : connections.values()) {
|
||||||
conn.interrupt();
|
conn.interrupt();
|
||||||
|
conn.interruptConnectingThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait until all connections are closed
|
// wait until all connections are closed
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
@ -1398,6 +1399,50 @@ public class TestIPC {
|
||||||
assertEquals(Client.getTimeout(config), -1);
|
assertEquals(Client.getTimeout(config), -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testSetupConnectionShouldNotBlockShutdown() throws Exception {
|
||||||
|
// Start server
|
||||||
|
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
||||||
|
Server server = new TestServer(1, true);
|
||||||
|
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
|
// Track how many times we retried to set up the connection
|
||||||
|
final AtomicInteger createSocketCalled = new AtomicInteger();
|
||||||
|
|
||||||
|
doAnswer(new Answer() {
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
createSocketCalled.addAndGet(1);
|
||||||
|
Thread.sleep(MIN_SLEEP_TIME * 5);
|
||||||
|
throw new ConnectTimeoutException("fake");
|
||||||
|
}
|
||||||
|
}).when(mockFactory).createSocket();
|
||||||
|
final Client client = new Client(LongWritable.class, conf, mockFactory);
|
||||||
|
|
||||||
|
final AtomicBoolean callStarted = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// Call a random function asynchronously so that we can call stop()
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
callStarted.set(true);
|
||||||
|
call(client, RANDOM.nextLong(), addr, conf);
|
||||||
|
} catch (IOException ignored) {}
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return callStarted.get() && createSocketCalled.get() == 1;
|
||||||
|
}
|
||||||
|
}, 50, 60000);
|
||||||
|
|
||||||
|
// stop() should stop the client immediately without any more retries
|
||||||
|
client.stop();
|
||||||
|
assertEquals(1, createSocketCalled.get());
|
||||||
|
}
|
||||||
|
|
||||||
private void assertRetriesOnSocketTimeouts(Configuration conf,
|
private void assertRetriesOnSocketTimeouts(Configuration conf,
|
||||||
int maxTimeoutRetries) throws IOException {
|
int maxTimeoutRetries) throws IOException {
|
||||||
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
||||||
|
|
Loading…
Reference in New Issue