From d5dd29e23f9c9a05127a381aeb0b473532afe5a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 28 May 2013 14:50:26 -0700 Subject: [PATCH 1/3] enable snappy compression for memcached results --- client/pom.xml | 4 ++ .../druid/client/cache/MemcachedCache.java | 7 +-- .../druid/client/cache/SnappyTranscoder.java | 50 +++++++++++++++++++ pom.xml | 6 +++ 4 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java diff --git a/client/pom.xml b/client/pom.xml index d0ae4006937..b17cdbc4017 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -181,6 +181,10 @@ com.metamx bytebuffer-collections + + org.xerial.snappy + snappy-java + diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index 9dca20d9fb8..e5d478e341e 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -53,9 +53,10 @@ public class MemcachedCache implements Cache public static MemcachedCache create(final MemcachedCacheConfig config) { try { - SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize()); - // disable compression - transcoder.setCompressionThreshold(Integer.MAX_VALUE); + SnappyTranscoder transcoder = new SnappyTranscoder(config.getMaxObjectSize()); + + // always use compression + transcoder.setCompressionThreshold(0); return new MemcachedCache( new MemcachedClient( diff --git a/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java b/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java new file mode 100644 index 00000000000..d83e740f810 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java @@ -0,0 +1,50 @@ +package com.metamx.druid.client.cache; + +import net.spy.memcached.transcoders.SerializingTranscoder; +import org.xerial.snappy.Snappy; + +import java.io.IOException; + +public class SnappyTranscoder extends SerializingTranscoder +{ + public SnappyTranscoder() + { + super(); + } + + public SnappyTranscoder(int max) + { + super(max); + } + + @Override + protected byte[] compress(byte[] in) + { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + + byte[] out; + try { + out = Snappy.compress(in); + } catch(IOException e) { + throw new RuntimeException("IO exception compressing data", e); + } + getLogger().debug("Compressed %d bytes to %d", in.length, out.length); + return out; + } + + @Override + protected byte[] decompress(byte[] in) + { + byte[] out = null; + if(in != null) { + try { + out = Snappy.uncompress(in); + } catch (IOException e) { + getLogger().warn("Failed to decompress data", e); + } + } + return out == null ? null : out; + } +} diff --git a/pom.xml b/pom.xml index dce1e771eef..0c97dd87ed7 100644 --- a/pom.xml +++ b/pom.xml @@ -320,6 +320,12 @@ jackson-mapper-asl 1.9.11 + + org.xerial.snappy + snappy-java + 1.0.5 + + From 899bae1cc5f4be94214d60c097eb0e9f44d93bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 5 Jun 2013 12:51:29 -0700 Subject: [PATCH 2/3] swap out snappy for LZ4 --- client/pom.xml | 4 +- .../druid/client/cache/LZ4Transcoder.java | 81 +++++++++++++++++++ .../druid/client/cache/MemcachedCache.java | 2 +- .../druid/client/cache/SnappyTranscoder.java | 50 ------------ .../client/cache/MemcachedCacheTest.java | 12 ++- pom.xml | 6 +- 6 files changed, 96 insertions(+), 59 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java delete mode 100644 client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java diff --git a/client/pom.xml b/client/pom.xml index b17cdbc4017..76f09cfd4b8 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -182,8 +182,8 @@ bytebuffer-collections - org.xerial.snappy - snappy-java + net.jpountz.lz4 + lz4 diff --git a/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java new file mode 100644 index 00000000000..4728430b4e7 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.client.cache; + +import com.google.common.primitives.Ints; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Decompressor; +import net.jpountz.lz4.LZ4Factory; +import net.spy.memcached.transcoders.SerializingTranscoder; + +import java.nio.ByteBuffer; + +public class LZ4Transcoder extends SerializingTranscoder +{ + + private final LZ4Factory lz4Factory; + + public LZ4Transcoder() + { + super(); + lz4Factory = LZ4Factory.fastestJavaInstance(); + } + + public LZ4Transcoder(int max) + { + super(max); + lz4Factory = LZ4Factory.fastestJavaInstance(); + } + + @Override + protected byte[] compress(byte[] in) + { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + + LZ4Compressor compressor = lz4Factory.fastCompressor(); + + byte[] out = new byte[compressor.maxCompressedLength(in.length)]; + int compressedLength = compressor.compress(in, 0, in.length, out, 0); + + getLogger().debug("Compressed %d bytes to %d", in.length, compressedLength); + + return ByteBuffer.allocate(Ints.BYTES + compressedLength) + .putInt(in.length) + .put(out, 0, compressedLength) + .array(); + } + + @Override + protected byte[] decompress(byte[] in) + { + byte[] out = null; + if(in != null) { + LZ4Decompressor decompressor = lz4Factory.decompressor(); + + int size = ByteBuffer.wrap(in).getInt(); + + out = new byte[size]; + decompressor.decompress(in, Ints.BYTES, out, 0, out.length); + } + return out == null ? null : out; + } +} diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index e5d478e341e..fb6fa72ce46 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -53,7 +53,7 @@ public class MemcachedCache implements Cache public static MemcachedCache create(final MemcachedCacheConfig config) { try { - SnappyTranscoder transcoder = new SnappyTranscoder(config.getMaxObjectSize()); + LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize()); // always use compression transcoder.setCompressionThreshold(0); diff --git a/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java b/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java deleted file mode 100644 index d83e740f810..00000000000 --- a/client/src/main/java/com/metamx/druid/client/cache/SnappyTranscoder.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.metamx.druid.client.cache; - -import net.spy.memcached.transcoders.SerializingTranscoder; -import org.xerial.snappy.Snappy; - -import java.io.IOException; - -public class SnappyTranscoder extends SerializingTranscoder -{ - public SnappyTranscoder() - { - super(); - } - - public SnappyTranscoder(int max) - { - super(max); - } - - @Override - protected byte[] compress(byte[] in) - { - if (in == null) { - throw new NullPointerException("Can't compress null"); - } - - byte[] out; - try { - out = Snappy.compress(in); - } catch(IOException e) { - throw new RuntimeException("IO exception compressing data", e); - } - getLogger().debug("Compressed %d bytes to %d", in.length, out.length); - return out; - } - - @Override - protected byte[] decompress(byte[] in) - { - byte[] out = null; - if(in != null) { - try { - out = Snappy.uncompress(in); - } catch (IOException e) { - getLogger().warn("Failed to decompress data", e); - } - } - return out == null ? null : out; - } -} diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java index 287d208db62..23ca0ea9693 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java @@ -52,8 +52,8 @@ import java.util.concurrent.TimeoutException; */ public class MemcachedCacheTest { - private static final byte[] HI = "hi".getBytes(); - private static final byte[] HO = "ho".getBytes(); + private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); + private static final byte[] HO = "hooooooooooooooooooo".getBytes(); private MemcachedCache cache; @Before @@ -124,7 +124,13 @@ public class MemcachedCacheTest class MockMemcachedClient implements MemcachedClientIF { private final ConcurrentMap theMap = new ConcurrentHashMap(); - private final Transcoder transcoder = new SerializingTranscoder(); + private final SerializingTranscoder transcoder; + + public MockMemcachedClient() + { + transcoder = new LZ4Transcoder(); + transcoder.setCompressionThreshold(0); + } @Override public Collection getAvailableServers() diff --git a/pom.xml b/pom.xml index 0c97dd87ed7..478b080e74b 100644 --- a/pom.xml +++ b/pom.xml @@ -321,9 +321,9 @@ 1.9.11 - org.xerial.snappy - snappy-java - 1.0.5 + net.jpountz.lz4 + lz4 + 1.1.2 From 37a39940034422c7cc1d4eeaa6934c9d9933ed74 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 7 Jun 2013 14:08:51 -0700 Subject: [PATCH 3/3] add delegating executor service and fix bug with query priortization --- .../query/DelegatingExecutorService.java | 127 ++++++++++++++++++ .../query/MetricsEmittingExecutorService.java | 59 ++++---- .../query/PrioritizedExecutorService.java | 4 +- .../worker/WorkerCuratorCoordinator.java | 3 - .../com/metamx/druid/http/ComputeNode.java | 2 +- 5 files changed, 158 insertions(+), 37 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java diff --git a/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java b/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java new file mode 100644 index 00000000000..f9d377694e8 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java @@ -0,0 +1,127 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + */ +public class DelegatingExecutorService implements ExecutorService +{ + private final ExecutorService delegate; + + public DelegatingExecutorService(ExecutorService delegate) + { + this.delegate = delegate; + } + + @Override + public void shutdown() + { + delegate.shutdown(); + } + + @Override + public List shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException + { + return delegate.awaitTermination(l, timeUnit); + } + + @Override + public Future submit(Callable tCallable) + { + return delegate.submit(tCallable); + } + + @Override + public Future submit(Runnable runnable, T t) + { + return delegate.submit(runnable, t); + } + + @Override + public Future submit(Runnable runnable) + { + return delegate.submit(runnable); + } + + @Override + public List> invokeAll(Collection> callables) throws InterruptedException + { + return delegate.invokeAll(callables); + } + + @Override + public List> invokeAll( + Collection> callables, + long l, + TimeUnit timeUnit + ) throws InterruptedException + { + return delegate.invokeAll(callables, l, timeUnit); + } + + @Override + public T invokeAny(Collection> callables) throws InterruptedException, ExecutionException + { + return delegate.invokeAny(callables); + } + + @Override + public T invokeAny( + Collection> callables, + long l, + TimeUnit timeUnit + ) throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(callables, l, timeUnit); + } + + @Override + public void execute(Runnable runnable) + { + delegate.execute(runnable); + } +} diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java index 2e453ddbe0b..f109d2f783a 100644 --- a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java +++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java @@ -1,15 +1,40 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.query; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import java.util.Collection; import java.util.List; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; -public class MetricsEmittingExecutorService extends AbstractExecutorService +public class MetricsEmittingExecutorService extends DelegatingExecutorService { private final ExecutorService base; private final ServiceEmitter emitter; @@ -21,41 +46,13 @@ public class MetricsEmittingExecutorService extends AbstractExecutorService ServiceMetricEvent.Builder metricBuilder ) { + super(base); + this.base = base; this.emitter = emitter; this.metricBuilder = metricBuilder; } - @Override - public void shutdown() - { - base.shutdown(); - } - - @Override - public List shutdownNow() - { - return base.shutdownNow(); - } - - @Override - public boolean isShutdown() - { - return base.isShutdown(); - } - - @Override - public boolean isTerminated() - { - return base.isTerminated(); - } - - @Override - public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException - { - return base.awaitTermination(l, timeUnit); - } - @Override public void execute(Runnable runnable) { diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java index 5e13184de8b..f943a0c112f 100644 --- a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java +++ b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java @@ -38,9 +38,9 @@ import java.util.concurrent.TimeUnit; */ public class PrioritizedExecutorService extends AbstractExecutorService { - public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) + public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) { - final ExecutorService service = new PrioritizedExecutorService( + final PrioritizedExecutorService service = new PrioritizedExecutorService( new ThreadPoolExecutor( config.getNumThreads(), config.getNumThreads(), diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java index 44b308165b8..77e70f52a5f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -32,10 +32,7 @@ import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.config.IndexerZkConfig; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.zookeeper.CreateMode; - import org.joda.time.DateTime; import java.util.Arrays; diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 44abd781931..7cd0990d392 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -97,7 +97,7 @@ public class ComputeNode extends BaseServerNode final List monitors = getMonitors(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); - final ExecutorService innerExecutorService = PrioritizedExecutorService.create( + final PrioritizedExecutorService innerExecutorService = PrioritizedExecutorService.create( getLifecycle(), getConfigFactory().buildWithReplacements( ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")