From 968268ee5d1964be860b4b7d7799072a0822476e Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 25 Aug 2017 18:25:04 -0400 Subject: [PATCH] ARTEMIS-1374 Improve performance and GC overhead of AMQP transfer tags Use a more efficient means of creating AMQP transfer tags and pool previously generated values for reuse on future sends. --- .../amqp/proton/AMQPSessionContext.java | 8 +- .../amqp/proton/AmqpTransferTagGenerator.java | 126 ++++++++++++ .../proton/ProtonServerSenderContext.java | 2 +- .../proton/AmqpTransferTagGeneratorTest.java | 186 ++++++++++++++++++ 4 files changed, 317 insertions(+), 5 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java create mode 100644 artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index bf60e17d8a..5cd3515203 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -45,14 +45,14 @@ public class AMQPSessionContext extends ProtonInitializable { protected final Session session; - private long currentTag = 0; - protected Map receivers = new ConcurrentHashMap<>(); protected Map senders = new ConcurrentHashMap<>(); protected boolean closed = false; + protected final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); + public AMQPSessionContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, Session session) { this.connection = connection; this.sessionSPI = sessionSPI; @@ -93,11 +93,11 @@ public class AMQPSessionContext extends ProtonInitializable { } public byte[] getTag() { - return Long.toHexString(currentTag++).getBytes(); + return tagCache.getNextTag(); } public void replaceTag(byte[] tag) { - // TODO: do we need to reuse this? + tagCache.returnTag(tag); } public void close() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java new file mode 100644 index 0000000000..2cbf9edcd7 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.proton; + +import java.util.ArrayDeque; +import java.util.Deque; + +/** + * Utility class that can generate and if enabled pool the binary tag values + * used to identify transfers over an AMQP link. + */ +public final class AmqpTransferTagGenerator { + + public static final int DEFAULT_TAG_POOL_SIZE = 1024; + + private final Deque tagPool; + + private long nextTagId; + private int maxPoolSize = DEFAULT_TAG_POOL_SIZE; + + public AmqpTransferTagGenerator() { + this(true); + } + + public AmqpTransferTagGenerator(boolean pool) { + if (pool) { + this.tagPool = new ArrayDeque(); + } else { + this.tagPool = null; + } + } + + /** + * Retrieves the next available tag. + * + * @return a new or unused tag depending on the pool option. + */ + public synchronized byte[] getNextTag() { + byte[] tagBytes = null; + + if (tagPool != null) { + tagBytes = tagPool.pollFirst(); + } + + if (tagBytes == null) { + long tag = nextTagId++; + int size = encodingSize(tag); + + tagBytes = new byte[size]; + + for (int i = 0; i < size; ++i) { + tagBytes[size - 1 - i] = (byte) (tag >>> (i * 8)); + } + } + + return tagBytes; + } + + /** + * When used as a pooled cache of tags the unused tags should always be + * returned once the transfer has been settled. + * + * @param data + * a previously borrowed tag that is no longer in use. + */ + public synchronized void returnTag(byte[] data) { + if (tagPool != null && tagPool.size() < maxPoolSize) { + tagPool.offerLast(data); + } + } + + /** + * Gets the current max pool size value. + * + * @return the current max tag pool size. + */ + public int getMaxPoolSize() { + return maxPoolSize; + } + + /** + * Sets the max tag pool size. If the size is smaller than the current number + * of pooled tags the pool will drain over time until it matches the max. + * + * @param maxPoolSize + * the maximum number of tags to hold in the pool. + */ + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } + + /** + * @return true if the generator is using a pool of tags to reduce + * allocations. + */ + public boolean isPooling() { + return tagPool != null; + } + + private int encodingSize(long value) { + if (value < 0) { + return Long.BYTES; + } + + int size = 1; + while (size < 8 && (value >= (1L << (size * 8)))) { + size++; + } + + return size; + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 868e9c80d8..c774b4d1f9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -618,7 +618,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } } - // todo add tag caching + if (!preSettle) { protonSession.replaceTag(delivery.getTag()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java new file mode 100644 index 0000000000..0ed1ffbb31 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.proton; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; + +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests for the AMQP Transfer Tag Generator + */ +public class AmqpTransferTagGeneratorTest { + + @Test + public void testCreate() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(); + assertTrue(tagGen.isPooling()); + assertEquals(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, tagGen.getMaxPoolSize()); + } + + @Test + public void testCreateDisabled() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + assertFalse(tagGen.isPooling()); + assertEquals(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, tagGen.getMaxPoolSize()); + } + + @Test + public void testNewTagsOnSuccessiveCheckouts() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + + byte[] tag1 = tagGen.getNextTag(); + byte[] tag2 = tagGen.getNextTag(); + byte[] tag3 = tagGen.getNextTag(); + + assertNotSame(tag1, tag2); + assertNotSame(tag1, tag3); + assertNotSame(tag3, tag2); + + assertFalse(Arrays.equals(tag1, tag2)); + assertFalse(Arrays.equals(tag1, tag3)); + assertFalse(Arrays.equals(tag3, tag2)); + } + + @Test + public void testTagPoolingInEffect() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + + byte[] tag1 = tagGen.getNextTag(); + byte[] tag2 = tagGen.getNextTag(); + tagGen.returnTag(tag1); + tagGen.returnTag(tag2); + byte[] tag3 = tagGen.getNextTag(); + byte[] tag4 = tagGen.getNextTag(); + + assertSame(tag1, tag3); + assertSame(tag2, tag4); + assertNotSame(tag1, tag4); + assertNotSame(tag2, tag3); + } + + @Test + public void testPooledTagsReturnedInCheckedInOrder() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + + byte[] tag1 = tagGen.getNextTag(); + byte[] tag2 = tagGen.getNextTag(); + tagGen.returnTag(tag2); + tagGen.returnTag(tag1); + byte[] tag3 = tagGen.getNextTag(); + byte[] tag4 = tagGen.getNextTag(); + + assertSame(tag1, tag4); + assertSame(tag2, tag3); + assertNotSame(tag1, tag3); + assertNotSame(tag2, tag4); + } + + @Test + public void testTagArrayGrowsWithTagValue() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + + for (int i = 0; i < 512; ++i) { + byte[] tag = tagGen.getNextTag(); + + if (i < 256) { + assertEquals(1, tag.length); + } else { + assertEquals(2, tag.length); + } + } + } + + @Test + public void testTagValueMatchesParsedArray() throws IOException { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + + for (int i = 0; i < Short.MAX_VALUE; ++i) { + byte[] tag = tagGen.getNextTag(); + + ByteArrayInputStream bais = new ByteArrayInputStream(tag); + DataInputStream dis = new DataInputStream(bais); + + if (i < 256) { + assertEquals(1, tag.length); + assertEquals((byte) i, dis.readByte()); + } else { + assertEquals(2, tag.length); + assertEquals(i, dis.readShort()); + } + } + } + + @Test + public void testTagGenerationWorksWithIdRollover() throws Exception { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + + Field urisField = tagGen.getClass().getDeclaredField("nextTagId"); + urisField.setAccessible(true); + urisField.set(tagGen, Long.MAX_VALUE + 1); + + { + byte[] tag = tagGen.getNextTag(); + + ByteArrayInputStream bais = new ByteArrayInputStream(tag); + DataInputStream dis = new DataInputStream(bais); + + assertEquals(8, tag.length); + assertEquals(Long.MAX_VALUE + 1, dis.readLong()); + } + { + byte[] tag = tagGen.getNextTag(); + + ByteArrayInputStream bais = new ByteArrayInputStream(tag); + DataInputStream dis = new DataInputStream(bais); + + assertEquals(8, tag.length); + assertEquals(Long.MAX_VALUE + 2, dis.readLong()); + } + } + + @Ignore("Used to test performance") + @Test + public void testTagGeneratorOverTime() { + final AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + final int tagLoop = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE; + final ArrayList tags = new ArrayList<>(tagLoop); + + for (int i = 0; i < Short.MAX_VALUE * 16; ++i) { + // Checkout all the tags the pool will create + for (int j = 0; j < tagLoop; ++j) { + tags.add(tagGen.getNextTag()); + } + + // Return them and then clear + tags.forEach((tag) -> tagGen.returnTag(tag)); + tags.clear(); + } + } +}