From e0881448c13ada1972f41685d2a6716681b7d192 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 21 Aug 2017 15:03:17 +0200 Subject: [PATCH] ARTEMIS-1356 Avoid allocations and atomic operations to recognize handler's thread --- .../core/ServerSessionPacketHandler.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 7b45c9f8d6..58518a93e1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -159,7 +158,11 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final boolean direct; - private static final ThreadLocal inHandler = ThreadLocal.withInitial(AtomicBoolean::new); + //marker instance used to recognize if a thread is performing a packet handling + private static final Object DUMMY = Boolean.TRUE; + + //a thread that has its thread-local map populated with DUMMY is performing a packet handling + private static final ThreadLocal inHandler = new ThreadLocal<>(); public ServerSessionPacketHandler(final ActiveMQServer server, final CoreProtocolManager manager, @@ -227,8 +230,25 @@ public class ServerSessionPacketHandler implements ChannelHandler { ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName()); } + private static void onStartMessagePacketHandler() { + assert inHandler.get() != null : "recursion on packet handling is not supported"; + inHandler.set(DUMMY); + } + + private static boolean inHandler() { + final Object dummy = inHandler.get(); + //sanity check: can't exist a thread using a marker different from DUMMY + assert ((dummy != null && dummy == DUMMY) || dummy == null) : "wrong marker"; + return dummy != null; + } + + private static void onExitMessagePacketHandler() { + assert inHandler.get() == null : "marker not set"; + inHandler.set(null); + } + public void flushExecutor() { - if (!inHandler.get().get()) { + if (!inHandler()) { packetActor.flush(); callExecutor.flush(); } @@ -261,7 +281,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { if (logger.isTraceEnabled()) { logger.trace("ServerSessionPacketHandler::handlePacket," + packet); } - inHandler.get().set(true); + onStartMessagePacketHandler(); try { final byte type = packet.getType(); switch (type) { @@ -287,7 +307,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { break; } } finally { - inHandler.get().set(false); + onExitMessagePacketHandler(); } }