diff --git a/client/pom.xml b/client/pom.xml index d0ae4006937..76f09cfd4b8 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -181,6 +181,10 @@ com.metamx bytebuffer-collections + + net.jpountz.lz4 + lz4 + diff --git a/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java new file mode 100644 index 00000000000..4728430b4e7 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.client.cache; + +import com.google.common.primitives.Ints; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Decompressor; +import net.jpountz.lz4.LZ4Factory; +import net.spy.memcached.transcoders.SerializingTranscoder; + +import java.nio.ByteBuffer; + +public class LZ4Transcoder extends SerializingTranscoder +{ + + private final LZ4Factory lz4Factory; + + public LZ4Transcoder() + { + super(); + lz4Factory = LZ4Factory.fastestJavaInstance(); + } + + public LZ4Transcoder(int max) + { + super(max); + lz4Factory = LZ4Factory.fastestJavaInstance(); + } + + @Override + protected byte[] compress(byte[] in) + { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + + LZ4Compressor compressor = lz4Factory.fastCompressor(); + + byte[] out = new byte[compressor.maxCompressedLength(in.length)]; + int compressedLength = compressor.compress(in, 0, in.length, out, 0); + + getLogger().debug("Compressed %d bytes to %d", in.length, compressedLength); + + return ByteBuffer.allocate(Ints.BYTES + compressedLength) + .putInt(in.length) + .put(out, 0, compressedLength) + .array(); + } + + @Override + protected byte[] decompress(byte[] in) + { + byte[] out = null; + if(in != null) { + LZ4Decompressor decompressor = lz4Factory.decompressor(); + + int size = ByteBuffer.wrap(in).getInt(); + + out = new byte[size]; + decompressor.decompress(in, Ints.BYTES, out, 0, out.length); + } + return out == null ? null : out; + } +} diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index 9dca20d9fb8..fb6fa72ce46 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -53,9 +53,10 @@ public class MemcachedCache implements Cache public static MemcachedCache create(final MemcachedCacheConfig config) { try { - SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize()); - // disable compression - transcoder.setCompressionThreshold(Integer.MAX_VALUE); + LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize()); + + // always use compression + transcoder.setCompressionThreshold(0); return new MemcachedCache( new MemcachedClient( diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java index 287d208db62..23ca0ea9693 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java @@ -52,8 +52,8 @@ import java.util.concurrent.TimeoutException; */ public class MemcachedCacheTest { - private static final byte[] HI = "hi".getBytes(); - private static final byte[] HO = "ho".getBytes(); + private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); + private static final byte[] HO = "hooooooooooooooooooo".getBytes(); private MemcachedCache cache; @Before @@ -124,7 +124,13 @@ public class MemcachedCacheTest class MockMemcachedClient implements MemcachedClientIF { private final ConcurrentMap theMap = new ConcurrentHashMap(); - private final Transcoder transcoder = new SerializingTranscoder(); + private final SerializingTranscoder transcoder; + + public MockMemcachedClient() + { + transcoder = new LZ4Transcoder(); + transcoder.setCompressionThreshold(0); + } @Override public Collection getAvailableServers() diff --git a/pom.xml b/pom.xml index dce1e771eef..478b080e74b 100644 --- a/pom.xml +++ b/pom.xml @@ -320,6 +320,12 @@ jackson-mapper-asl 1.9.11 + + net.jpountz.lz4 + lz4 + 1.1.2 + +