diff --git a/docs/content/Booting-a-production-cluster.md b/docs/content/Booting-a-production-cluster.md
index 86d63ba6922..fc4f5593229 100644
--- a/docs/content/Booting-a-production-cluster.md
+++ b/docs/content/Booting-a-production-cluster.md
@@ -67,7 +67,7 @@ You can then use the EC2 dashboard to locate the instance and confirm that it ha
 
 If both the instance and the Druid cluster launch successfully, a few minutes later other messages to STDOUT should follow with information returned from EC2, including the instance ID:
 
     Started cluster of 1 instances
-    Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-master, druid-broker, druid-compute, druid-realtime], publicIp= ...
+    Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-coordinator, druid-broker, druid-historical, druid-realtime], publicIp= ...
 
 The final message will contain login information for the instance.
diff --git a/docs/content/Broker-Config.md b/docs/content/Broker-Config.md
index 8872a8f2697..815173b4c36 100644
--- a/docs/content/Broker-Config.md
+++ b/docs/content/Broker-Config.md
@@ -95,7 +95,7 @@ The broker module uses several of the default modules in [Configuration](Configu
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
 |`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
-|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` choose randomly, `connectionCount` picks the node with the fewest number of active connections to|`random`|
+|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to historical nodes. `random` chooses randomly, `connectionCount` picks the node with the fewest number of active connections.|`random`|
 
 #### Local Cache
diff --git a/docs/content/Cluster-setup.md b/docs/content/Cluster-setup.md
index e4ba0e564f1..e3d13d03563 100644
--- a/docs/content/Cluster-setup.md
+++ b/docs/content/Cluster-setup.md
@@ -14,7 +14,7 @@ As a special case, the absolute minimum setup is one of the standalone examples
 Minimum Physical Layout: Experimental Testing with 4GB of RAM
 -------------------------------------------------------------
 
-This layout can be used to load some data from deep storage onto a Druid compute node for the first time. A minimal physical layout for a 1 or 2 core machine with 4GB of RAM is:
+This layout can be used to load some data from deep storage onto a Druid historical node for the first time. A minimal physical layout for a 1 or 2 core machine with 4GB of RAM is:
 
 1. node1: [Coordinator](Coordinator.html) + metadata service + zookeeper + [Historical](Historical.html)
 2. transient nodes: [Indexing Service](Indexing-Service.html)
@@ -37,7 +37,7 @@ A minimal physical layout not constrained by cores that demonstrates parallel qu
 7. node7: [Realtime](Realtime.html) (m1.small or m1.medium or m1.large)
 8. transient nodes: [Indexing Service](Indexing-Service.html)
 
-This layout naturally lends itself to adding more RAM and core to Compute nodes, and to adding many more Compute nodes. Depending on the actual load, the Master, metadata server, and Zookeeper might need to use larger machines.
+This layout naturally lends itself to adding more RAM and cores to Historical nodes, and to adding many more Historical nodes. Depending on the actual load, the Coordinator, metadata server, and Zookeeper might need to use larger machines.
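An aside on the `connectionCount` balancer described in the Broker-Config table above: it picks the historical node with the fewest active connections, while `random` chooses uniformly. A minimal plain-Java sketch of those two strategies (node names and counts are illustrative; this is not Druid's actual implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Illustrative sketch of the two broker balancer strategies; real Druid
// code differs, and all node names here are hypothetical.
public class BalancerSketch
{
  static final Random RANDOM = new Random();

  // "connectionCount": pick the node with the fewest active connections.
  static String pickByConnectionCount(Map<String, Integer> activeConnections)
  {
    String best = null;
    for (Map.Entry<String, Integer> e : activeConnections.entrySet()) {
      if (best == null || e.getValue() < activeConnections.get(best)) {
        best = e.getKey();
      }
    }
    return best;
  }

  // "random": choose uniformly among the candidate nodes.
  static String pickRandomly(List<String> nodes)
  {
    return nodes.get(RANDOM.nextInt(nodes.size()));
  }

  public static void main(String[] args)
  {
    Map<String, Integer> conns = new HashMap<>();
    conns.put("historical1:8080", 12);
    conns.put("historical2:8080", 3);
    conns.put("historical3:8080", 7);
    System.out.println(pickByConnectionCount(conns)); // prints "historical2:8080"
  }
}
```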
 
 High Availability Physical Layout
 ---------------------------------
@@ -63,14 +63,14 @@ An HA layout allows full rolling restarts and heavy volume:
 Sizing for Cores and RAM
 ------------------------
 
-The Compute and Broker nodes will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Compute daemons should have a heap a size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries.
+The Historical and Broker nodes will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Historical daemons should have a heap size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries.
 
-The effective utilization of cores by Zookeeper, MySQL, and Master nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with heap a size between 500MB and 1GB.
+The effective utilization of cores by Zookeeper, MySQL, and Coordinator nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with a heap size between 500MB and 1GB.
 
 Storage
 -------
 
-Indexed segments should be kept in a permanent store accessible by all nodes like AWS S3 or HDFS or equivalent. Currently Druid supports S3, but this will be extended soon.
+Indexed segments should be kept in a permanent store accessible by all nodes, such as AWS S3, HDFS, or equivalent. Refer to [Deep-Storage](deep-storage.html) for more details on supported storage types.
 
 Local disk ("ephemeral" on AWS EC2) for caching is recommended over network mounted storage (example of mounted: AWS EBS, Elastic Block Store) in order to avoid network delays during times of heavy usage. If your data center is suitably provisioned for networked storage, perhaps with separate LAN/NICs just for storage, then mounted might work fine.
 
@@ -85,7 +85,7 @@ druid.host=someHostOrIPaddrWithPort
 druid.port=8080
 ```
 
-`druid.server.type` should be set to "historical" for your compute nodes and realtime for the realtime nodes. The master will only assign segments to a "historical" node and the broker has some intelligence around its ability to cache results when talking to a realtime node. This does not need to be set for the master or the broker.
+`druid.server.type` should be set to "historical" for your historical nodes and "realtime" for the realtime nodes. The Coordinator will only assign segments to a "historical" node, and the broker has some intelligence around its ability to cache results when talking to a realtime node. This does not need to be set for the coordinator or the broker.
 
 `druid.host` should be set to the hostname and port that can be used to talk to the given server process. Basically, someone should be able to send a request to http://${druid.host}/ and actually talk to the process.
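Since `druid.host` must be a value other processes can actually connect to, a quick shape-check ("hostname or IP, colon, numeric port") catches common mistakes such as a missing port. This helper is purely hypothetical and not part of Druid:

```java
// Hedged sketch: validate that a druid.host value looks like host:port,
// as described above. IPv6 literals are not handled in this sketch.
public class DruidHostCheck
{
  static boolean looksLikeHostAndPort(String druidHost)
  {
    int colon = druidHost.lastIndexOf(':');
    // need a non-empty host part and a non-empty port part
    if (colon <= 0 || colon == druidHost.length() - 1) {
      return false;
    }
    try {
      int port = Integer.parseInt(druidHost.substring(colon + 1));
      return port > 0 && port <= 65535;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(looksLikeHostAndPort("10.0.0.5:8080")); // true
    System.out.println(looksLikeHostAndPort("10.0.0.5"));      // false
  }
}
```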
diff --git a/docs/content/Historical-Config.md b/docs/content/Historical-Config.md
index d8d007b8589..6ef83196c61 100644
--- a/docs/content/Historical-Config.md
+++ b/docs/content/Historical-Config.md
@@ -67,9 +67,9 @@ Runtime.properties:
 ```
 druid.host=#{IP_ADDR}:8080
 druid.port=8080
-druid.service=druid/prod/compute/_default
+druid.service=druid/prod/historical/_default
 
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:#{DRUID_VERSION}"]
 
 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod
diff --git a/processing/src/main/java/io/druid/query/search/SearchBinaryFn.java b/processing/src/main/java/io/druid/query/search/SearchBinaryFn.java
index 99093911739..1c5a0b5668e 100644
--- a/processing/src/main/java/io/druid/query/search/SearchBinaryFn.java
+++ b/processing/src/main/java/io/druid/query/search/SearchBinaryFn.java
@@ -19,6 +19,7 @@
 
 package io.druid.query.search;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.guava.nary.BinaryFn;
@@ -32,18 +33,22 @@ import java.util.TreeSet;
 
 /**
  */
-public class SearchBinaryFn implements BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>>
+public class SearchBinaryFn
+    implements BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>>
 {
   private final SearchSortSpec searchSortSpec;
   private final QueryGranularity gran;
+  private final int limit;
 
   public SearchBinaryFn(
       SearchSortSpec searchSortSpec,
-      QueryGranularity granularity
+      QueryGranularity granularity,
+      int limit
   )
   {
     this.searchSortSpec = searchSortSpec;
     this.gran = granularity;
+    this.limit = limit;
   }
 
   @Override
@@ -65,7 +70,13 @@ public class SearchBinaryFn implements BinaryFn<Result<SearchResultValue>, Resul
     results.addAll(Lists.newArrayList(arg2Vals));
 
     return (gran instanceof AllGranularity)
-           ? new Result<SearchResultValue>(arg1.getTimestamp(), new SearchResultValue(Lists.newArrayList(results)))
+           ? new Result<SearchResultValue>(
+               arg1.getTimestamp(), new SearchResultValue(
+               Lists.newArrayList(
+                   Iterables.limit(results, limit)
+               )
+           )
+           )
            : new Result<SearchResultValue>(
                gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis())),
                new SearchResultValue(Lists.newArrayList(results))
diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
index 6dd2f64e03b..980e6dc2e5c 100644
--- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
@@ -101,7 +101,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
       {
         SearchQuery query = (SearchQuery) input;
 
-        return new SearchBinaryFn(query.getSort(), query.getGranularity());
+        return new SearchBinaryFn(query.getSort(), query.getGranularity(), query.getLimit());
       }
     };
   }
diff --git a/processing/src/test/java/io/druid/query/search/SearchBinaryFnTest.java b/processing/src/test/java/io/druid/query/search/SearchBinaryFnTest.java
--- a/processing/src/test/java/io/druid/query/search/SearchBinaryFnTest.java
+++ b/processing/src/test/java/io/druid/query/search/SearchBinaryFnTest.java
@@ -106,7 +106,7 @@ public class SearchBinaryFnTest
       )
     );
 
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
     Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
     assertSearchMergeResult(expected.getValue(), actual.getValue());
   }
@@ -138,7 +138,7 @@ public class SearchBinaryFnTest
       )
     );
 
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.DAY).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.DAY, Integer.MAX_VALUE).apply(r1, r2);
     Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
     assertSearchMergeResult(expected.getValue(), actual.getValue());
   }
@@ -162,7 +162,7 @@ public class SearchBinaryFnTest
 
     Result<SearchResultValue> expected = r1;
 
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
     Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
     assertSearchMergeResult(expected.getValue(), actual.getValue());
   }
@@ -210,7 +210,7 @@ public class SearchBinaryFnTest
       )
     );
 
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
     Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
     assertSearchMergeResult(expected.getValue(), actual.getValue());
   }
@@ -258,7 +258,7 @@ public class SearchBinaryFnTest
       )
     );
 
-    Result<SearchResultValue> actual = new SearchBinaryFn(new StrlenSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new StrlenSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
     Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
     assertSearchMergeResult(expected.getValue(), actual.getValue());
   }
@@ -282,7 +282,38 @@ public class SearchBinaryFnTest
 
     Result<SearchResultValue> expected = r1;
 
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
+    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
+    assertSearchMergeResult(expected.getValue(), actual.getValue());
+  }
+
+  @Test
+  public void testMergeLimit(){
+    Result<SearchResultValue> r1 = new Result<SearchResultValue>(
+        currTime,
+        new SearchResultValue(
+            ImmutableList.of(
+                new SearchHit(
+                    "blah",
+                    "foo"
+                )
+            )
+        )
+    );
+
+    Result<SearchResultValue> r2 = new Result<SearchResultValue>(
+        currTime,
+        new SearchResultValue(
+            ImmutableList.of(
+                new SearchHit(
+                    "blah2",
+                    "foo2"
+                )
+            )
+        )
+    );
+    Result<SearchResultValue> expected = r1;
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, 1).apply(r1, r2);
     Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
     assertSearchMergeResult(expected.getValue(), actual.getValue());
   }
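The behavior exercised by `testMergeLimit` can be illustrated outside Druid: with `ALL` granularity the two hit sets are merged into one sorted set and then truncated to `limit`, which is what `Iterables.limit` does in the patch. A plain-JDK sketch with hits simplified to strings (lexicographic order assumed; the real class merges `SearchHit` objects under the query's sort spec):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-JDK sketch of the merge-then-limit behavior added to SearchBinaryFn:
// combine two sorted hit sets, then keep at most `limit` entries.
public class MergeLimitSketch
{
  static List<String> merge(List<String> arg1, List<String> arg2, int limit)
  {
    TreeSet<String> results = new TreeSet<>(); // sorted, de-duplicated
    results.addAll(arg1);
    results.addAll(arg2);
    List<String> merged = new ArrayList<>(results);
    // equivalent of Iterables.limit(results, limit)
    return merged.subList(0, Math.min(limit, merged.size()));
  }

  public static void main(String[] args)
  {
    // mirrors testMergeLimit: with limit 1, only the first hit survives
    System.out.println(merge(List.of("blah"), List.of("blah2"), 1)); // [blah]
  }
}
```

With `limit = 1`, merging `["blah"]` and `["blah2"]` keeps only `"blah"`, matching the test's expectation that the merged result equals `r1`.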