diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 30d0750f3d0..878f950f0c4 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -281,7 +281,7 @@ public class HadoopIndexTask extends AbstractTask
     Jobby job = new HadoopDruidDetermineConfigurationJob(config);
 
-    log.info("Starting a hadoop index generator job...");
+    log.info("Starting a hadoop determine configuration job...");
     if (job.run()) {
       return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config));
     }
diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java
index abbfb54139c..8ffe995ee76 100644
--- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java
+++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java
@@ -32,6 +32,8 @@ import net.spy.memcached.FailureMode;
 import net.spy.memcached.MemcachedClient;
 import net.spy.memcached.MemcachedClientIF;
 import net.spy.memcached.internal.BulkFuture;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
 import org.apache.commons.codec.digest.DigestUtils;
 
 import javax.annotation.Nullable;
@@ -56,7 +58,15 @@ public class MemcachedCache implements Cache
     // always use compression
     transcoder.setCompressionThreshold(0);
 
-    MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize());
+    OperationQueueFactory opQueueFactory;
+    long maxQueueBytes = config.getMaxOperationQueueSize();
+    if (maxQueueBytes > 0) {
+      opQueueFactory = new MemcachedOperationQueueFactory(maxQueueBytes);
+    } else {
+      opQueueFactory = new LinkedOperationQueueFactory();
+    }
+
     return new MemcachedCache(
         new MemcachedClient(
             new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
@@ -68,7 +78,8 @@ public class MemcachedCache implements Cache
                                           .setShouldOptimize(true)
                                           .setOpQueueMaxBlockTime(config.getTimeout())
                                           .setOpTimeout(config.getTimeout())
-                                          .setOpQueueFactory(queueFactory)
+                                          .setReadBufferSize(config.getReadBufferSize())
+                                          .setOpQueueFactory(opQueueFactory)
                                           .build(),
             AddrUtil.getAddresses(config.getHosts())
         ),
diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java
index 2d8674cdd24..4a573e5d7d2 100644
--- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java
+++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java
@@ -20,24 +20,38 @@ package io.druid.client.cache;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import net.spy.memcached.DefaultConnectionFactory;
 
 import javax.validation.constraints.NotNull;
 
 public class MemcachedCacheConfig
 {
+  // default to 30 day expiration for cache entries
+  // values greater than 30 days are interpreted by memcached as absolute POSIX timestamps instead of durations
   @JsonProperty
-  private int expiration = 2592000;
+  private int expiration = 30 * 24 * 3600;
+
   @JsonProperty
   private int timeout = 500;
+
+  // comma delimited list of memcached servers, given as host:port combinations
   @JsonProperty
   @NotNull
   private String hosts;
+
   @JsonProperty
   private int maxObjectSize = 50 * 1024 * 1024;
+
+  // memcached client read buffer size, -1 uses the spymemcached library default
+  @JsonProperty
+  private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;
+
   @JsonProperty
   private String memcachedPrefix = "druid";
+
+  // maximum size in bytes of memcached client operation queue. 0 means unbounded
   @JsonProperty
-  private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB
+  private long maxOperationQueueSize = 0;
 
   public int getExpiration()
   {
@@ -68,4 +82,9 @@ public class MemcachedCacheConfig
   {
     return maxOperationQueueSize;
   }
+
+  public int getReadBufferSize()
+  {
+    return readBufferSize;
+  }
 }
diff --git a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java
index 5c9270edda7..726cf2efa23 100644
--- a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java
+++ b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java
@@ -20,14 +20,20 @@ package io.druid.cli;
 
 import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.google.api.client.util.Lists;
 import com.metamx.common.logger.Logger;
 import io.airlift.command.Arguments;
 import io.airlift.command.Command;
+import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
 import io.druid.indexer.HadoopDruidIndexerJob;
+import io.druid.indexer.JobHelper;
+import io.druid.indexer.Jobby;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  */
@@ -37,17 +43,20 @@ import java.io.File;
 )
 public class CliInternalHadoopIndexer implements Runnable
 {
+  private static final Logger log = new Logger(CliInternalHadoopIndexer.class);
   @Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
   private String argumentSpec;
 
-  private static final Logger log = new Logger(CliHadoopIndexer.class);
-
   @Override
   public void run()
   {
     try {
-      final HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(getHadoopDruidIndexerConfig());
-      job.run();
+      HadoopDruidIndexerConfig config = getHadoopDruidIndexerConfig();
+      List<Jobby> jobs = Lists.newArrayList();
+      jobs.add(new HadoopDruidDetermineConfigurationJob(config));
+      jobs.add(new HadoopDruidIndexerJob(config));
+      JobHelper.runJobs(jobs, config);
+
     } catch (Exception e) {
       throw Throwables.propagate(e);
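---

Reviewer note: a minimal sketch of how the new MemcachedCacheConfig knobs surface through Jackson, assuming jackson-databind is on the classpath and the class keeps its implicit no-arg constructor. The class name MemcachedCacheConfigSketch, the host names, and the field values below are illustrative only; 16384 just happens to be spymemcached's DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE.

    import com.fasterxml.jackson.databind.ObjectMapper;

    import io.druid.client.cache.MemcachedCacheConfig;

    public class MemcachedCacheConfigSketch
    {
      public static void main(String[] args) throws Exception
      {
        // "hosts" is the only @NotNull field; every omitted field keeps the
        // defaults from this patch: 30 day expiration, the spymemcached
        // default read buffer size, and an unbounded (0) operation queue.
        String json = "{\"hosts\": \"memcached1.example.com:11211,memcached2.example.com:11211\","
                      + " \"readBufferSize\": 16384,"
                      + " \"maxOperationQueueSize\": 0}";

        MemcachedCacheConfig config = new ObjectMapper().readValue(json, MemcachedCacheConfig.class);

        System.out.println(config.getReadBufferSize());        // 16384
        System.out.println(config.getMaxOperationQueueSize()); // 0 -> unbounded queue
      }
    }

With maxOperationQueueSize left at its new default of 0, the MemcachedCache construction path above takes the LinkedOperationQueueFactory branch, so the client's operation queue is unbounded rather than capped by the old 256 MB byte-bounded MemcachedOperationQueueFactory.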