Merge pull request #411 from metamx/small-fixes-docs-searchquery-limit

Small fixes
fjy 2014-03-04 10:30:27 -07:00
commit 0feddc3831
7 changed files with 62 additions and 20 deletions

View File

@@ -67,7 +67,7 @@ You can then use the EC2 dashboard to locate the instance and confirm that it ha
If both the instance and the Druid cluster launch successfully, a few minutes later other messages to STDOUT should follow with information returned from EC2, including the instance ID:
Started cluster of 1 instances
-Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-master, druid-broker, druid-compute, druid-realtime], publicIp= ...
+Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-coordinator, druid-broker, druid-historical, druid-realtime], publicIp= ...
The final message will contain login information for the instance.

View File

@@ -95,7 +95,7 @@ The broker module uses several of the default modules in [Configuration](Configu
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
-|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` choose randomly, `connectionCount` picks the node with the fewest number of active connections to|`random`|
+|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to historical nodes. `random` chooses randomly, `connectionCount` picks the node with the fewest number of active connections.|`random`|
#### Local Cache
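For instance, a broker that keeps query results in memcached and routes by connection count would combine the two properties from the table above in its runtime.properties roughly as follows (an illustrative sketch, not part of this patch; memcached additionally needs its own connection settings, which are not shown here):

```
druid.broker.cache.type=memcached
druid.broker.balancer.type=connectionCount
```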

View File

@@ -14,7 +14,7 @@ As a special case, the absolute minimum setup is one of the standalone examples
Minimum Physical Layout: Experimental Testing with 4GB of RAM
-------------------------------------------------------------
-This layout can be used to load some data from deep storage onto a Druid compute node for the first time. A minimal physical layout for a 1 or 2 core machine with 4GB of RAM is:
+This layout can be used to load some data from deep storage onto a Druid historical node for the first time. A minimal physical layout for a 1 or 2 core machine with 4GB of RAM is:
1. node1: [Coordinator](Coordinator.html) + metadata service + zookeeper + [Historical](Historical.html)
2. transient nodes: [Indexing Service](Indexing-Service.html)
@@ -37,7 +37,7 @@ A minimal physical layout not constrained by cores that demonstrates parallel qu
7. node7: [Realtime](Realtime.html) (m1.small or m1.medium or m1.large)
8. transient nodes: [Indexing Service](Indexing-Service.html)
-This layout naturally lends itself to adding more RAM and core to Compute nodes, and to adding many more Compute nodes. Depending on the actual load, the Master, metadata server, and Zookeeper might need to use larger machines.
+This layout naturally lends itself to adding more RAM and cores to Historical nodes, and to adding many more Historical nodes. Depending on the actual load, the Coordinator, metadata server, and Zookeeper might need to use larger machines.
High Availability Physical Layout
---------------------------------
@@ -63,14 +63,14 @@ An HA layout allows full rolling restarts and heavy volume:
Sizing for Cores and RAM
------------------------
-The Compute and Broker nodes will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Compute daemons should have a heap a size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries.
+The Historical and Broker nodes will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Historical daemons should have a heap size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries.
-The effective utilization of cores by Zookeeper, MySQL, and Master nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with heap a size between 500MB and 1GB.
+The effective utilization of cores by Zookeeper, MySQL, and Coordinator nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with heap sizes between 500MB and 1GB.
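As a rough worked example of the 1GB-per-core guideline (an illustration only, not part of this patch): an 8-core historical node would get a heap of at least 8 × 1GB = 8GB, which a typical launch script might express with JVM arguments along these lines (the exact flags depend on your own startup scripts):

```
-server -Xms8g -Xmx8g
```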
Storage
-------
-Indexed segments should be kept in a permanent store accessible by all nodes like AWS S3 or HDFS or equivalent. Currently Druid supports S3, but this will be extended soon.
+Indexed segments should be kept in a permanent store accessible by all nodes like AWS S3 or HDFS or equivalent. Refer to [Deep-Storage](deep-storage.html) for more details on supported storage types.
Local disk ("ephemeral" on AWS EC2) for caching is recommended over network mounted storage (example of mounted: AWS EBS, Elastic Block Store) in order to avoid network delays during times of heavy usage. If your data center is suitably provisioned for networked storage, perhaps with separate LAN/NICs just for storage, then mounted might work fine.
@@ -85,7 +85,7 @@ druid.host=someHostOrIPaddrWithPort
druid.port=8080
```
-`druid.server.type` should be set to "historical" for your compute nodes and realtime for the realtime nodes. The master will only assign segments to a "historical" node and the broker has some intelligence around its ability to cache results when talking to a realtime node. This does not need to be set for the master or the broker.
+`druid.server.type` should be set to "historical" for your historical nodes and "realtime" for the realtime nodes. The Coordinator will only assign segments to a "historical" node and the broker has some intelligence around its ability to cache results when talking to a realtime node. This does not need to be set for the Coordinator or the broker.
`druid.host` should be set to the hostname and port that can be used to talk to the given server process. Basically, someone should be able to send a request to http://${druid.host}/ and actually talk to the process.
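To make the two settings above concrete, an illustrative fragment of a historical node's runtime.properties might look like the following (not part of this patch; the host and port values are placeholders):

```
druid.server.type=historical
druid.host=10.0.0.5:8080
druid.port=8080
```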

View File

@@ -67,9 +67,9 @@ Runtime.properties:
```
druid.host=#{IP_ADDR}:8080
druid.port=8080
-druid.service=druid/prod/compute/_default
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.service=druid/prod/historical/_default
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:#{DRUID_VERSION}"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@@ -19,6 +19,7 @@
package io.druid.query.search;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.guava.nary.BinaryFn;
@@ -32,18 +33,22 @@ import java.util.TreeSet;
/**
 */
-public class SearchBinaryFn implements BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>>
+public class SearchBinaryFn
+    implements BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>>
{
  private final SearchSortSpec searchSortSpec;
  private final QueryGranularity gran;
+  private final int limit;
  public SearchBinaryFn(
      SearchSortSpec searchSortSpec,
-      QueryGranularity granularity
+      QueryGranularity granularity,
+      int limit
  )
  {
    this.searchSortSpec = searchSortSpec;
    this.gran = granularity;
+    this.limit = limit;
  }
  @Override
@@ -65,7 +70,13 @@ public class SearchBinaryFn implements BinaryFn<Result<SearchResultValue>, Resul
    results.addAll(Lists.newArrayList(arg2Vals));
    return (gran instanceof AllGranularity)
-           ? new Result<SearchResultValue>(arg1.getTimestamp(), new SearchResultValue(Lists.newArrayList(results)))
+           ? new Result<SearchResultValue>(
+               arg1.getTimestamp(), new SearchResultValue(
+                   Lists.newArrayList(
+                       Iterables.limit(results, limit)
+                   )
+               )
+           )
           : new Result<SearchResultValue>(
               gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis())),
               new SearchResultValue(Lists.newArrayList(results))
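The core of the change above is Guava's `Iterables.limit`, which returns a view of at most `limit` elements of the merged, sorted hit set. Below is a minimal, self-contained sketch of that mechanism (not part of the patch; the class name, sample strings, and limit value are made up for illustration):

```java
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.TreeSet;

public class LimitSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the merged hits: a TreeSet keeps them sorted and de-duplicated,
    // much like the merge of arg1 and arg2 values above.
    TreeSet<String> mergedHits = new TreeSet<String>();
    mergedHits.add("a");
    mergedHits.add("b");
    mergedHits.add("c");

    // Iterables.limit produces a lazy view of at most `limit` elements, so
    // materializing it with Lists.newArrayList keeps only the first two hits.
    int limit = 2;
    List<String> capped = Lists.newArrayList(Iterables.limit(mergedHits, limit));

    System.out.println(capped); // prints [a, b]
  }
}
```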

View File

@@ -101,7 +101,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
      )
      {
        SearchQuery query = (SearchQuery) input;
-        return new SearchBinaryFn(query.getSort(), query.getGranularity());
+        return new SearchBinaryFn(query.getSort(), query.getGranularity(), query.getLimit());
      }
    };
  }

View File

@@ -90,7 +90,7 @@ public class SearchBinaryFnTest
        )
    );
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
    assertSearchMergeResult(expected.getValue(), actual.getValue());
  }
@@ -138,7 +138,7 @@ public class SearchBinaryFnTest
        )
    );
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.DAY).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.DAY, Integer.MAX_VALUE).apply(r1, r2);
    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
    assertSearchMergeResult(expected.getValue(), actual.getValue());
  }
@@ -162,7 +162,7 @@ public class SearchBinaryFnTest
    Result<SearchResultValue> expected = r1;
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
    assertSearchMergeResult(expected.getValue(), actual.getValue());
  }
@@ -210,7 +210,7 @@ public class SearchBinaryFnTest
        )
    );
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
    assertSearchMergeResult(expected.getValue(), actual.getValue());
  }
@@ -258,7 +258,7 @@ public class SearchBinaryFnTest
        )
    );
-    Result<SearchResultValue> actual = new SearchBinaryFn(new StrlenSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new StrlenSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
    assertSearchMergeResult(expected.getValue(), actual.getValue());
  }
@@ -282,7 +282,38 @@ public class SearchBinaryFnTest
    Result<SearchResultValue> expected = r1;
-    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2);
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
+    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
+    assertSearchMergeResult(expected.getValue(), actual.getValue());
+  }
+  @Test
+  public void testMergeLimit(){
+    Result<SearchResultValue> r1 = new Result<SearchResultValue>(
+        currTime,
+        new SearchResultValue(
+            ImmutableList.<SearchHit>of(
+                new SearchHit(
+                    "blah",
+                    "foo"
+                )
+            )
+        )
+    );
+    Result<SearchResultValue> r2 = new Result<SearchResultValue>(
+        currTime,
+        new SearchResultValue(
+            ImmutableList.<SearchHit>of(
+                new SearchHit(
+                    "blah2",
+                    "foo2"
+                )
+            )
+        )
+    );
+    Result<SearchResultValue> expected = r1;
+    Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, 1).apply(r1, r2);
    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
    assertSearchMergeResult(expected.getValue(), actual.getValue());
  }