ARTEMIS-3856 re-applying recursive logic on ThreadLocal

This commit is contained in:
Clebert Suconic 2022-06-09 18:27:05 -04:00
parent a3997380c7
commit 578f2fb5cf
1 changed files with 27 additions and 23 deletions

View File

@ -62,7 +62,7 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List<ReadyListener> readyListeners = new ArrayList<>();
private static final FastThreadLocal<ArrayList<ReadyListener>> readyListenersPool = new FastThreadLocal<>();
private static final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();
private final boolean batchingEnabled;
@ -138,47 +138,51 @@ public class NettyConnection implements Connection {
@Override
public final void fireReady(final boolean ready) {
// We are reusing a previously created ArrayList for this localArray
ArrayList<ReadyListener> localArrayList = readyListenersPool.get();
if (localArrayList == null) {
localArrayList = new ArrayList<>();
readyListenersPool.set(localArrayList);
} else {
localArrayList.clear();
ArrayList<ReadyListener> readyToCall = localListenersPool.get();
if (readyToCall != null) {
localListenersPool.set(null);
}
synchronized (readyListeners) {
this.ready = ready;
if (ready) {
final int size = this.readyListeners.size();
localArrayList.ensureCapacity(size);
if (readyToCall != null) {
readyToCall.ensureCapacity(size);
}
try {
for (int i = 0; i < size; i++) {
final ReadyListener readyListener = readyListeners.get(i);
if (readyListener == null) {
break;
}
localArrayList.add(readyListener);
if (readyToCall == null) {
readyToCall = new ArrayList<>(size);
}
readyToCall.add(readyListener);
}
} finally {
readyListeners.clear();
}
}
}
try {
localArrayList.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
if (readyToCall != null) {
try {
readyToCall.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
readyToCall.clear();
if (localListenersPool.get() != null) {
localListenersPool.set(readyToCall);
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
localArrayList.clear();
}
}
}