From 36fc85736c3f4233dee9f28894b95e82467c34c6 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 8 Jul 2014 18:01:31 +0530 Subject: [PATCH 01/11] Add ShardSpec Lookup Optimize choosing shardSpec for Hash Partitions --- .../indexer/HadoopDruidIndexerConfig.java | 56 +++++++++++++------ .../partition/HashBasedNumberedShardSpec.java | 20 +++++++ .../timeline/partition/LinearShardSpec.java | 16 ++++++ .../timeline/partition/NumberedShardSpec.java | 16 ++++++ .../partition/SingleDimensionShardSpec.java | 19 +++++++ 5 files changed, 110 insertions(+), 17 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 8f0f2d43c34..11acfd8df6e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Maps; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -30,6 +31,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; @@ -51,6 +53,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.server.DruidNode; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.ShardSpec; +import io.druid.timeline.partition.ShardSpecLookup; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -60,6 +63,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.ISODateTimeFormat; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; @@ -169,6 +173,8 @@ public class HadoopDruidIndexerConfig private volatile HadoopIngestionSpec schema; private volatile PathSpec pathSpec; private volatile ColumnConfig columnConfig; + private volatile Map shardSpecLookups = Maps.newHashMap(); + private volatile Map hadoopShardSpecLookup = Maps.newHashMap(); @JsonCreator public HadoopDruidIndexerConfig( @@ -178,6 +184,30 @@ public class HadoopDruidIndexerConfig this.columnConfig = columnConfig; this.schema = schema; this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class); + for (Map.Entry> entry : schema.getTuningConfig().getShardSpecs().entrySet()) { + if (entry.getValue() == null || entry.getValue().isEmpty()) { + continue; + } + final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec(); + shardSpecLookups.put( + entry.getKey(), actualSpec.getLookup( + Lists.transform( + entry.getValue(), new Function() + { + @Nullable + @Override + public ShardSpec apply(@Nullable HadoopyShardSpec input) + { + return input.getActualSpec(); + } + } + ) + ) + ); + for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) { + hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec); + } + } } @JsonProperty @@ -306,25 +336,17 @@ public class HadoopDruidIndexerConfig return Optional.absent(); } - final List shards = schema.getTuningConfig().getShardSpecs().get(timeBucket.get().getStart()); - if (shards == null || shards.isEmpty()) { - return Optional.absent(); - } + final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(inputRow); + final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec); - for (final 
HadoopyShardSpec hadoopyShardSpec : shards) { - final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec(); - if (actualSpec.isInChunk(inputRow)) { - return Optional.of( - new Bucket( - hadoopyShardSpec.getShardNum(), - timeBucket.get().getStart(), - actualSpec.getPartitionNum() - ) - ); - } - } + return Optional.of( + new Bucket( + hadoopyShardSpec.getShardNum(), + timeBucket.get().getStart(), + actualSpec.getPartitionNum() + ) + ); - throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards); } public Optional> getSegmentGranularIntervals() diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index 5110f886601..ac8b570578b 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -25,12 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; import java.util.List; +import java.util.Map; public class HashBasedNumberedShardSpec extends NumberedShardSpec { @@ -74,4 +76,22 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec '}'; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + final ImmutableMap.Builder shardSpecsMapBuilder = ImmutableMap.builder(); + for (ShardSpec spec : shardSpecs) { + shardSpecsMapBuilder.put(spec.getPartitionNum(), spec); + } + final Map shardSpecMap = shardSpecsMapBuilder.build(); + + return new ShardSpecLookup() + { + @Override + public ShardSpec 
getShardSpec(InputRow row) + { + return shardSpecMap.get((long) hash(row) % getPartitions()); + } + }; + } } \ No newline at end of file diff --git a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java index ea7d3256229..6f9dd6258e0 100644 --- a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java @@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.data.input.InputRow; +import java.util.List; +import java.util.Set; + public class LinearShardSpec implements ShardSpec { private int partitionNum; @@ -42,6 +45,19 @@ public class LinearShardSpec implements ShardSpec return partitionNum; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + return new ShardSpecLookup() + { + @Override + public ShardSpec getShardSpec(InputRow row) + { + return shardSpecs.get(0); + } + }; + } + @Override public PartitionChunk createChunk(T obj) { return new LinearPartitionChunk(partitionNum, obj); diff --git a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java index 73a3437a80a..683836d95fb 100644 --- a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java @@ -25,6 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.data.input.InputRow; +import java.util.List; +import java.util.Set; + public class NumberedShardSpec implements ShardSpec { @JsonIgnore @@ -52,6 +55,19 @@ public class NumberedShardSpec implements ShardSpec return partitionNum; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + return new ShardSpecLookup() + { 
+ @Override + public ShardSpec getShardSpec(InputRow row) + { + return shardSpecs.get(0); + } + }; + } + @JsonProperty("partitions") public int getPartitions() { diff --git a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java index 3cb3e5a72ba..197d5e6129f 100644 --- a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java @@ -20,6 +20,7 @@ package io.druid.timeline.partition; import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.ISE; import io.druid.data.input.InputRow; import java.util.List; @@ -94,6 +95,24 @@ public class SingleDimensionShardSpec implements ShardSpec return partitionNum; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + return new ShardSpecLookup() + { + @Override + public ShardSpec getShardSpec(InputRow row) + { + for (ShardSpec spec : shardSpecs) { + if (spec.isInChunk(row)) { + return spec; + } + } + throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); + } + }; + } + public void setPartitionNum(int partitionNum) { this.partitionNum = partitionNum; From fa43049240e69b9a4dfb344b97f762d57fabd3fe Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 10 Jul 2014 11:48:46 +0530 Subject: [PATCH 02/11] review comments & pom changes --- .../indexer/HadoopDruidIndexerConfig.java | 7 ++----- pom.xml | 2 +- .../partition/HashBasedNumberedShardSpec.java | 21 ++++++++++++------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 11acfd8df6e..1ff6e5a4482 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ 
b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -36,17 +36,16 @@ import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; -import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import io.druid.common.utils.JodaUtils; import io.druid.data.input.InputRow; import io.druid.data.input.impl.StringInputRowParser; +import io.druid.guice.GuiceInjectors; import io.druid.guice.JsonConfigProvider; import io.druid.guice.annotations.Self; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.path.PathSpec; -import io.druid.guice.GuiceInjectors; import io.druid.initialization.Initialization; import io.druid.segment.column.ColumnConfig; import io.druid.segment.indexing.granularity.GranularitySpec; @@ -63,7 +62,6 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.ISODateTimeFormat; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; @@ -194,9 +192,8 @@ public class HadoopDruidIndexerConfig Lists.transform( entry.getValue(), new Function() { - @Nullable @Override - public ShardSpec apply(@Nullable HadoopyShardSpec input) + public ShardSpec apply(HadoopyShardSpec input) { return input.getActualSpec(); } diff --git a/pom.xml b/pom.xml index 2b7f88276e8..7d270176674 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ UTF-8 0.26.5 2.5.0 - 0.2.4 + 0.2.5 diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index ac8b570578b..afcb2feb922 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -25,14 +25,14 @@ import 
com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; +import java.util.Collections; +import java.util.Comparator; import java.util.List; -import java.util.Map; public class HashBasedNumberedShardSpec extends NumberedShardSpec { @@ -79,18 +79,25 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec @Override public ShardSpecLookup getLookup(final List shardSpecs) { - final ImmutableMap.Builder shardSpecsMapBuilder = ImmutableMap.builder(); - for (ShardSpec spec : shardSpecs) { - shardSpecsMapBuilder.put(spec.getPartitionNum(), spec); + // Sort on basis of partitionNumber + Collections.sort( + shardSpecs, new Comparator() + { + @Override + public int compare(ShardSpec o1, ShardSpec o2) + { + return Integer.compare(o1.getPartitionNum(), o2.getPartitionNum()); + } } - final Map shardSpecMap = shardSpecsMapBuilder.build(); + ); return new ShardSpecLookup() { @Override public ShardSpec getShardSpec(InputRow row) { - return shardSpecMap.get((long) hash(row) % getPartitions()); + int index = (int) ((long) hash(row)) % getPartitions(); + return shardSpecs.get(index); } }; } From a12688bc8bab700899af0a525a76c22d79aceab8 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 10 Jul 2014 12:21:27 +0530 Subject: [PATCH 03/11] fix partitionNum calculation & remove extra sorting --- pom.xml | 2 +- .../partition/HashBasedNumberedShardSpec.java | 16 +--------------- .../shard/HashBasedNumberedShardSpecTest.java | 12 ++++++++++++ 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 7d270176674..dfa203ec905 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ 
UTF-8 0.26.5 2.5.0 - 0.2.5 + 0.2.5-SNAPSHOT diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index afcb2feb922..8f347ee6cfd 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -30,8 +30,6 @@ import com.google.common.hash.Hashing; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; -import java.util.Collections; -import java.util.Comparator; import java.util.List; public class HashBasedNumberedShardSpec extends NumberedShardSpec @@ -79,24 +77,12 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec @Override public ShardSpecLookup getLookup(final List shardSpecs) { - // Sort on basis of partitionNumber - Collections.sort( - shardSpecs, new Comparator() - { - @Override - public int compare(ShardSpec o1, ShardSpec o2) - { - return Integer.compare(o1.getPartitionNum(), o2.getPartitionNum()); - } - } - ); - return new ShardSpecLookup() { @Override public ShardSpec getShardSpec(InputRow row) { - int index = (int) ((long) hash(row)) % getPartitions(); + int index = Math.abs(hash(row) % getPartitions()); return shardSpecs.get(index); } }; diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java index bfd573dae89..5f176015c2a 100644 --- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java +++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java @@ -194,4 +194,16 @@ public class HashBasedNumberedShardSpecTest return 0; } } + + @Test + public void testValidity(){ + for(int i=Integer.MIN_VALUE;i<=Integer.MAX_VALUE;i++){ + { + int partitionNum = Math.abs((int) ((long) i % 2)); + if(partitionNum != 0 && partitionNum != 1){ 
+ throw new ISE("for i "+ i+ "partitionNum "+ partitionNum); + } + } + } + } } From a31376ee8371a62a092662c04ddde3a360d0d009 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 10 Jul 2014 12:23:20 +0530 Subject: [PATCH 04/11] point to correct version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index dfa203ec905..7d270176674 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ UTF-8 0.26.5 2.5.0 - 0.2.5-SNAPSHOT + 0.2.5 From f56d60b4511d15c3caaeda1dee56b8deb3f14c4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 14 Jul 2014 16:24:01 -0700 Subject: [PATCH 05/11] fix storage class npe --- .../druid/storage/s3/S3DataSegmentMover.java | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index ea585a0be44..379dd8374fa 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; +import com.metamx.common.ISE; import com.metamx.common.MapUtils; import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentMover; @@ -120,23 +121,30 @@ public class S3DataSegmentMover implements DataSegmentMover if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) { log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path); - } else if (s3Client.getObjectDetails(s3Bucket, s3Path) - .getStorageClass() - .equals(S3Object.STORAGE_CLASS_GLACIER)) { - log.warn("Cannot move file[s3://%s/%s] of storage class glacier."); } else { - log.info( - "Moving 
file[s3://%s/%s] to [s3://%s/%s]", - s3Bucket, - s3Path, - targetS3Bucket, - targetS3Path - ); - final S3Object target = new S3Object(targetS3Path); - if(!config.getDisableAcl()) { - target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); + final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, ""); + if (list.length == 0) { + // should never happen + throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path); + } + final S3Object s3Object = list[0]; + if (s3Object.getStorageClass() != null && + s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) { + log.warn("Cannot move file[s3://%s/%s] of storage class glacier, skipping."); + } else { + log.info( + "Moving file[s3://%s/%s] to [s3://%s/%s]", + s3Bucket, + s3Path, + targetS3Bucket, + targetS3Path + ); + final S3Object target = new S3Object(targetS3Path); + if (!config.getDisableAcl()) { + target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); + } + s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, target, false); } - s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, target, false); } } else { // ensure object exists in target location From 12bc3ac27bed120cb17bca69a719a044cbcdee3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 15 Jul 2014 10:58:46 -0700 Subject: [PATCH 06/11] fix broken S3 moving test --- .../storage/s3/S3DataSegmentMoverTest.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java b/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java index 9497d9a05fc..b5bad120d38 100644 --- a/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java +++ b/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java @@ -133,15 +133,18 @@ public class S3DataSegmentMoverTest } @Override - public StorageObject getObjectDetails(String 
bucketName, String objectKey) throws ServiceException + public S3Object[] listObjects(String bucketName, String objectKey, String separator) { - if (isObjectInBucket(bucketName, objectKey)) { - final S3Object object = new S3Object(objectKey); - object.setStorageClass(S3Object.STORAGE_CLASS_STANDARD); - return object; - } else { - return null; + try { + if (isObjectInBucket(bucketName, objectKey)) { + final S3Object object = new S3Object(objectKey); + object.setStorageClass(S3Object.STORAGE_CLASS_STANDARD); + return new S3Object[]{object}; + } + } catch (ServiceException e) { + // return empty list } + return new S3Object[]{}; } @Override From bdfeccd092b3e0155715d338d851452f90c2eba9 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 15 Jul 2014 15:57:58 -0700 Subject: [PATCH 07/11] doc and timeout fix --- docs/content/Production-Cluster-Configuration.md | 2 ++ .../main/java/io/druid/segment/loading/SegmentLoaderConfig.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/content/Production-Cluster-Configuration.md b/docs/content/Production-Cluster-Configuration.md index 6eeb3a0c97b..689a34b98a9 100644 --- a/docs/content/Production-Cluster-Configuration.md +++ b/docs/content/Production-Cluster-Configuration.md @@ -4,6 +4,8 @@ layout: doc_page Production Cluster Configuration ================================ +__This configuration is an example of what a production cluster could look like. Many other hardware combinations are possible! Cheaper hardware is absolutely possible.__ + This production Druid cluster assumes that MySQL and Zookeeper are already set up. The deep storage that is used for examples is S3 and memcached is used for a distributed cache. The nodes that respond to queries (Historical, Broker, and Middle manager nodes) will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. 
The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Historical daemons should have a heap size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries. SSDs are highly recommended for Historical nodes when not all data is loaded in available memory. diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index cefb275e946..2cba40194a0 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -38,7 +38,7 @@ public class SegmentLoaderConfig private boolean deleteOnRemove = true; @JsonProperty("dropSegmentDelayMillis") - private int dropSegmentDelayMillis = 5 * 60 * 1000; // 5 mins + private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds @JsonProperty private File infoDir = null; From 27c47507803729774b54977844d54495cfcb5b3b Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 15 Jul 2014 16:00:56 -0700 Subject: [PATCH 08/11] fix infinite loop in test --- .../server/shard/HashBasedNumberedShardSpecTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java index 5f176015c2a..2ab82655bf0 100644 --- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java +++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java @@ -196,14 +196,15 @@ public class HashBasedNumberedShardSpecTest } @Test - public void testValidity(){ - for(int 
i=Integer.MIN_VALUE;i<=Integer.MAX_VALUE;i++){ + public void testValidity() + { + for (int i = Integer.MIN_VALUE; i < Integer.MAX_VALUE; i++) { { int partitionNum = Math.abs((int) ((long) i % 2)); - if(partitionNum != 0 && partitionNum != 1){ - throw new ISE("for i "+ i+ "partitionNum "+ partitionNum); + if (partitionNum != 0 && partitionNum != 1) { + throw new ISE("for i " + i + "partitionNum " + partitionNum); } } - } + } } } From 935b642b2cf6ae58bbee2d6251b9c9ec659ba673 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 15 Jul 2014 16:13:11 -0700 Subject: [PATCH 09/11] fix overlord binding problems --- services/src/main/java/io/druid/cli/CliOverlord.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 9f95feb43ad..95780b57156 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -44,6 +44,7 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; @@ -114,6 +115,7 @@ public class CliOverlord extends ServerRunnable public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class); + JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); binder.bind(TaskMaster.class).in(ManageLifecycle.class); From b74845e94293e619fc6d26d8a70c4088250834a1 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 16 Jul 2014 09:39:51 +0530 Subject: [PATCH 10/11] fix test committed it by mistake. 
--- .../shard/HashBasedNumberedShardSpecTest.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java index 2ab82655bf0..bfd573dae89 100644 --- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java +++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java @@ -194,17 +194,4 @@ public class HashBasedNumberedShardSpecTest return 0; } } - - @Test - public void testValidity() - { - for (int i = Integer.MIN_VALUE; i < Integer.MAX_VALUE; i++) { - { - int partitionNum = Math.abs((int) ((long) i % 2)); - if (partitionNum != 0 && partitionNum != 1) { - throw new ISE("for i " + i + "partitionNum " + partitionNum); - } - } - } - } } From 534acc9eab8748d0131174ec478bb40ab1f65f41 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 15 Jul 2014 22:50:38 -0700 Subject: [PATCH 11/11] fix mm --- services/src/main/java/io/druid/cli/CliMiddleManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index e3356012358..381b382269a 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -35,6 +35,7 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; +import io.druid.indexing.common.config.TaskConfig; import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.indexing.overlord.ForkingTaskRunner; import io.druid.indexing.overlord.TaskRunner; @@ -75,6 +76,7 @@ public class CliMiddleManager extends ServerRunnable { IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); + JsonConfigProvider.bind(binder, "druid.indexer.task", 
TaskConfig.class); JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class); binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);