ARTEMIS-1356 Avoid allocations and atomic operations to recognize handler's thread
This commit is contained in:
parent
ce01faeac7
commit
e0881448c1
|
@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core;
|
||||||
import javax.transaction.xa.XAResource;
|
import javax.transaction.xa.XAResource;
|
||||||
import javax.transaction.xa.Xid;
|
import javax.transaction.xa.Xid;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
@ -159,7 +158,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
private final boolean direct;
|
private final boolean direct;
|
||||||
|
|
||||||
private static final ThreadLocal<AtomicBoolean> inHandler = ThreadLocal.withInitial(AtomicBoolean::new);
|
//marker instance used to recognize if a thread is performing a packet handling
|
||||||
|
private static final Object DUMMY = Boolean.TRUE;
|
||||||
|
|
||||||
|
//a thread that has its thread-local map populated with DUMMY is performing a packet handling
|
||||||
|
private static final ThreadLocal<Object> inHandler = new ThreadLocal<>();
|
||||||
|
|
||||||
public ServerSessionPacketHandler(final ActiveMQServer server,
|
public ServerSessionPacketHandler(final ActiveMQServer server,
|
||||||
final CoreProtocolManager manager,
|
final CoreProtocolManager manager,
|
||||||
|
@ -227,8 +230,25 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
|
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void onStartMessagePacketHandler() {
|
||||||
|
assert inHandler.get() != null : "recursion on packet handling is not supported";
|
||||||
|
inHandler.set(DUMMY);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean inHandler() {
|
||||||
|
final Object dummy = inHandler.get();
|
||||||
|
//sanity check: can't exist a thread using a marker different from DUMMY
|
||||||
|
assert ((dummy != null && dummy == DUMMY) || dummy == null) : "wrong marker";
|
||||||
|
return dummy != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void onExitMessagePacketHandler() {
|
||||||
|
assert inHandler.get() == null : "marker not set";
|
||||||
|
inHandler.set(null);
|
||||||
|
}
|
||||||
|
|
||||||
public void flushExecutor() {
|
public void flushExecutor() {
|
||||||
if (!inHandler.get().get()) {
|
if (!inHandler()) {
|
||||||
packetActor.flush();
|
packetActor.flush();
|
||||||
callExecutor.flush();
|
callExecutor.flush();
|
||||||
}
|
}
|
||||||
|
@ -261,7 +281,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
|
logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
|
||||||
}
|
}
|
||||||
inHandler.get().set(true);
|
onStartMessagePacketHandler();
|
||||||
try {
|
try {
|
||||||
final byte type = packet.getType();
|
final byte type = packet.getType();
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -287,7 +307,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
inHandler.get().set(false);
|
onExitMessagePacketHandler();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue