From d5dd29e23f9c9a05127a381aeb0b473532afe5a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 28 May 2013 14:50:26 -0700 Subject: [PATCH 1/2] enable snappy compression for memcached results --- client/pom.xml | 4 ++ .../druid/client/cache/MemcachedCache.java | 7 +-- .../druid/client/cache/SnappyTranscoder.java | 50 +++++++++++++++++++ pom.xml | 6 +++ 4 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java diff --git a/client/pom.xml b/client/pom.xml index d0ae4006937..b17cdbc4017 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -181,6 +181,10 @@ com.metamx bytebuffer-collections + + org.xerial.snappy + snappy-java + diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index 9dca20d9fb8..e5d478e341e 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -53,9 +53,10 @@ public class MemcachedCache implements Cache public static MemcachedCache create(final MemcachedCacheConfig config) { try { - SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize()); - // disable compression - transcoder.setCompressionThreshold(Integer.MAX_VALUE); + SnappyTranscoder transcoder = new SnappyTranscoder(config.getMaxObjectSize()); + + // always use compression + transcoder.setCompressionThreshold(0); return new MemcachedCache( new MemcachedClient( diff --git a/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java b/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java new file mode 100644 index 00000000000..d83e740f810 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java @@ -0,0 +1,50 @@ +package com.metamx.druid.client.cache; + +import net.spy.memcached.transcoders.SerializingTranscoder; +import org.xerial.snappy.Snappy; + +import java.io.IOException; + +public class SnappyTranscoder extends SerializingTranscoder +{ + public SnappyTranscoder() + { + super(); + } + + public SnappyTranscoder(int max) + { + super(max); + } + + @Override + protected byte[] compress(byte[] in) + { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + + byte[] out; + try { + out = Snappy.compress(in); + } catch(IOException e) { + throw new RuntimeException("IO exception compressing data", e); + } + getLogger().debug("Compressed %d bytes to %d", in.length, out.length); + return out; + } + + @Override + protected byte[] decompress(byte[] in) + { + byte[] out = null; + if(in != null) { + try { + out = Snappy.uncompress(in); + } catch (IOException e) { + getLogger().warn("Failed to decompress data", e); + } + } + return out == null ? null : out; + } +} diff --git a/pom.xml b/pom.xml index dce1e771eef..0c97dd87ed7 100644 --- a/pom.xml +++ b/pom.xml @@ -320,6 +320,12 @@ jackson-mapper-asl 1.9.11 + + org.xerial.snappy + snappy-java + 1.0.5 + + From 899bae1cc5f4be94214d60c097eb0e9f44d93bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 5 Jun 2013 12:51:29 -0700 Subject: [PATCH 2/2] swap out snappy for LZ4 --- client/pom.xml | 4 +- .../druid/client/cache/LZ4Transcoder.java | 81 +++++++++++++++++++ .../druid/client/cache/MemcachedCache.java | 2 +- .../druid/client/cache/SnappyTranscoder.java | 50 ------------ .../client/cache/MemcachedCacheTest.java | 12 ++- pom.xml | 6 +- 6 files changed, 96 insertions(+), 59 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java delete mode 100644 client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java diff --git a/client/pom.xml b/client/pom.xml index b17cdbc4017..76f09cfd4b8 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -182,8 +182,8 @@ bytebuffer-collections - org.xerial.snappy - snappy-java + net.jpountz.lz4 + lz4 diff --git a/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java new file mode 100644 index 00000000000..4728430b4e7 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.client.cache; + +import com.google.common.primitives.Ints; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Decompressor; +import net.jpountz.lz4.LZ4Factory; +import net.spy.memcached.transcoders.SerializingTranscoder; + +import java.nio.ByteBuffer; + +public class LZ4Transcoder extends SerializingTranscoder +{ + + private final LZ4Factory lz4Factory; + + public LZ4Transcoder() + { + super(); + lz4Factory = LZ4Factory.fastestJavaInstance(); + } + + public LZ4Transcoder(int max) + { + super(max); + lz4Factory = LZ4Factory.fastestJavaInstance(); + } + + @Override + protected byte[] compress(byte[] in) + { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + + LZ4Compressor compressor = lz4Factory.fastCompressor(); + + byte[] out = new byte[compressor.maxCompressedLength(in.length)]; + int compressedLength = compressor.compress(in, 0, in.length, out, 0); + + getLogger().debug("Compressed %d bytes to %d", in.length, compressedLength); + + return ByteBuffer.allocate(Ints.BYTES + compressedLength) + .putInt(in.length) + .put(out, 0, compressedLength) + .array(); + } + + @Override + protected byte[] decompress(byte[] in) + { + byte[] out = null; + if(in != null) { + LZ4Decompressor decompressor = lz4Factory.decompressor(); + + int size = ByteBuffer.wrap(in).getInt(); + + out = new byte[size]; + decompressor.decompress(in, Ints.BYTES, out, 0, out.length); + } + return out == null ? null : out; + } +} diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index e5d478e341e..fb6fa72ce46 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -53,7 +53,7 @@ public class MemcachedCache implements Cache public static MemcachedCache create(final MemcachedCacheConfig config) { try { - SnappyTranscoder transcoder = new SnappyTranscoder(config.getMaxObjectSize()); + LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize()); // always use compression transcoder.setCompressionThreshold(0); diff --git a/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java b/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java deleted file mode 100644 index d83e740f810..00000000000 --- a/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.metamx.druid.client.cache; - -import net.spy.memcached.transcoders.SerializingTranscoder; -import org.xerial.snappy.Snappy; - -import java.io.IOException; - -public class SnappyTranscoder extends SerializingTranscoder -{ - public SnappyTranscoder() - { - super(); - } - - public SnappyTranscoder(int max) - { - super(max); - } - - @Override - protected byte[] compress(byte[] in) - { - if (in == null) { - throw new NullPointerException("Can't compress null"); - } - - byte[] out; - try { - out = Snappy.compress(in); - } catch(IOException e) { - throw new RuntimeException("IO exception compressing data", e); - } - getLogger().debug("Compressed %d bytes to %d", in.length, out.length); - return out; - } - - @Override - protected byte[] decompress(byte[] in) - { - byte[] out = null; - if(in != null) { - try { - out = Snappy.uncompress(in); - } catch (IOException e) { - getLogger().warn("Failed to decompress data", e); - } - } - return out == null ? null : out; - } -} diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java index 287d208db62..23ca0ea9693 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java @@ -52,8 +52,8 @@ import java.util.concurrent.TimeoutException; */ public class MemcachedCacheTest { - private static final byte[] HI = "hi".getBytes(); - private static final byte[] HO = "ho".getBytes(); + private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); + private static final byte[] HO = "hooooooooooooooooooo".getBytes(); private MemcachedCache cache; @Before @@ -124,7 +124,13 @@ public class MemcachedCacheTest class MockMemcachedClient implements MemcachedClientIF { private final ConcurrentMap theMap = new ConcurrentHashMap(); - private final Transcoder transcoder = new SerializingTranscoder(); + private final SerializingTranscoder transcoder; + + public MockMemcachedClient() + { + transcoder = new LZ4Transcoder(); + transcoder.setCompressionThreshold(0); + } @Override public Collection getAvailableServers() diff --git a/pom.xml b/pom.xml index 0c97dd87ed7..478b080e74b 100644 --- a/pom.xml +++ b/pom.xml @@ -321,9 +321,9 @@ 1.9.11 - org.xerial.snappy - snappy-java - 1.0.5 + net.jpountz.lz4 + lz4 + 1.1.2