Fix deadlock in DruidStatement & DruidConnection (#6868)

* Fix deadlock in DruidStatement & DruidConnection

* change statements type to ConcurrentMap
This commit is contained in:
zhaojiandong 2019-01-17 12:16:35 -06:00 committed by Fangjin Yang
parent b704ebfa37
commit 9f0fdcfef6
2 changed files with 42 additions and 38 deletions

View File

@ -30,9 +30,10 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.sql.SqlLifecycleFactory; import org.apache.druid.sql.SqlLifecycleFactory;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -53,10 +54,14 @@ public class DruidConnection
private final AtomicInteger statementCounter = new AtomicInteger(); private final AtomicInteger statementCounter = new AtomicInteger();
private final AtomicReference<Future<?>> timeoutFuture = new AtomicReference<>(); private final AtomicReference<Future<?>> timeoutFuture = new AtomicReference<>();
@GuardedBy("statements") // Typically synchronized by connectionLock, except in one case: the onClose function passed
private final Map<Integer, DruidStatement> statements; // into DruidStatements contained by the map.
private final ConcurrentMap<Integer, DruidStatement> statements;
@GuardedBy("statements") @GuardedBy("connectionLock")
private final Object connectionLock = new Object();
@GuardedBy("connectionLock")
private boolean open = true; private boolean open = true;
public DruidConnection(final String connectionId, final int maxStatements, final Map<String, Object> context) public DruidConnection(final String connectionId, final int maxStatements, final Map<String, Object> context)
@ -64,14 +69,14 @@ public class DruidConnection
this.connectionId = Preconditions.checkNotNull(connectionId); this.connectionId = Preconditions.checkNotNull(connectionId);
this.maxStatements = maxStatements; this.maxStatements = maxStatements;
this.context = ImmutableMap.copyOf(context); this.context = ImmutableMap.copyOf(context);
this.statements = new HashMap<>(); this.statements = new ConcurrentHashMap<>();
} }
public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory) public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory)
{ {
final int statementId = statementCounter.incrementAndGet(); final int statementId = statementCounter.incrementAndGet();
synchronized (statements) { synchronized (connectionLock) {
if (statements.containsKey(statementId)) { if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old statements are cleaned up. If this // Will only happen if statementCounter rolls over before old statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't have billions of statements. // ever happens then something fishy is going on, because we shouldn't have billions of statements.
@ -96,10 +101,9 @@ public class DruidConnection
sqlLifecycleFactory.factorize(), sqlLifecycleFactory.factorize(),
() -> { () -> {
// onClose function for the statement // onClose function for the statement
synchronized (statements) { log.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
log.debug("Connection[%s] closed statement[%s].", connectionId, statementId); // statements will be accessed unsynchronized to avoid deadlock
statements.remove(statementId); statements.remove(statementId);
}
} }
); );
@ -111,7 +115,7 @@ public class DruidConnection
public DruidStatement getStatement(final int statementId) public DruidStatement getStatement(final int statementId)
{ {
synchronized (statements) { synchronized (connectionLock) {
return statements.get(statementId); return statements.get(statementId);
} }
} }
@ -123,7 +127,7 @@ public class DruidConnection
*/ */
public boolean closeIfEmpty() public boolean closeIfEmpty()
{ {
synchronized (statements) { synchronized (connectionLock) {
if (statements.isEmpty()) { if (statements.isEmpty()) {
close(); close();
return true; return true;
@ -135,7 +139,7 @@ public class DruidConnection
public void close() public void close()
{ {
synchronized (statements) { synchronized (connectionLock) {
// Copy statements before iterating because statement.close() modifies it. // Copy statements before iterating because statement.close() modifies it.
for (DruidStatement statement : ImmutableList.copyOf(statements.values())) { for (DruidStatement statement : ImmutableList.copyOf(statements.values())) {
try { try {

View File

@ -300,11 +300,11 @@ public class DruidStatement implements Closeable
@Override @Override
public void close() public void close()
{ {
synchronized (lock) { State oldState = null;
final State oldState = state; try {
state = State.DONE; synchronized (lock) {
oldState = state;
try { state = State.DONE;
if (yielder != null) { if (yielder != null) {
Yielder<Object[]> theYielder = this.yielder; Yielder<Object[]> theYielder = this.yielder;
this.yielder = null; this.yielder = null;
@ -321,33 +321,33 @@ public class DruidStatement implements Closeable
yielderOpenCloseExecutor.shutdownNow(); yielderOpenCloseExecutor.shutdownNow();
} }
} }
catch (Throwable t) { }
if (oldState != State.DONE) { catch (Throwable t) {
// First close. Run the onClose function.
try {
onClose.run();
sqlLifecycle.emitLogsAndMetrics(t, null, -1);
}
catch (Throwable t1) {
t.addSuppressed(t1);
}
}
throw Throwables.propagate(t);
}
if (oldState != State.DONE) { if (oldState != State.DONE) {
// First close. Run the onClose function. // First close. Run the onClose function.
try { try {
if (!(this.throwable instanceof ForbiddenException)) {
sqlLifecycle.emitLogsAndMetrics(this.throwable, null, -1);
}
onClose.run(); onClose.run();
sqlLifecycle.emitLogsAndMetrics(t, null, -1);
} }
catch (Throwable t) { catch (Throwable t1) {
throw Throwables.propagate(t); t.addSuppressed(t1);
} }
} }
throw Throwables.propagate(t);
}
if (oldState != State.DONE) {
// First close. Run the onClose function.
try {
if (!(this.throwable instanceof ForbiddenException)) {
sqlLifecycle.emitLogsAndMetrics(this.throwable, null, -1);
}
onClose.run();
}
catch (Throwable t) {
throw Throwables.propagate(t);
}
} }
} }