This commit is contained in:
Clebert Suconic 2021-03-04 10:48:44 -05:00
commit d0cbd08363
3 changed files with 132 additions and 38 deletions

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.pools;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;
import io.netty.util.internal.PlatformDependent;
/**
* A simple encapsulation of Netty MpscQueue to provide a pool of objects.
* Use this pool only when the borrowing of object (consume) is done on a single thread.
* This is using a Multi Producer Single Consumer queue (MPSC).
* If you need other uses you may create different strategies for ObjectPooling.
* @param <T>
*/
public class MpscPool<T> extends Pool<T> {
public MpscPool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
super(maxSize, cleaner, supplier);
}
@Override
protected Queue<T> createQueue(int maxSize) {
final Queue<T> internalPool;
if (maxSize > 0) {
internalPool = PlatformDependent.newFixedMpscQueue(maxSize);
} else {
internalPool = null;
}
return internalPool;
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.pools;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* A simple encapsulation to provide a pool of objects.
* @param <T>
*/
public abstract class Pool<T> {
private final Queue<T> internalPool;
private final Consumer<T> cleaner;
private final Supplier<T> supplier;
public Pool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
internalPool = createQueue(maxSize);
this.cleaner = cleaner;
this.supplier = supplier;
}
abstract Queue<T> createQueue(int maxSize);
/** Use this to instantiate or return objects from the pool */
public final T borrow() {
if (internalPool == null) {
return supplier.get();
}
T returnObject = internalPool.poll();
if (returnObject == null) {
returnObject = supplier.get();
} else {
cleaner.accept(returnObject);
}
return returnObject;
}
/** Return objects to the pool, they will be either reused or ignored by the max size */
public final void release(T object) {
if (internalPool != null) {
internalPool.offer(object);
}
}
}

View File

@ -20,9 +20,7 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -99,6 +97,8 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.activemq.artemis.utils.pools.Pool;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.Actor;
@ -168,9 +168,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final Object largeMessageLock = new Object();
private final Queue<NullResponseMessage> cachedNullRes;
private final Pool<NullResponseMessage> poolNullResponse;
private final Queue<NullResponseMessage_V2> cachedNullRes_V2;
private final Pool<NullResponseMessage_V2> poolNullResponseV2;
public ServerSessionPacketHandler(final ActiveMQServer server,
final ServerSession session,
@ -199,13 +199,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
// no confirmation window size means no resend cache hence NullResponsePackets
// won't get cached on it because need confirmation
if (this.channel.getConfirmationWindowSize() == -1) {
cachedNullRes = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
cachedNullRes_V2 = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
} else {
cachedNullRes = null;
cachedNullRes_V2 = null;
}
poolNullResponse = new MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ? MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage::reset, () -> new NullResponseMessage());
poolNullResponseV2 = new MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ? MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage_V2::reset, () -> new NullResponseMessage_V2());
}
private void clearLargeMessage() {
@ -670,35 +665,17 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
assert requireNullResponseMessage_V1(packet);
NullResponseMessage response;
if (cachedNullRes != null) {
response = cachedNullRes.poll();
if (response == null) {
response = new NullResponseMessage();
} else {
response.reset();
}
} else {
response = new NullResponseMessage();
}
return response;
return poolNullResponse.borrow();
}
private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) {
assert !requireNullResponseMessage_V1(packet);
NullResponseMessage_V2 response;
if (cachedNullRes_V2 != null) {
response = cachedNullRes_V2.poll();
if (response == null) {
response = new NullResponseMessage_V2(packet.getCorrelationID());
} else {
response.reset();
response = poolNullResponseV2.borrow();
// this should be already set by the channel too, but let's do it just in case
response.setCorrelationID(packet.getCorrelationID());
}
} else {
response = new NullResponseMessage_V2(packet.getCorrelationID());
}
return response;
}
@ -720,15 +697,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
private void releaseResponse(Packet packet) {
if (cachedNullRes == null || cachedNullRes_V2 == null) {
if (poolNullResponse == null || poolNullResponseV2 == null) {
return;
}
if (packet instanceof NullResponseMessage) {
cachedNullRes.offer((NullResponseMessage) packet);
poolNullResponse.release((NullResponseMessage) packet);
return;
}
if (packet instanceof NullResponseMessage_V2) {
cachedNullRes_V2.offer((NullResponseMessage_V2) packet);
poolNullResponseV2.release((NullResponseMessage_V2) packet);
}
}