From d76b615ad09f9072c1d8e8cac0b0df517e36d50a Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 6 Sep 2017 13:28:26 -0400 Subject: [PATCH] AMQ-6804 Improve Transfer tag generation and pooling Reduce GC overhead and improve tag generation for faster tag creation. (cherry picked from commit ed395d1a8544bb0df7283d6335a88f6c2a3c867f) --- .../protocol/AmqpTransferTagGenerator.java | 66 ++++--- .../amqp/client/AmqpTransferTagGenerator.java | 66 ++++--- .../AmqpTransferTagGeneratorTest.java | 186 ++++++++++++++++++ 3 files changed, 274 insertions(+), 44 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGeneratorTest.java diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java index a1e3d84412..801eca2b56 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,10 +16,8 @@ */ package org.apache.activemq.transport.amqp.protocol; -import java.io.UnsupportedEncodingException; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.Set; +import java.util.ArrayDeque; +import java.util.Deque; /** * Utility class that can generate and if enabled pool the binary tag values @@ -29,18 +27,18 @@ public final class AmqpTransferTagGenerator { public static final int DEFAULT_TAG_POOL_SIZE = 1024; + private final Deque tagPool; + private long nextTagId; private int maxPoolSize = DEFAULT_TAG_POOL_SIZE; - private final Set tagPool; - public AmqpTransferTagGenerator() { - this(false); + this(true); } public AmqpTransferTagGenerator(boolean pool) { if (pool) { - this.tagPool = new LinkedHashSet(); + this.tagPool = new ArrayDeque<>(); } else { this.tagPool = null; } @@ -52,20 +50,24 @@ public final class AmqpTransferTagGenerator { * @return a new or unused tag depending on the pool option. */ public byte[] getNextTag() { - byte[] rc; - if (tagPool != null && !tagPool.isEmpty()) { - final Iterator iterator = tagPool.iterator(); - rc = iterator.next(); - iterator.remove(); - } else { - try { - rc = Long.toHexString(nextTagId++).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - // This should never happen since we control the input. - throw new RuntimeException(e); + byte[] tagBytes = null; + + if (tagPool != null) { + tagBytes = tagPool.pollFirst(); + } + + if (tagBytes == null) { + long tag = nextTagId++; + int size = encodingSize(tag); + + tagBytes = new byte[size]; + + for (int i = 0; i < size; ++i) { + tagBytes[size - 1 - i] = (byte) (tag >>> (i * 8)); } } - return rc; + + return tagBytes; } /** @@ -77,7 +79,7 @@ public final class AmqpTransferTagGenerator { */ public void returnTag(byte[] data) { if (tagPool != null && tagPool.size() < maxPoolSize) { - tagPool.add(data); + tagPool.offerLast(data); } } @@ -100,4 +102,24 @@ public final class AmqpTransferTagGenerator { public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; } + + /** + * @return true if the generator is using a pool of tags to reduce allocations. + */ + public boolean isPooling() { + return tagPool != null; + } + + private int encodingSize(long value) { + if (value < 0) { + return Long.BYTES; + } + + int size = 1; + while (size < 8 && (value >= (1L << (size * 8)))) { + size++; + } + + return size; + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java index 08db018080..69edc9b4fb 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,10 +16,8 @@ */ package org.apache.activemq.transport.amqp.client; -import java.io.UnsupportedEncodingException; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.Set; +import java.util.ArrayDeque; +import java.util.Deque; /** * Utility class that can generate and if enabled pool the binary tag values @@ -29,18 +27,18 @@ public final class AmqpTransferTagGenerator { public static final int DEFAULT_TAG_POOL_SIZE = 1024; + private final Deque tagPool; + private long nextTagId; private int maxPoolSize = DEFAULT_TAG_POOL_SIZE; - private final Set tagPool; - public AmqpTransferTagGenerator() { - this(false); + this(true); } public AmqpTransferTagGenerator(boolean pool) { if (pool) { - this.tagPool = new LinkedHashSet(); + this.tagPool = new ArrayDeque<>(); } else { this.tagPool = null; } @@ -52,20 +50,24 @@ public final class AmqpTransferTagGenerator { * @return a new or unused tag depending on the pool option. */ public byte[] getNextTag() { - byte[] rc; - if (tagPool != null && !tagPool.isEmpty()) { - final Iterator iterator = tagPool.iterator(); - rc = iterator.next(); - iterator.remove(); - } else { - try { - rc = Long.toHexString(nextTagId++).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - // This should never happen since we control the input. - throw new RuntimeException(e); + byte[] tagBytes = null; + + if (tagPool != null) { + tagBytes = tagPool.pollFirst(); + } + + if (tagBytes == null) { + long tag = nextTagId++; + int size = encodingSize(tag); + + tagBytes = new byte[size]; + + for (int i = 0; i < size; ++i) { + tagBytes[size - 1 - i] = (byte) (tag >>> (i * 8)); } } - return rc; + + return tagBytes; } /** @@ -77,7 +79,7 @@ public final class AmqpTransferTagGenerator { */ public void returnTag(byte[] data) { if (tagPool != null && tagPool.size() < maxPoolSize) { - tagPool.add(data); + tagPool.offerLast(data); } } @@ -100,4 +102,24 @@ public final class AmqpTransferTagGenerator { public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; } + + /** + * @return true if the generator is using a pool of tags to reduce allocations. + */ + public boolean isPooling() { + return tagPool != null; + } + + private int encodingSize(long value) { + if (value < 0) { + return Long.BYTES; + } + + int size = 1; + while (size < 8 && (value >= (1L << (size * 8)))) { + size++; + } + + return size; + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGeneratorTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGeneratorTest.java new file mode 100644 index 0000000000..f10f42bac9 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGeneratorTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.protocol; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; + +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests for the AMQP Transfer Tag Generator + */ +public class AmqpTransferTagGeneratorTest { + + @Test + public void testCreate() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(); + assertTrue(tagGen.isPooling()); + assertEquals(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, tagGen.getMaxPoolSize()); + } + + @Test + public void testCreateDisabled() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + assertFalse(tagGen.isPooling()); + assertEquals(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, tagGen.getMaxPoolSize()); + } + + @Test + public void testNewTagsOnSuccessiveCheckouts() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + + byte[] tag1 = tagGen.getNextTag(); + byte[] tag2 = tagGen.getNextTag(); + byte[] tag3 = tagGen.getNextTag(); + + assertNotSame(tag1, tag2); + assertNotSame(tag1, tag3); + assertNotSame(tag3, tag2); + + assertFalse(Arrays.equals(tag1, tag2)); + assertFalse(Arrays.equals(tag1, tag3)); + assertFalse(Arrays.equals(tag3, tag2)); + } + + @Test + public void testTagPoolingInEffect() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + + byte[] tag1 = tagGen.getNextTag(); + byte[] tag2 = tagGen.getNextTag(); + tagGen.returnTag(tag1); + tagGen.returnTag(tag2); + byte[] tag3 = tagGen.getNextTag(); + byte[] tag4 = tagGen.getNextTag(); + + assertSame(tag1, tag3); + assertSame(tag2, tag4); + assertNotSame(tag1, tag4); + assertNotSame(tag2, tag3); + } + + @Test + public void testPooledTagsReturnedInCheckedInOrder() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + + byte[] tag1 = tagGen.getNextTag(); + byte[] tag2 = tagGen.getNextTag(); + tagGen.returnTag(tag2); + tagGen.returnTag(tag1); + byte[] tag3 = tagGen.getNextTag(); + byte[] tag4 = tagGen.getNextTag(); + + assertSame(tag1, tag4); + assertSame(tag2, tag3); + assertNotSame(tag1, tag3); + assertNotSame(tag2, tag4); + } + + @Test + public void testTagArrayGrowsWithTagValue() { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + + for (int i = 0; i < 512; ++i) { + byte[] tag = tagGen.getNextTag(); + + if (i < 256) { + assertEquals(1, tag.length); + } else { + assertEquals(2, tag.length); + } + } + } + + @Test + public void testTagValueMatchesParsedArray() throws IOException { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + + for (int i = 0; i < Short.MAX_VALUE; ++i) { + byte[] tag = tagGen.getNextTag(); + + ByteArrayInputStream bais = new ByteArrayInputStream(tag); + DataInputStream dis = new DataInputStream(bais); + + if (i < 256) { + assertEquals(1, tag.length); + assertEquals((byte) i, dis.readByte()); + } else { + assertEquals(2, tag.length); + assertEquals(i, dis.readShort()); + } + } + } + + @Test + public void testTagGenerationWorksWithIdRollover() throws Exception { + AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(false); + + Field urisField = tagGen.getClass().getDeclaredField("nextTagId"); + urisField.setAccessible(true); + urisField.set(tagGen, Long.MAX_VALUE + 1); + + { + byte[] tag = tagGen.getNextTag(); + + ByteArrayInputStream bais = new ByteArrayInputStream(tag); + DataInputStream dis = new DataInputStream(bais); + + assertEquals(8, tag.length); + assertEquals(Long.MAX_VALUE + 1, dis.readLong()); + } + { + byte[] tag = tagGen.getNextTag(); + + ByteArrayInputStream bais = new ByteArrayInputStream(tag); + DataInputStream dis = new DataInputStream(bais); + + assertEquals(8, tag.length); + assertEquals(Long.MAX_VALUE + 2, dis.readLong()); + } + } + + @Ignore("Used to test performance") + @Test + public void testTagGeneratorOverTime() { + final AmqpTransferTagGenerator tagGen = new AmqpTransferTagGenerator(true); + final int tagLoop = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE; + final ArrayList tags = new ArrayList<>(tagLoop); + + for (int i = 0; i < Short.MAX_VALUE * 16; ++i) { + // Checkout all the tags the pool will create + for (int j = 0; j < tagLoop; ++j) { + tags.add(tagGen.getNextTag()); + } + + // Return them and then clear + tags.forEach((tag) -> tagGen.returnTag(tag)); + tags.clear(); + } + } +}