From 04f3d0a13aef16bb81a0f678f32e4faf4ab68f09 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 11 Mar 2014 16:36:13 +0530 Subject: [PATCH 1/5] fix cli hadoop indexer * run determine partitions job using CLI --- .../indexing/common/task/HadoopIndexTask.java | 2 +- .../io/druid/cli/CliInternalHadoopIndexer.java | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 30d0750f3d0..878f950f0c4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -281,7 +281,7 @@ public class HadoopIndexTask extends AbstractTask Jobby job = new HadoopDruidDetermineConfigurationJob(config); - log.info("Starting a hadoop index generator job..."); + log.info("Starting a hadoop determine configuration job..."); if (job.run()) { return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config)); } diff --git a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java index 5c9270edda7..726cf2efa23 100644 --- a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java @@ -20,14 +20,20 @@ package io.druid.cli; import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.api.client.util.Lists; import com.metamx.common.logger.Logger; import io.airlift.command.Arguments; import io.airlift.command.Command; +import io.druid.indexer.HadoopDruidDetermineConfigurationJob; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerConfigBuilder; import io.druid.indexer.HadoopDruidIndexerJob; +import io.druid.indexer.JobHelper; +import io.druid.indexer.Jobby; import java.io.File; +import java.util.ArrayList; +import java.util.List; /** */ @@ -37,17 +43,20 @@ import java.io.File; ) public class CliInternalHadoopIndexer implements Runnable { + private static final Logger log = new Logger(CliHadoopIndexer.class); @Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true) private String argumentSpec; - private static final Logger log = new Logger(CliHadoopIndexer.class); - @Override public void run() { try { - final HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(getHadoopDruidIndexerConfig()); - job.run(); + HadoopDruidIndexerConfig config = getHadoopDruidIndexerConfig(); + List jobs = Lists.newArrayList(); + jobs.add(new HadoopDruidDetermineConfigurationJob(config)); + jobs.add(new HadoopDruidIndexerJob(config)); + JobHelper.runJobs(jobs, config); + } catch (Exception e) { throw Throwables.propagate(e); From b5d9876b775d096070fa3b9f6c7c506b705ad78c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 11 Mar 2014 09:56:45 -0700 Subject: [PATCH 2/5] allow disabling of memcached op-queue limit, default to disabled --- .../java/io/druid/client/cache/MemcachedCache.java | 14 ++++++++++++-- .../druid/client/cache/MemcachedCacheConfig.java | 4 +++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index abbfb54139c..15d88e36347 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -32,6 +32,8 @@ import net.spy.memcached.FailureMode; import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.ops.LinkedOperationQueueFactory; +import net.spy.memcached.ops.OperationQueueFactory; import org.apache.commons.codec.digest.DigestUtils; import javax.annotation.Nullable; @@ -56,7 +58,15 @@ public class MemcachedCache implements Cache // always use compression transcoder.setCompressionThreshold(0); - MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize()); + + OperationQueueFactory opQueueFactory; + long maxQueueBytes = config.getMaxOperationQueueSize(); + if(maxQueueBytes > 0) { + opQueueFactory = new MemcachedOperationQueueFactory(maxQueueBytes); + } else { + opQueueFactory = new LinkedOperationQueueFactory(); + } + return new MemcachedCache( new MemcachedClient( new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) @@ -68,7 +78,7 @@ public class MemcachedCache implements Cache .setShouldOptimize(true) .setOpQueueMaxBlockTime(config.getTimeout()) .setOpTimeout(config.getTimeout()) - .setOpQueueFactory(queueFactory) + .setOpQueueFactory(opQueueFactory) .build(), AddrUtil.getAddresses(config.getHosts()) ), diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index 2d8674cdd24..fa40fd7d6c3 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -36,8 +36,10 @@ public class MemcachedCacheConfig private int maxObjectSize = 50 * 1024 * 1024; @JsonProperty private String memcachedPrefix = "druid"; + + // maximum size in bytes of memcached client operation queue. 0 means unlimited @JsonProperty - private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB + private long maxOperationQueueSize = 0; public int getExpiration() { From 8baac724c3f09ec729051c5332374634e3a2ce37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 11 Mar 2014 09:57:27 -0700 Subject: [PATCH 3/5] add comments --- .../java/io/druid/client/cache/MemcachedCacheConfig.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index fa40fd7d6c3..db23ec743af 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -25,15 +25,22 @@ import javax.validation.constraints.NotNull; public class MemcachedCacheConfig { + // default to 30 day expiration for cache entries + // values greater than 30 days are interpreted by memcached as absolute POSIX timestamps instead of duration @JsonProperty - private int expiration = 2592000; // What is this number? + private int expiration = 30 * 24 * 3600; + @JsonProperty private int timeout = 500; + + // comma delimited list of memcached servers, given as host:port combination @JsonProperty @NotNull private String hosts; + @JsonProperty private int maxObjectSize = 50 * 1024 * 1024; + @JsonProperty private String memcachedPrefix = "druid"; From 44f9b9cec98108141c7a72e8f3ee0d9bb3404def Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 11 Mar 2014 10:37:23 -0700 Subject: [PATCH 4/5] add readBufferSize option to memcached client config --- .../java/io/druid/client/cache/MemcachedCache.java | 1 + .../io/druid/client/cache/MemcachedCacheConfig.java | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 15d88e36347..8ffe995ee76 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -78,6 +78,7 @@ public class MemcachedCache implements Cache .setShouldOptimize(true) .setOpQueueMaxBlockTime(config.getTimeout()) .setOpTimeout(config.getTimeout()) + .setReadBufferSize(config.getReadBufferSize()) .setOpQueueFactory(opQueueFactory) .build(), AddrUtil.getAddresses(config.getHosts()) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index db23ec743af..adebbec8d03 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -41,10 +41,14 @@ public class MemcachedCacheConfig @JsonProperty private int maxObjectSize = 50 * 1024 * 1024; + // memcached client read buffer size, -1 uses the spymemcached library default + @JsonProperty + private int readBufferSize = -1; + @JsonProperty private String memcachedPrefix = "druid"; - // maximum size in bytes of memcached client operation queue. 0 means unlimited + // maximum size in bytes of memcached client operation queue. 0 means unbounded @JsonProperty private long maxOperationQueueSize = 0; @@ -77,4 +81,9 @@ public class MemcachedCacheConfig { return maxOperationQueueSize; } + + public int getReadBufferSize() + { + return readBufferSize; + } } From b15b544429f76af520b930673fb947c7cf8ca130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 11 Mar 2014 12:39:08 -0700 Subject: [PATCH 5/5] make defaults more explicit --- .../main/java/io/druid/client/cache/MemcachedCacheConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index adebbec8d03..4a573e5d7d2 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -20,6 +20,7 @@ package io.druid.client.cache; import com.fasterxml.jackson.annotation.JsonProperty; +import net.spy.memcached.DefaultConnectionFactory; import javax.validation.constraints.NotNull; @@ -43,7 +44,7 @@ public class MemcachedCacheConfig // memcached client read buffer size, -1 uses the spymemcached library default @JsonProperty - private int readBufferSize = -1; + private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE; @JsonProperty private String memcachedPrefix = "druid";