From 05c55d382c3310cb8f98593889074e6f194c3d44 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 3 Mar 2021 15:46:14 -0500 Subject: [PATCH] ARTEMIS-3133 Just Encapsulating ObjectPool into a small utility --- .../artemis/utils/pools/MpscPool.java | 51 ++++++++++++++ .../activemq/artemis/utils/pools/Pool.java | 66 +++++++++++++++++++ .../core/ServerSessionPacketHandler.java | 53 +++++---------- 3 files changed, 132 insertions(+), 38 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java new file mode 100644 index 0000000000..86e016368e --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.pools; + +import java.util.Queue; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import io.netty.util.internal.PlatformDependent; + + +/** + * A simple encapsulation of Netty MpscQueue to provide a pool of objects. + * Use this pool only when the borrowing of object (consume) is done on a single thread. + * This is using a Multi Producer Single Consumer queue (MPSC). + * If you need other uses you may create different strategies for ObjectPooling. + * @param + */ +public class MpscPool extends Pool { + + public MpscPool(int maxSize, Consumer cleaner, Supplier supplier) { + super(maxSize, cleaner, supplier); + } + + @Override + protected Queue createQueue(int maxSize) { + final Queue internalPool; + if (maxSize > 0) { + internalPool = PlatformDependent.newFixedMpscQueue(maxSize); + } else { + internalPool = null; + } + return internalPool; + } + +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java new file mode 100644 index 0000000000..8f9340efb3 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.pools; + +import java.util.Queue; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * A simple encapsulation to provide a pool of objects. + * @param + */ +public abstract class Pool { + + private final Queue internalPool; + + private final Consumer cleaner; + private final Supplier supplier; + + public Pool(int maxSize, Consumer cleaner, Supplier supplier) { + internalPool = createQueue(maxSize); + this.cleaner = cleaner; + this.supplier = supplier; + } + + abstract Queue createQueue(int maxSize); + + /** Use this to instantiate or return objects from the pool */ + public final T borrow() { + if (internalPool == null) { + return supplier.get(); + } + + T returnObject = internalPool.poll(); + + if (returnObject == null) { + returnObject = supplier.get(); + } else { + cleaner.accept(returnObject); + } + + return returnObject; + } + + /** Return objects to the pool, they will be either reused or ignored by the max size */ + public final void release(T object) { + if (internalPool != null) { + internalPool.offer(object); + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index ce996de181..5ec11c577c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -20,9 +20,7 @@ import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.List; import java.util.Objects; -import java.util.Queue; -import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; @@ -99,6 +97,8 @@ import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.pools.MpscPool; +import org.apache.activemq.artemis.utils.pools.Pool; import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.apache.activemq.artemis.utils.actors.Actor; @@ -168,9 +168,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final Object largeMessageLock = new Object(); - private final Queue cachedNullRes; + private final Pool poolNullResponse; - private final Queue cachedNullRes_V2; + private final Pool poolNullResponseV2; public ServerSessionPacketHandler(final ActiveMQServer server, final ServerSession session, @@ -199,13 +199,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { // no confirmation window size means no resend cache hence NullResponsePackets // won't get cached on it because need confirmation - if (this.channel.getConfirmationWindowSize() == -1) { - cachedNullRes = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES); - cachedNullRes_V2 = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES); - } else { - cachedNullRes = null; - cachedNullRes_V2 = null; - } + poolNullResponse = new MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ? MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage::reset, () -> new NullResponseMessage()); + poolNullResponseV2 = new MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ? MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage_V2::reset, () -> new NullResponseMessage_V2()); } private void clearLargeMessage() { @@ -670,35 +665,17 @@ public class ServerSessionPacketHandler implements ChannelHandler { private NullResponseMessage createNullResponseMessage_V1(Packet packet) { assert requireNullResponseMessage_V1(packet); - NullResponseMessage response; - if (cachedNullRes != null) { - response = cachedNullRes.poll(); - if (response == null) { - response = new NullResponseMessage(); - } else { - response.reset(); - } - } else { - response = new NullResponseMessage(); - } - return response; + return poolNullResponse.borrow(); } private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) { assert !requireNullResponseMessage_V1(packet); NullResponseMessage_V2 response; - if (cachedNullRes_V2 != null) { - response = cachedNullRes_V2.poll(); - if (response == null) { - response = new NullResponseMessage_V2(packet.getCorrelationID()); - } else { - response.reset(); - // this should be already set by the channel too, but let's do it just in case - response.setCorrelationID(packet.getCorrelationID()); - } - } else { - response = new NullResponseMessage_V2(packet.getCorrelationID()); - } + response = poolNullResponseV2.borrow(); + + // this should be already set by the channel too, but let's do it just in case + response.setCorrelationID(packet.getCorrelationID()); + return response; } @@ -720,15 +697,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { } private void releaseResponse(Packet packet) { - if (cachedNullRes == null || cachedNullRes_V2 == null) { + if (poolNullResponse == null || poolNullResponseV2 == null) { return; } if (packet instanceof NullResponseMessage) { - cachedNullRes.offer((NullResponseMessage) packet); + poolNullResponse.release((NullResponseMessage) packet); return; } if (packet instanceof NullResponseMessage_V2) { - cachedNullRes_V2.offer((NullResponseMessage_V2) packet); + poolNullResponseV2.release((NullResponseMessage_V2) packet); } }