fix potential blocking of NettyTransport connect and disconnect methods

Currently, in NettyTransport the locks for connecting and disconnecting channels are stored in a ConcurrentMap that has 500 entries. A tread can acquire a lock for an id and the lock returned is chosen on a hash function computed from the id.
Unfortunately, a collision of two ids can cause a deadlock as follows:

Scenario: one master (no data), one datanode (only data node)

DiscoveryNode id of master is X
DiscoveryNode id of datanode is Y

Both X and Y cause the same lock to be returned by NettyTransport#connectLock()

Both are up and running, all is fine until master stops.

Thread 1: The master fault detection of the datanode is notified (onNodeDisconnected()), which in turn leads the node to try and reconnect to master via the callstack titled "Thread 1" below.

-> connectToNode() is called and lock for X is acquired. The method waits for 45s for the cannels to reconnect.

Furthermore, Thread 1 holds the NettyTransport#masterNodeMutex.

Thread 2: The connection fails with an exception (connection refused, see callstack below), because the master shut down already. The exception is handled in NettyTransport#exceptionCaught which calls NettyTransport#disconnectFromNodeChannel. This method acquires the lock for Y (see Thread 2 below).

Now, if Y and X have two different locks, this would get the lock, disconnect the channels and notify thread 1. But since X and Y have the same locks, thread 2 is deadlocked with thread 1 which waits for 45s.

In this time, no thread can acquire the masterNodeMutex (held by thread 1), so the node can, for example, not stop.

This commit introduces a mechanism that assures unique locks for unique ids. This lock is not reentrant and therfore assures that threads can not end up in an infinite recursion (see Thread 3 below for an example on how a thread can aquire a lock twice).
While this is not a problem right now, it is potentially dangerous to have it that way, because the callstacks are complex as is and slight changes might cause unecpected recursions.

Thread 1
----

	owns: Object  (id=114)
	owns: Object  (id=118)
	waiting for: DefaultChannelFuture  (id=140)
	Object.wait(long) line: not available [native method]
	DefaultChannelFuture(Object).wait(long, int) line: 461
	DefaultChannelFuture.await0(long, boolean) line: 311
	DefaultChannelFuture.awaitUninterruptibly(long) line: 285
	NettyTransport.connectToChannels(NettyTransport$NodeChannels, DiscoveryNode) line: 672
	NettyTransport.connectToNode(DiscoveryNode, boolean) line: 609
	NettyTransport.connectToNode(DiscoveryNode) line: 579
	TransportService.connectToNode(DiscoveryNode) line: 129
	MasterFaultDetection.handleTransportDisconnect(DiscoveryNode) line: 195
	MasterFaultDetection.access$0(MasterFaultDetection, DiscoveryNode) line: 188
	MasterFaultDetection$FDConnectionListener.onNodeDisconnected(DiscoveryNode) line: 245
	TransportService$Adapter$2.run() line: 298
	EsThreadPoolExecutor(ThreadPoolExecutor).runWorker(ThreadPoolExecutor$Worker) line: 1145
	ThreadPoolExecutor$Worker.run() line: 615
	Thread.run() line: 724

Thread 2
-------

	waiting for: Object  (id=114)
	NettyTransport.disconnectFromNodeChannel(Channel, Throwable) line: 790
	NettyTransport.exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 495
	MessageChannelHandler.exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 228
	MessageChannelHandler(SimpleChannelUpstreamHandler).handleUpstream(ChannelHandlerContext, ChannelEvent) line: 112
	DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 564
	DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(ChannelEvent) line: 791
	SizeHeaderFrameDecoder(FrameDecoder).exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 377
	SizeHeaderFrameDecoder(SimpleChannelUpstreamHandler).handleUpstream(ChannelHandlerContext, ChannelEvent) line: 112
	DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 564
	DefaultChannelPipeline.sendUpstream(ChannelEvent) line: 559
	Channels.fireExceptionCaught(Channel, Throwable) line: 525
	NioClientBoss.processSelectedKeys(Set<SelectionKey>) line: 110
	NioClientBoss.process(Selector) line: 79
	NioClientBoss(AbstractNioSelector).run() line: 312
	NioClientBoss.run() line: 42
	ThreadRenamingRunnable.run() line: 108
	DeadLockProofWorker$1.run() line: 42
	ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1145
	ThreadPoolExecutor$Worker.run() line: 615
	Thread.run() line: 724

Thread 3
---------

        org.elasticsearch.transport.netty.NettyTransport.disconnectFromNode(NettyTransport.java:772)
        org.elasticsearch.transport.netty.NettyTransport.access$1200(NettyTransport.java:92)
        org.elasticsearch.transport.netty.NettyTransport$ChannelCloseListener.operationComplete(NettyTransport.java:830)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:418)
        org.jboss.netty.channel.DefaultChannelFuture.setSuccess(DefaultChannelFuture.java:362)
        org.jboss.netty.channel.AbstractChannel$ChannelCloseFuture.setClosed(AbstractChannel.java:355)
        org.jboss.netty.channel.AbstractChannel.setClosed(AbstractChannel.java:185)
        org.jboss.netty.channel.socket.nio.AbstractNioChannel.setClosed(AbstractNioChannel.java:197)
        org.jboss.netty.channel.socket.nio.NioSocketChannel.setClosed(NioSocketChannel.java:84)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:357)
        org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:58)
        org.jboss.netty.channel.Channels.close(Channels.java:812)
        org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
        org.elasticsearch.transport.netty.NettyTransport$NodeChannels.closeChannelsAndWait(NettyTransport.java:892)
        org.elasticsearch.transport.netty.NettyTransport$NodeChannels.close(NettyTransport.java:879)
        org.elasticsearch.transport.netty.NettyTransport.disconnectFromNode(NettyTransport.java:778)
        org.elasticsearch.transport.netty.NettyTransport.access$1200(NettyTransport.java:92)
        org.elasticsearch.transport.netty.NettyTransport$ChannelCloseListener.operationComplete(NettyTransport.java:830)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:418)
        org.jboss.netty.channel.DefaultChannelFuture.setSuccess(DefaultChannelFuture.java:362)
        org.jboss.netty.channel.AbstractChannel$ChannelCloseFuture.setClosed(AbstractChannel.java:355)
        org.jboss.netty.channel.AbstractChannel.setClosed(AbstractChannel.java:185)
        org.jboss.netty.channel.socket.nio.AbstractNioChannel.setClosed(AbstractNioChannel.java:197)
        org.jboss.netty.channel.socket.nio.NioSocketChannel.setClosed(NioSocketChannel.java:84)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:357)
        org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
        org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
        org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:724)
This commit is contained in:
Simon Willnauer 2013-09-03 17:56:52 +02:00 committed by Britta Weber
parent 4155741f7f
commit 8203d4dbcf
3 changed files with 289 additions and 43 deletions

View File

@ -0,0 +1,96 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticSearchIllegalStateException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class manages locks. Locks can be accessed with an identifier and are
* created the first time they are acquired and removed if no thread hold the
* lock. The latter is important to assure that the list of locks does not grow
* infinitely.
*
* A Thread can acquire a lock only once.
*
* */
public class KeyedLock<T> {
private final ConcurrentMap<T, KeyLock> map = new ConcurrentHashMap<T, KeyLock>();
private final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<KeyedLock.KeyLock>();
public void acquire(T key) {
while (true) {
if (threadLocal.get() != null) {
// if we are here, the thread already has the lock
throw new ElasticSearchIllegalStateException("Lock already accquired in Thread" + Thread.currentThread().getId()
+ " for key " + key);
}
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
KeyLock newLock = new KeyLock();
perNodeLock = map.putIfAbsent(key, newLock);
if (perNodeLock == null) {
newLock.lock();
threadLocal.set(newLock);
return;
}
}
assert perNodeLock != null;
int i = perNodeLock.count.get();
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
perNodeLock.lock();
threadLocal.set(perNodeLock);
return;
}
}
}
public void release(T key) {
KeyLock lock = threadLocal.get();
if (lock == null) {
throw new ElasticSearchIllegalStateException("Lock not accquired");
}
assert lock.isHeldByCurrentThread();
assert lock == map.get(key);
lock.unlock();
threadLocal.set(null);
int decrementAndGet = lock.count.decrementAndGet();
if (decrementAndGet == 0) {
map.remove(key, lock);
}
}
@SuppressWarnings("serial")
private final static class KeyLock extends ReentrantLock {
private final AtomicInteger count = new AtomicInteger(1);
}
public boolean hasLockedKeys() {
return !map.isEmpty();
}
}

View File

@ -44,6 +44,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -151,7 +152,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile BoundTransportAddress boundAddress;
private final Object[] connectMutex;
private final KeyedLock<String> connectionLock = new KeyedLock<String >();
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
@ -167,11 +169,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
}
this.connectMutex = new Object[500];
for (int i = 0; i < connectMutex.length; i++) {
connectMutex[i] = new Object();
}
this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2);
this.bossCount = componentSettings.getAsInt("boss_count", 1);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
@ -591,15 +588,17 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
}
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
connectionLock.acquire(node.id());
try {
if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
}
try {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
if (light) {
nodeChannels = connectToChannelsLight(node);
@ -629,6 +628,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e);
}
} finally {
connectionLock.release(node.id());
}
} finally {
globalLock.readLock().unlock();
@ -750,15 +751,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override
public void disconnectFromNode(DiscoveryNode node) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
connectionLock.acquire(node.id());
try {
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
connectionLock.release(node.id());
}
}
}
@ -767,15 +771,22 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
* Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/
private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)){ //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node);
} else {
try {
nodeChannels.close();
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
logger.debug("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
connectionLock.release(node.id());
}
}
}
@ -786,15 +797,22 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
*/
private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
for (DiscoveryNode node : connectedNodes.keySet()) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node);
} else {
try {
nodeChannels.close();
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}] on channel failure", failure, node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
logger.debug("disconnected from [{}] on channel failure", failure, node);
transportServiceAdapter.raiseNodeDisconnected(node);
connectionLock.release(node.id());
}
}
}
@ -809,15 +827,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return nodeChannels.channel(options.type());
}
private Object connectLock(String nodeId) {
int hash = nodeId.hashCode();
// abs returns Integer.MIN_VALUE, so we need to protect against it...
if (hash == Integer.MIN_VALUE) {
hash = 0;
}
return connectMutex[Math.abs(hash) % connectMutex.length];
}
private class ChannelCloseListener implements ChannelFutureListener {
private final DiscoveryNode node;

View File

@ -0,0 +1,141 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.transport.netty;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.test.integration.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class KeyedLockTests extends ElasticsearchTestCase {
@Test
public void checkIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException {
ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<String, Integer>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<String, AtomicInteger>();
KeyedLock<String> connectionLock = new KeyedLock<String>();
String[] names = new String[randomIntBetween(1, 40)];
for (int i = 0; i < names.length; i++) {
names[i] = randomRealisticUnicodeOfLengthBetween(10, 20);
}
CountDownLatch startLatch = new CountDownLatch(1);
int numThreads = randomIntBetween(3, 10);
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter);
}
for (int i = 0; i < numThreads; i++) {
threads[i].start();
}
startLatch.countDown();
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
assertThat(connectionLock.hasLockedKeys(), equalTo(false));
Set<Entry<String, Integer>> entrySet = counter.entrySet();
assertThat(counter.size(), equalTo(safeCounter.size()));
for (Entry<String, Integer> entry : entrySet) {
AtomicInteger atomicInteger = safeCounter.get(entry.getKey());
assertThat(atomicInteger, not(Matchers.nullValue()));
assertThat(atomicInteger.get(), equalTo(entry.getValue()));
}
}
@Test(expected = ElasticSearchIllegalStateException.class)
public void checkCannotAcquireTwoLocks() throws InterruptedException {
ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<String, AtomicInteger>();
KeyedLock<String> connectionLock = new KeyedLock<String>();
String[] names = new String[randomIntBetween(1, 40)];
connectionLock = new KeyedLock<String>();
String name = randomRealisticUnicodeOfLength(atLeast(10));
connectionLock.acquire(name);
connectionLock.acquire(name);
}
@Test(expected = ElasticSearchIllegalStateException.class)
public void checkCannotReleaseUnacquiredLock() throws InterruptedException {
ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<String, AtomicInteger>();
KeyedLock<String> connectionLock = new KeyedLock<String>();
String[] names = new String[randomIntBetween(1, 40)];
connectionLock = new KeyedLock<String>();
String name = randomRealisticUnicodeOfLength(atLeast(10));
connectionLock.release(name);
}
public static class AcquireAndReleaseThread extends Thread {
private CountDownLatch startLatch;
KeyedLock<String> connectionLock;
String[] names;
ConcurrentHashMap<String, Integer> counter;
ConcurrentHashMap<String, AtomicInteger> safeCounter;
public AcquireAndReleaseThread(CountDownLatch startLatch, KeyedLock<String> connectionLock, String[] names,
ConcurrentHashMap<String, Integer> counter, ConcurrentHashMap<String, AtomicInteger> safeCounter) {
this.startLatch = startLatch;
this.connectionLock = connectionLock;
this.names = names;
this.counter = counter;
this.safeCounter = safeCounter;
}
public void run() {
try {
startLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException();
}
int numRuns = atLeast(500);
for (int i = 0; i < numRuns; i++) {
String curName = names[randomInt(names.length - 1)];
connectionLock.acquire(curName);
try {
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
} finally {
connectionLock.release(curName);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
if (value == null) {
atomicInteger.incrementAndGet();
} else {
value.incrementAndGet();
}
}
}
}
}