From 52f7a698d9e6e57f8b0fd09101b785e8024c9c49 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 21 Dec 2017 14:02:34 +0100 Subject: [PATCH] ARTEMIS-1573 Improve UTF translation allowing zero copy The UTF translation has been improved by: - zero copy on array based buffers - zero copy UTF length calculation - faster array access using Netty PlatformDependent.get|putByte - improved perf tests UTF8Test --- .../activemq/artemis/utils/UTF8Util.java | 313 +++++++++++++----- .../artemis/tests/timing/util/UTF8Test.java | 38 +-- 2 files changed, 241 insertions(+), 110 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java index bd00bb1cdf..afee81ce76 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.utils; import java.lang.ref.SoftReference; import io.netty.buffer.ByteBuf; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -34,7 +35,7 @@ public final class UTF8Util { private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled(); - private static final ThreadLocal> currenBuffer = new ThreadLocal<>(); + private static final ThreadLocal> currentBuffer = new ThreadLocal<>(); private UTF8Util() { // utility class @@ -48,6 +49,12 @@ public final class UTF8Util { } } + private static void writeAsShorts(final ByteBuf buffer, final String val) { + for (int i = 0; i < val.length(); i++) { + buffer.writeShort((short) val.charAt(i)); + } + } + public static void writeString(final ByteBuf buffer, final String val) { int length = val.length(); @@ -55,9 +62,7 @@ public final class 
UTF8Util { if (length < 9) { // If very small it's more performant to store char by char - for (int i = 0; i < val.length(); i++) { - buffer.writeShort((short) val.charAt(i)); - } + writeAsShorts(buffer, val); } else if (length < 0xfff) { // Store as UTF - this is quicker than char by char for most strings saveUTF(buffer, val); @@ -68,13 +73,12 @@ public final class UTF8Util { } public static void saveUTF(final ByteBuf out, final String str) { - StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer(); if (str.length() > 0xffff) { throw ActiveMQUtilBundle.BUNDLE.stringTooLong(str.length()); } - final int len = UTF8Util.calculateUTFSize(str, buffer); + final int len = UTF8Util.calculateUTFSize(str); if (len > 0xffff) { throw ActiveMQUtilBundle.BUNDLE.stringTooLong(len); @@ -82,83 +86,161 @@ public final class UTF8Util { out.writeShort((short) len); - if (len > buffer.byteBuffer.length) { - buffer.resizeByteBuffer(len); + final int stringLength = str.length(); + + if (UTF8Util.isTrace) { + // This message is too verbose for debug, that's why we are using trace here + ActiveMQUtilLogger.LOGGER.trace("Saving string with utfSize=" + len + " stringSize=" + stringLength); } - if (len == (long) str.length()) { - for (int byteLocation = 0; byteLocation < len; byteLocation++) { - buffer.byteBuffer[byteLocation] = (byte) buffer.charBuffer[byteLocation]; + if (out.hasArray()) { + out.ensureWritable(len); + final byte[] bytes = out.array(); + final int writerIndex = out.writerIndex(); + final int index = out.arrayOffset() + writerIndex; + if (PlatformDependent.hasUnsafe()) { + unsafeOnHeapWriteUTF(str, bytes, index, stringLength); + } else { + writeUTF(str, bytes, index, stringLength); } - out.writeBytes(buffer.byteBuffer, 0, len); + out.writerIndex(writerIndex + len); } else { - if (UTF8Util.isTrace) { - // This message is too verbose for debug, that's why we are using trace here - ActiveMQUtilLogger.LOGGER.trace("Saving string with utfSize=" + len + " stringSize=" + 
str.length()); + if (PlatformDependent.hasUnsafe() && out.hasMemoryAddress()) { + out.ensureWritable(len); + final long addressBytes = out.memoryAddress(); + final int writerIndex = out.writerIndex(); + unsafeOffHeapWriteUTF(str, addressBytes, writerIndex, stringLength); + out.writerIndex(writerIndex + len); + } else { + final StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer(); + final byte[] bytes = buffer.borrowByteBuffer(len); + writeUTF(str, bytes, 0, stringLength); + out.writeBytes(bytes, 0, len); } - - int stringLength = str.length(); - - int charCount = 0; - - for (int i = 0; i < stringLength; i++) { - char charAtPos = buffer.charBuffer[i]; - if (charAtPos <= 0x7f) { - buffer.byteBuffer[charCount++] = (byte) charAtPos; - } else if (charAtPos >= 0x800) { - buffer.byteBuffer[charCount++] = (byte) (0xE0 | charAtPos >> 12 & 0x0F); - buffer.byteBuffer[charCount++] = (byte) (0x80 | charAtPos >> 6 & 0x3F); - buffer.byteBuffer[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F); - } else { - buffer.byteBuffer[charCount++] = (byte) (0xC0 | charAtPos >> 6 & 0x1F); - buffer.byteBuffer[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F); - } - } - out.writeBytes(buffer.byteBuffer, 0, len); } } + private static int writeUTF(final CharSequence str, final byte[] bytes, final int index, final int length) { + int charCount = index; + + for (int i = 0; i < length; i++) { + char charAtPos = str.charAt(i); + if (charAtPos <= 0x7f) { + bytes[charCount++] = (byte) charAtPos; + } else if (charAtPos >= 0x800) { + bytes[charCount++] = (byte) (0xE0 | charAtPos >> 12 & 0x0F); + bytes[charCount++] = (byte) (0x80 | charAtPos >> 6 & 0x3F); + bytes[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F); + } else { + bytes[charCount++] = (byte) (0xC0 | charAtPos >> 6 & 0x1F); + bytes[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F); + } + } + + final int writtenBytes = (charCount - index); + return writtenBytes; + } + + private static int unsafeOnHeapWriteUTF(final CharSequence 
str, final byte[] bytes, final int index, final int length) { + int charCount = index; + for (int i = 0; i < length; i++) { + char charAtPos = str.charAt(i); + if (charAtPos <= 0x7f) { + PlatformDependent.putByte(bytes, charCount++, (byte) charAtPos); + } else if (charAtPos >= 0x800) { + PlatformDependent.putByte(bytes, charCount++, (byte) (0xE0 | charAtPos >> 12 & 0x0F)); + PlatformDependent.putByte(bytes, charCount++, (byte) (0x80 | charAtPos >> 6 & 0x3F)); + PlatformDependent.putByte(bytes, charCount++, (byte) (0x80 | charAtPos >> 0 & 0x3F)); + } else { + PlatformDependent.putByte(bytes, charCount++, (byte) (0xC0 | charAtPos >> 6 & 0x1F)); + PlatformDependent.putByte(bytes, charCount++, (byte) (0x80 | charAtPos >> 0 & 0x3F)); + } + } + + final int writtenBytes = (charCount - index); + return writtenBytes; + } + + private static int unsafeOffHeapWriteUTF(final CharSequence str, final long addressBytes, final int index, final int length) { + int charCount = index; + for (int i = 0; i < length; i++) { + char charAtPos = str.charAt(i); + if (charAtPos <= 0x7f) { + PlatformDependent.putByte(addressBytes + charCount++, (byte) charAtPos); + } else if (charAtPos >= 0x800) { + PlatformDependent.putByte(addressBytes + charCount++, (byte) (0xE0 | charAtPos >> 12 & 0x0F)); + PlatformDependent.putByte(addressBytes + charCount++, (byte) (0x80 | charAtPos >> 6 & 0x3F)); + PlatformDependent.putByte(addressBytes + charCount++, (byte) (0x80 | charAtPos >> 0 & 0x3F)); + } else { + PlatformDependent.putByte(addressBytes + charCount++, (byte) (0xC0 | charAtPos >> 6 & 0x1F)); + PlatformDependent.putByte(addressBytes + charCount++, (byte) (0x80 | charAtPos >> 0 & 0x3F)); + } + } + + final int writtenBytes = (charCount - index); + return writtenBytes; + } + public static String readUTF(final ActiveMQBuffer input) { StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer(); final int size = input.readUnsignedShort(); - if (size > buffer.byteBuffer.length) { - 
buffer.resizeByteBuffer(size); - } - - if (size > buffer.charBuffer.length) { - buffer.resizeCharBuffer(size); - } - if (UTF8Util.isTrace) { // This message is too verbose for debug, that's why we are using trace here ActiveMQUtilLogger.LOGGER.trace("Reading string with utfSize=" + size); } + if (PlatformDependent.hasUnsafe() && input.byteBuf() != null && input.byteBuf().hasMemoryAddress()) { + final ByteBuf byteBuf = input.byteBuf(); + final long addressBytes = byteBuf.memoryAddress(); + final int index = byteBuf.readerIndex(); + byteBuf.skipBytes(size); + final char[] chars = buffer.borrowCharBuffer(size); + return unsafeOffHeapReadUTF(addressBytes, index, chars, size); + } + final byte[] bytes; + final int index; + if (input.byteBuf() != null && input.byteBuf().hasArray()) { + final ByteBuf byteBuf = input.byteBuf(); + bytes = byteBuf.array(); + index = byteBuf.arrayOffset() + byteBuf.readerIndex(); + byteBuf.skipBytes(size); + } else { + bytes = buffer.borrowByteBuffer(size); + index = 0; + input.readBytes(bytes, 0, size); + } + final char[] chars = buffer.borrowCharBuffer(size); + if (PlatformDependent.hasUnsafe()) { + return unsafeOnHeapReadUTF(bytes, index, chars, size); + } else { + return readUTF(bytes, index, chars, size); + } + } - int count = 0; + private static String readUTF(final byte[] bytes, final int index, final char[] chars, final int size) { + int count = index; + final int limit = index + size; int byte1, byte2, byte3; int charCount = 0; - input.readBytes(buffer.byteBuffer, 0, size); - - while (count < size) { - byte1 = buffer.byteBuffer[count++]; + while (count < limit) { + byte1 = bytes[count++]; if (byte1 >= 0 && byte1 <= 0x7F) { - buffer.charBuffer[charCount++] = (char) byte1; + chars[charCount++] = (char) byte1; } else { int c = byte1 & 0xff; switch (c >> 4) { case 0xc: case 0xd: - byte2 = buffer.byteBuffer[count++]; - buffer.charBuffer[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F); + byte2 = bytes[count++]; + chars[charCount++] 
= (char) ((c & 0x1F) << 6 | byte2 & 0x3F); break; case 0xe: - byte2 = buffer.byteBuffer[count++]; - byte3 = buffer.byteBuffer[count++]; - buffer.charBuffer[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0); + byte2 = bytes[count++]; + byte3 = bytes[count++]; + chars[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0); break; default: throw new InternalError("unhandled utf8 byte " + c); @@ -166,17 +248,85 @@ public final class UTF8Util { } } - return new String(buffer.charBuffer, 0, charCount); - + return new String(chars, 0, charCount); } - public static StringUtilBuffer getThreadLocalBuffer() { - SoftReference softReference = UTF8Util.currenBuffer.get(); + private static String unsafeOnHeapReadUTF(final byte[] bytes, final int index, final char[] chars, final int size) { + int count = index; + final int limit = index + size; + int byte1, byte2, byte3; + int charCount = 0; + + while (count < limit) { + byte1 = PlatformDependent.getByte(bytes, count++); + + if (byte1 >= 0 && byte1 <= 0x7F) { + chars[charCount++] = (char) byte1; + } else { + int c = byte1 & 0xff; + switch (c >> 4) { + case 0xc: + case 0xd: + byte2 = PlatformDependent.getByte(bytes, count++); + chars[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F); + break; + case 0xe: + byte2 = PlatformDependent.getByte(bytes, count++); + byte3 = PlatformDependent.getByte(bytes, count++); + chars[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0); + break; + default: + throw new InternalError("unhandled utf8 byte " + c); + } + } + } + + return new String(chars, 0, charCount); + } + + private static String unsafeOffHeapReadUTF(final long addressBytes, + final int index, + final char[] chars, + final int size) { + int count = index; + final int limit = index + size; + int byte1, byte2, byte3; + int charCount = 0; + + while (count < limit) { + byte1 = PlatformDependent.getByte(addressBytes + count++); + + if 
(byte1 >= 0 && byte1 <= 0x7F) { + chars[charCount++] = (char) byte1; + } else { + int c = byte1 & 0xff; + switch (c >> 4) { + case 0xc: + case 0xd: + byte2 = PlatformDependent.getByte(addressBytes + count++); + chars[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F); + break; + case 0xe: + byte2 = PlatformDependent.getByte(addressBytes + count++); + byte3 = PlatformDependent.getByte(addressBytes + count++); + chars[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0); + break; + default: + throw new InternalError("unhandled utf8 byte " + c); + } + } + } + + return new String(chars, 0, charCount); + } + + private static StringUtilBuffer getThreadLocalBuffer() { + SoftReference softReference = UTF8Util.currentBuffer.get(); StringUtilBuffer value; if (softReference == null) { value = new StringUtilBuffer(); softReference = new SoftReference<>(value); - UTF8Util.currenBuffer.set(softReference); + UTF8Util.currentBuffer.set(softReference); } else { value = softReference.get(); } @@ -184,33 +334,23 @@ public final class UTF8Util { if (value == null) { value = new StringUtilBuffer(); softReference = new SoftReference<>(value); - UTF8Util.currenBuffer.set(softReference); + UTF8Util.currentBuffer.set(softReference); } return value; } public static void clearBuffer() { - SoftReference ref = UTF8Util.currenBuffer.get(); - if (ref.get() != null) { + SoftReference ref = UTF8Util.currentBuffer.get(); + if (ref != null && ref.get() != null) { ref.clear(); } } - public static int calculateUTFSize(final String str, final StringUtilBuffer stringBuffer) { + public static int calculateUTFSize(final String str) { int calculatedLen = 0; - - int stringLength = str.length(); - - if (stringLength > stringBuffer.charBuffer.length) { - stringBuffer.resizeCharBuffer(stringLength); - } - - str.getChars(0, stringLength, stringBuffer.charBuffer, 0); - - for (int i = 0; i < stringLength; i++) { - char c = stringBuffer.charBuffer[i]; - + for (int i = 0, 
stringLength = str.length(); i < stringLength; i++) { + final char c = str.charAt(i); if (c <= 0x7f) { calculatedLen++; } else if (c >= 0x800) { @@ -222,31 +362,24 @@ public final class UTF8Util { return calculatedLen; } - public static class StringUtilBuffer { + private static final class StringUtilBuffer { - public char[] charBuffer; + private char[] charBuffer = null; - public byte[] byteBuffer; + private byte[] byteBuffer = null; - public void resizeCharBuffer(final int newSize) { - if (newSize > charBuffer.length) { + public char[] borrowCharBuffer(final int newSize) { + if (charBuffer == null || newSize > charBuffer.length) { charBuffer = new char[newSize]; } + return charBuffer; } - public void resizeByteBuffer(final int newSize) { - if (newSize > byteBuffer.length) { + public byte[] borrowByteBuffer(final int newSize) { + if (byteBuffer == null || newSize > byteBuffer.length) { byteBuffer = new byte[newSize]; } - } - - public StringUtilBuffer() { - this(1024, 1024); - } - - public StringUtilBuffer(final int sizeChar, final int sizeByte) { - charBuffer = new char[sizeChar]; - byteBuffer = new byte[sizeByte]; + return byteBuffer; } } diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/util/UTF8Test.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/util/UTF8Test.java index e0d5a74a6c..efa06d9b17 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/util/UTF8Test.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/util/UTF8Test.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.timing.util; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.UTF8Util; import org.junit.After; import 
org.junit.Assert; @@ -26,31 +27,31 @@ import org.junit.Test; public class UTF8Test extends ActiveMQTestBase { - private final String str = "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5"; + private static final String str = "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5" + "abcdef&^*&!^ghijkl\uB5E2\uCAC7\uB2BB\uB7DD\uB7C7\uB3A3\uBCE4\uB5A5"; final int TIMES = 5; - final long numberOfIteractions = 1000000; + final int numberOfIteractions = 1000000; + + //It's needed to be sure that the JVM won't perform Dead Code Elimination + //on String/ActiveMQBuffer operations + private volatile Object blackHole; @Test public void testWriteUTF() throws Exception { ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(10 * 1024); - long start = System.currentTimeMillis(); - for (int c = 0; c < TIMES; c++) { + final long start = System.currentTimeMillis(); for (long i = 0; i < numberOfIteractions; 
i++) { - if (i == 10000) { - start = System.currentTimeMillis(); - } - buffer.clear(); buffer.writeUTF(str); + blackHole = buffer; } + final long spentTime = System.currentTimeMillis() - start; - long spentTime = System.currentTimeMillis() - start; - - System.out.println("Time WriteUTF = " + spentTime); + System.out.println("Time writeUTF = " + spentTime + " ms"); + System.out.println("Throughput writeUTF = " + numberOfIteractions / spentTime + " ops/ms"); } } @@ -60,22 +61,19 @@ public class UTF8Test extends ActiveMQTestBase { buffer.writeUTF(str); - long start = System.currentTimeMillis(); - for (int c = 0; c < TIMES; c++) { + ThreadLeakCheckRule.forceGC(); + final long start = System.currentTimeMillis(); for (long i = 0; i < numberOfIteractions; i++) { - if (i == 10000) { - start = System.currentTimeMillis(); - } - buffer.resetReaderIndex(); String newstr = buffer.readUTF(); Assert.assertEquals(str, newstr); + blackHole = newstr; } + final long spentTime = System.currentTimeMillis() - start; - long spentTime = System.currentTimeMillis() - start; - - System.out.println("Time readUTF = " + spentTime); + System.out.println("Time readUTF = " + spentTime + " ms"); + System.out.println("Throughput readUTF = " + numberOfIteractions / spentTime + " ops/ms"); } }