Harden close and remove reference concurrency in MockTcpTransport (#22613)

There was still a small race in MockTcpTransport where channels that are concurrently
closing are not yet removed from the reference tracking, causing tests to fail. Compared to
the other races before, this is a rather small window and requires very, very short test durations.
This commit is contained in:
Simon Willnauer 2017-01-13 16:04:05 +01:00 committed by GitHub
parent 18fdc39b8c
commit d5fa84f869
2 changed files with 19 additions and 15 deletions

View File

@ -439,9 +439,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
try {
ensureOpen();
try (Releasable ignored = connectionLock.acquire(node.getId())) {
if (!lifecycle.started()) {
throw new IllegalStateException("can't add nodes to a stopped transport");
}
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
@ -867,7 +864,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeChannels nodeChannels = it.next();
it.remove();
IOUtils.closeWhileHandlingException(nodeChannels);
@ -889,7 +886,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected void onException(Channel channel, Exception e) throws IOException {
if (!lifecycle.started()) {
// ignore
// just close and ignore - we are already stopped and just need to make sure we release all resources
disconnectFromNodeChannel(channel, e);
return;
}
if (isCloseConnectionException(e)) {

View File

@ -49,6 +49,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -268,7 +269,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final InetSocketAddress localAddress;
private final ServerSocket serverSocket;
private final ConcurrentHashMap<MockChannel, Boolean> workerChannels = new ConcurrentHashMap<>();
private final Set<MockChannel> workerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Socket activeChannel;
private final String profile;
private final CancellableThreads cancellableThreads = new CancellableThreads();
@ -320,7 +321,8 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
if (isOpen.get()) {
incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove);
//establish a happens-before edge between closing and accepting a new connection
workerChannels.put(incomingChannel, Boolean.TRUE);
workerChannels.add(incomingChannel);
// this spawns a new thread immediately, so OK under lock
incomingChannel.loopRead(executor);
// the channel is properly registered and will be cleared by the close code.
@ -343,8 +345,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
if (isOpen.get()) {
try {
onException(MockChannel.this, e);
} catch (IOException ex) {
} catch (Exception ex) {
logger.warn("failed on handling exception", ex);
IOUtils.closeWhileHandlingException(MockChannel.this); // pure paranoia
}
}
}
@ -361,18 +364,21 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
}
@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
// establish a happens-before edge between closing and accepting a new connection
// we have to sync this entire block to ensure that our openChannels checks work correctly.
// The close block below will close all worker channels but if one of the worker channels runs into an exception
// for instance due to a disconnect the handling of this exception might be executed concurrently.
// now if we are in-turn concurrently call close we might not wait for the actual close to happen and that will, down the road
// make the assertion trip that not all channels are closed.
if (isOpen.compareAndSet(true, false)) {
final boolean removedChannel;
synchronized (openChannels) {
removedChannel = openChannels.remove(this);
}
//establish a happens-before edge between closing and accepting a new connection
synchronized (this) {
onChannelClosed(this);
IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels.keySet()),
() -> cancellableThreads.cancel("channel closed"), onClose);
}
onChannelClosed(this);
IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels),
() -> cancellableThreads.cancel("channel closed"), onClose);
assert removedChannel: "Channel was not removed or removed twice?";
}
}