Merge branch 'master' into new-schema

fjy 2014-03-04 14:42:16 -08:00
commit 0e8e527662
11 changed files with 167 additions and 59 deletions

@ -67,7 +67,7 @@ You can then use the EC2 dashboard to locate the instance and confirm that it ha
If both the instance and the Druid cluster launch successfully, a few minutes later other messages to STDOUT should follow with information returned from EC2, including the instance ID: If both the instance and the Druid cluster launch successfully, a few minutes later other messages to STDOUT should follow with information returned from EC2, including the instance ID:
Started cluster of 1 instances Started cluster of 1 instances
Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-master, druid-broker, druid-compute, druid-realtime], publicIp= ... Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-coordinator, druid-broker, druid-historical, druid-realtime], publicIp= ...
The final message will contain login information for the instance. The final message will contain login information for the instance.

@ -95,7 +95,7 @@ The broker module uses several of the default modules in [Configuration](Configu
|Property|Possible Values|Description|Default| |Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------| |--------|---------------|-----------|-------|
|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`| |`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` chooses a node at random; `connectionCount` picks the node with the fewest active connections.|`random`| |`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to historical nodes. `random` chooses a node at random; `connectionCount` picks the node with the fewest active connections.|`random`|
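The two `druid.broker.balancer.type` values described in the table row above can be illustrated with a small sketch. This is not Druid's broker code: the class `BalancerSketch`, its method names, and the use of plain server-name strings are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration of the two balancing strategies; not Druid's implementation.
final class BalancerSketch
{
  // "random": pick any server with equal probability.
  static String pickRandom(List<String> servers, Random random)
  {
    return servers.get(random.nextInt(servers.size()));
  }

  // "connectionCount": pick the server that currently has the fewest active connections.
  // Returns null when the map is empty.
  static String pickByConnectionCount(Map<String, Integer> activeConnections)
  {
    String best = null;
    int fewest = Integer.MAX_VALUE;
    for (Map.Entry<String, Integer> entry : activeConnections.entrySet()) {
      if (best == null || entry.getValue() < fewest) {
        best = entry.getKey();
        fewest = entry.getValue();
      }
    }
    return best;
  }
}
```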
#### Local Cache #### Local Cache

@ -14,7 +14,7 @@ As a special case, the absolute minimum setup is one of the standalone examples
Minimum Physical Layout: Experimental Testing with 4GB of RAM Minimum Physical Layout: Experimental Testing with 4GB of RAM
------------------------------------------------------------- -------------------------------------------------------------
This layout can be used to load some data from deep storage onto a Druid compute node for the first time. A minimal physical layout for a 1 or 2 core machine with 4GB of RAM is: This layout can be used to load some data from deep storage onto a Druid historical node for the first time. A minimal physical layout for a 1 or 2 core machine with 4GB of RAM is:
1. node1: [Coordinator](Coordinator.html) + metadata service + zookeeper + [Historical](Historical.html) 1. node1: [Coordinator](Coordinator.html) + metadata service + zookeeper + [Historical](Historical.html)
2. transient nodes: [Indexing Service](Indexing-Service.html) 2. transient nodes: [Indexing Service](Indexing-Service.html)
@ -37,7 +37,7 @@ A minimal physical layout not constrained by cores that demonstrates parallel qu
7. node7: [Realtime](Realtime.html) (m1.small or m1.medium or m1.large) 7. node7: [Realtime](Realtime.html) (m1.small or m1.medium or m1.large)
8. transient nodes: [Indexing Service](Indexing-Service.html) 8. transient nodes: [Indexing Service](Indexing-Service.html)
This layout naturally lends itself to adding more RAM and core to Compute nodes, and to adding many more Compute nodes. Depending on the actual load, the Master, metadata server, and Zookeeper might need to use larger machines. This layout naturally lends itself to adding more RAM and core to Historical nodes, and to adding many more Historical nodes. Depending on the actual load, the Coordinator, metadata server, and Zookeeper might need to use larger machines.
High Availability Physical Layout High Availability Physical Layout
--------------------------------- ---------------------------------
@ -63,14 +63,14 @@ An HA layout allows full rolling restarts and heavy volume:
Sizing for Cores and RAM Sizing for Cores and RAM
------------------------ ------------------------
The Compute and Broker nodes will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Compute daemons should have a heap size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries. The Historical and Broker nodes will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Historical daemons should have a heap size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries.
The effective utilization of cores by Zookeeper, MySQL, and Master nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with a heap size between 500MB and 1GB. The effective utilization of cores by Zookeeper, MySQL, and Coordinator nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with a heap size between 500MB and 1GB.
Storage Storage
------- -------
Indexed segments should be kept in a permanent store accessible by all nodes like AWS S3 or HDFS or equivalent. Currently Druid supports S3, but this will be extended soon. Indexed segments should be kept in a permanent store accessible by all nodes like AWS S3 or HDFS or equivalent. Refer to [Deep-Storage](deep-storage.html) for more details on supported storage types.
Local disk ("ephemeral" on AWS EC2) for caching is recommended over network mounted storage (example of mounted: AWS EBS, Elastic Block Store) in order to avoid network delays during times of heavy usage. If your data center is suitably provisioned for networked storage, perhaps with separate LAN/NICs just for storage, then mounted might work fine. Local disk ("ephemeral" on AWS EC2) for caching is recommended over network mounted storage (example of mounted: AWS EBS, Elastic Block Store) in order to avoid network delays during times of heavy usage. If your data center is suitably provisioned for networked storage, perhaps with separate LAN/NICs just for storage, then mounted might work fine.
@ -85,7 +85,7 @@ druid.host=someHostOrIPaddrWithPort
druid.port=8080 druid.port=8080
``` ```
`druid.server.type` should be set to "historical" for your compute nodes and realtime for the realtime nodes. The master will only assign segments to a "historical" node and the broker has some intelligence around its ability to cache results when talking to a realtime node. This does not need to be set for the master or the broker. `druid.server.type` should be set to "historical" for your historical nodes and realtime for the realtime nodes. The Coordinator will only assign segments to a "historical" node and the broker has some intelligence around its ability to cache results when talking to a realtime node. This does not need to be set for the coordinator or the broker.
`druid.host` should be set to the hostname and port that can be used to talk to the given server process. Basically, someone should be able to send a request to http://${druid.host}/ and actually talk to the process. `druid.host` should be set to the hostname and port that can be used to talk to the given server process. Basically, someone should be able to send a request to http://${druid.host}/ and actually talk to the process.
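As a rough illustration of the `druid.host` requirement above (hostname and port, reachable at `http://${druid.host}/`), the sketch below builds that base URL and rejects values that lack a port. The class `DruidHostSketch` and the method `baseUri` are hypothetical names, not part of Druid, and the check is only syntactic: it does not verify that a process is actually listening.

```java
import java.net.URI;

// Hypothetical helper; checks only that the value looks like host:port.
final class DruidHostSketch
{
  // Builds http://<druidHost>/ and fails fast when the host or port is missing.
  static URI baseUri(String druidHost)
  {
    URI uri = URI.create("http://" + druidHost + "/");
    if (uri.getHost() == null || uri.getPort() == -1) {
      throw new IllegalArgumentException("druid.host must be host:port, got: " + druidHost);
    }
    return uri;
  }
}
```

For example, `baseUri("10.0.0.5:8080")` yields a URI with host `10.0.0.5` and port `8080`, while a bare hostname with no port is rejected.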

@ -4,7 +4,7 @@ layout: doc_page
# Deep Storage # Deep Storage
Deep storage is where segments are stored. It is a storage mechanism that Druid does not provide. This deep storage infrastructure defines the level of durability of your data, as long as Druid nodes can see this storage infrastructure and get at the segments stored on it, you will not lose data no matter how many Druid nodes you lose. If segments disappear from this storage layer, then you will lose whatever data those segments represented. Deep storage is where segments are stored. It is a storage mechanism that Druid does not provide. This deep storage infrastructure defines the level of durability of your data, as long as Druid nodes can see this storage infrastructure and get at the segments stored on it, you will not lose data no matter how many Druid nodes you lose. If segments disappear from this storage layer, then you will lose whatever data those segments represented.
The currently supported types of deep storage follow. The currently supported types of deep storage follow. Other deep-storage options, such as [Cassandra](http://planetcassandra.org/blog/post/cassandra-as-a-deep-storage-mechanism-for-druid-real-time-analytics-engine/), have been developed by members of the community.
## S3-compatible ## S3-compatible

@ -67,9 +67,9 @@ Runtime.properties:
``` ```
druid.host=#{IP_ADDR}:8080 druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/compute/_default druid.service=druid/prod/historical/_default
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:#{DRUID_VERSION}"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod

@ -19,6 +19,7 @@
package io.druid.query.search; package io.druid.query.search;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.guava.nary.BinaryFn;
@ -32,18 +33,22 @@ import java.util.TreeSet;
/** /**
*/ */
public class SearchBinaryFn implements BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>> public class SearchBinaryFn
implements BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>>
{ {
private final SearchSortSpec searchSortSpec; private final SearchSortSpec searchSortSpec;
private final QueryGranularity gran; private final QueryGranularity gran;
private final int limit;
public SearchBinaryFn( public SearchBinaryFn(
SearchSortSpec searchSortSpec, SearchSortSpec searchSortSpec,
QueryGranularity granularity QueryGranularity granularity,
int limit
) )
{ {
this.searchSortSpec = searchSortSpec; this.searchSortSpec = searchSortSpec;
this.gran = granularity; this.gran = granularity;
this.limit = limit;
} }
@Override @Override
@ -65,7 +70,13 @@ public class SearchBinaryFn implements BinaryFn<Result<SearchResultValue>, Resul
results.addAll(Lists.newArrayList(arg2Vals)); results.addAll(Lists.newArrayList(arg2Vals));
return (gran instanceof AllGranularity) return (gran instanceof AllGranularity)
? new Result<SearchResultValue>(arg1.getTimestamp(), new SearchResultValue(Lists.newArrayList(results))) ? new Result<SearchResultValue>(
arg1.getTimestamp(), new SearchResultValue(
Lists.newArrayList(
Iterables.limit(results, limit)
)
)
)
: new Result<SearchResultValue>( : new Result<SearchResultValue>(
gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis())), gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis())),
new SearchResultValue(Lists.newArrayList(results)) new SearchResultValue(Lists.newArrayList(results))

@ -101,7 +101,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
) )
{ {
SearchQuery query = (SearchQuery) input; SearchQuery query = (SearchQuery) input;
return new SearchBinaryFn(query.getSort(), query.getGranularity()); return new SearchBinaryFn(query.getSort(), query.getGranularity(), query.getLimit());
} }
}; };
} }
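The effect of passing `query.getLimit()` into the merge function can be sketched without Guava or Druid types. The example below is an assumption-laden stand-in: `MergeLimitSketch` and `mergeWithLimit` are hypothetical names, hits are plain strings, and natural string ordering replaces the `SearchSortSpec` comparator. It merges two partial results into sorted, de-duplicated order and keeps at most `limit` entries, which is roughly what `Iterables.limit(results, limit)` does in the hunk above. Note that in this commit the limit is applied only in the `AllGranularity` branch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for the merge step; not Druid's SearchBinaryFn.
final class MergeLimitSketch
{
  // Merge both inputs into sorted, de-duplicated order and keep at most `limit` hits.
  static List<String> mergeWithLimit(Collection<String> left, Collection<String> right, int limit)
  {
    TreeSet<String> merged = new TreeSet<>(left);
    merged.addAll(right);

    List<String> out = new ArrayList<>();
    for (String hit : merged) {
      if (out.size() >= limit) {
        break; // stop once the limit is reached
      }
      out.add(hit);
    }
    return out;
  }
}
```

With a limit of `Integer.MAX_VALUE`, as the updated tests pass, the merge returns every hit and behaves like the previous unlimited version.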

@ -42,6 +42,7 @@ public class TmpFileIOPeon implements IOPeon
File retFile = createdFiles.get(filename); File retFile = createdFiles.get(filename);
if (retFile == null) { if (retFile == null) {
retFile = File.createTempFile("filePeon", filename); retFile = File.createTempFile("filePeon", filename);
retFile.deleteOnExit();
createdFiles.put(filename, retFile); createdFiles.put(filename, retFile);
} }
return new BufferedOutputStream(new FileOutputStream(retFile)); return new BufferedOutputStream(new FileOutputStream(retFile));
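The pattern in this hunk (create a temp file once per logical name, register it for deletion at JVM exit, and cache it) can be shown in isolation. `TempFileSketch` and `fileFor` are hypothetical names; only `File.createTempFile` and `File.deleteOnExit` are taken from the diff. `deleteOnExit` runs only on normal JVM termination, so it is a safety net rather than a substitute for explicit cleanup.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the create-once, delete-on-exit pattern.
final class TempFileSketch
{
  private final Map<String, File> createdFiles = new HashMap<>();

  // Returns the temp file for `filename`, creating and registering it on first use.
  File fileFor(String filename)
  {
    File file = createdFiles.get(filename);
    if (file == null) {
      try {
        file = File.createTempFile("filePeon", filename);
      }
      catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      file.deleteOnExit(); // removed when the JVM shuts down normally
      createdFiles.put(filename, file);
    }
    return file;
  }
}
```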

@ -152,34 +152,30 @@ public class IncrementalIndex implements Iterable<Row>
} }
final List<String> rowDimensions = row.getDimensions(); final List<String> rowDimensions = row.getDimensions();
String[][] dims = new String[dimensionOrder.size()][];
String[][] dims;
List<String[]> overflow = null; List<String[]> overflow = null;
for (String dimension : rowDimensions) { synchronized (dimensionOrder) {
dimension = dimension.toLowerCase(); dims = new String[dimensionOrder.size()][];
List<String> dimensionValues = row.getDimension(dimension); for (String dimension : rowDimensions) {
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
Integer index = dimensionOrder.get(dimension); if (overflow == null) {
if (index == null) { overflow = Lists.newArrayList();
synchronized (dimensionOrder) {
index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
} }
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
} }
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
} }
} }
if (overflow != null) { if (overflow != null) {
// Merge overflow and non-overflow // Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][]; String[][] newDims = new String[dims.length + overflow.size()][];
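The hunk above moves the whole dimension lookup inside `synchronized (dimensionOrder)`, so that sizing the `dims` array and assigning indexes to new dimensions happen under one lock. A minimal sketch of that idea follows, assuming a plain `LinkedHashMap` and a hypothetical class `DimensionOrderSketch`. It is not the `IncrementalIndex` code, and it uses `Locale.ROOT` for lower-casing, which the original does not.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration: assign a stable index to each case-insensitive dimension name.
final class DimensionOrderSketch
{
  private final Map<String, Integer> dimensionOrder = new LinkedHashMap<>();

  // Lookup and insert happen under the same lock, so two threads cannot
  // both observe "missing" and hand out the same index twice.
  int indexOf(String dimension)
  {
    String key = dimension.toLowerCase(Locale.ROOT);
    synchronized (dimensionOrder) {
      Integer index = dimensionOrder.get(key);
      if (index == null) {
        index = dimensionOrder.size();
        dimensionOrder.put(key, index);
      }
      return index;
    }
  }

  int size()
  {
    synchronized (dimensionOrder) {
      return dimensionOrder.size();
    }
  }
}
```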
@ -292,8 +288,9 @@ public class IncrementalIndex implements Iterable<Row>
Aggregator[] prev = facts.putIfAbsent(key, aggs); Aggregator[] prev = facts.putIfAbsent(key, aggs);
if (prev != null) { if (prev != null) {
aggs = prev; aggs = prev;
} else {
numEntries.incrementAndGet();
} }
numEntries.incrementAndGet();
} }
synchronized (this) { synchronized (this) {
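The `numEntries` change above counts an entry only when `putIfAbsent` actually inserted it, instead of on every call that found no existing value at first glance. A self-contained sketch of that counting rule follows. `FactCounterSketch` is a hypothetical class, and an `AtomicLong` stands in for the `Aggregator[]` value stored in `facts`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of counting distinct keys alongside putIfAbsent.
final class FactCounterSketch
{
  private final ConcurrentMap<String, AtomicLong> facts = new ConcurrentHashMap<>();
  private final AtomicInteger numEntries = new AtomicInteger();

  void add(String key)
  {
    AtomicLong agg = facts.get(key);
    if (agg == null) {
      agg = new AtomicLong();
      AtomicLong prev = facts.putIfAbsent(key, agg);
      if (prev != null) {
        agg = prev; // another thread inserted first; reuse its aggregator
      } else {
        numEntries.incrementAndGet(); // only the winning insert is a new entry
      }
    }
    agg.incrementAndGet();
  }

  // Number of distinct keys added so far.
  int size()
  {
    return numEntries.get();
  }

  // Number of times `key` was added, or 0 if it was never added.
  long count(String key)
  {
    AtomicLong agg = facts.get(key);
    return agg == null ? 0L : agg.get();
  }
}
```

Incrementing unconditionally after `putIfAbsent` would over-count whenever two threads race on the same key, which is the case the new `testConcurrentAdd` test exercises by comparing `index.size()` with `elementsPerThread`.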

@ -90,7 +90,7 @@ public class SearchBinaryFnTest
) )
); );
Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2); Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertSearchMergeResult(expected.getValue(), actual.getValue()); assertSearchMergeResult(expected.getValue(), actual.getValue());
} }
@ -138,7 +138,7 @@ public class SearchBinaryFnTest
) )
); );
Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.DAY).apply(r1, r2); Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.DAY, Integer.MAX_VALUE).apply(r1, r2);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertSearchMergeResult(expected.getValue(), actual.getValue()); assertSearchMergeResult(expected.getValue(), actual.getValue());
} }
@ -162,7 +162,7 @@ public class SearchBinaryFnTest
Result<SearchResultValue> expected = r1; Result<SearchResultValue> expected = r1;
Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2); Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertSearchMergeResult(expected.getValue(), actual.getValue()); assertSearchMergeResult(expected.getValue(), actual.getValue());
} }
@ -210,7 +210,7 @@ public class SearchBinaryFnTest
) )
); );
Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2); Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertSearchMergeResult(expected.getValue(), actual.getValue()); assertSearchMergeResult(expected.getValue(), actual.getValue());
} }
@ -258,7 +258,7 @@ public class SearchBinaryFnTest
) )
); );
Result<SearchResultValue> actual = new SearchBinaryFn(new StrlenSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2); Result<SearchResultValue> actual = new SearchBinaryFn(new StrlenSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertSearchMergeResult(expected.getValue(), actual.getValue()); assertSearchMergeResult(expected.getValue(), actual.getValue());
} }
@ -282,7 +282,38 @@ public class SearchBinaryFnTest
Result<SearchResultValue> expected = r1; Result<SearchResultValue> expected = r1;
Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL).apply(r1, r2); Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, Integer.MAX_VALUE).apply(r1, r2);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertSearchMergeResult(expected.getValue(), actual.getValue());
}
@Test
public void testMergeLimit(){
Result<SearchResultValue> r1 = new Result<SearchResultValue>(
currTime,
new SearchResultValue(
ImmutableList.<SearchHit>of(
new SearchHit(
"blah",
"foo"
)
)
)
);
Result<SearchResultValue> r2 = new Result<SearchResultValue>(
currTime,
new SearchResultValue(
ImmutableList.<SearchHit>of(
new SearchHit(
"blah2",
"foo2"
)
)
)
);
Result<SearchResultValue> expected = r1;
Result<SearchResultValue> actual = new SearchBinaryFn(new LexicographicSearchSortSpec(), QueryGranularity.ALL, 1).apply(r1, r2);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertSearchMergeResult(expected.getValue(), actual.getValue()); assertSearchMergeResult(expected.getValue(), actual.getValue());
} }

@ -24,18 +24,59 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import junit.framework.Assert; import junit.framework.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** /**
*/ */
public class IncrementalIndexTest public class IncrementalIndexTest
{ {
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
{
IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("Dim1", "DiM2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "DIM1", "3", "dIM2", "4")
)
);
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("diM1", "dIM2"),
ImmutableMap.<String, Object>of("Dim1", "1", "DiM2", "2", "dim1", "3", "dim2", "4")
)
);
return index;
}
public static MapBasedInputRow getRow(long timestamp, int rowID, int dimensionCount)
{
List<String> dimensionList = new ArrayList<String>(dimensionCount);
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
for (int i = 0; i < dimensionCount; i++) {
String dimName = String.format("Dim_%d", i);
dimensionList.add(dimName);
builder.put(dimName, dimName + rowID);
}
return new MapBasedInputRow(timestamp, dimensionList, builder.build());
}
@Test @Test
public void testCaseInsensitivity() throws Exception public void testCaseInsensitivity() throws Exception
{ {
@ -58,25 +99,52 @@ public class IncrementalIndexTest
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2")); Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
} }
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp) @Test
public void testConcurrentAdd() throws Exception
{ {
IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); final IncrementalIndex index = new IncrementalIndex(
0L,
index.add( QueryGranularity.NONE,
new MapBasedInputRow( new AggregatorFactory[]{new CountAggregatorFactory("count")}
timestamp,
Arrays.asList("Dim1", "DiM2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "DIM1", "3", "dIM2", "4")
)
); );
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
final long timestamp = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(threadCount);
for (int j = 0; j < threadCount; j++) {
executor.submit(
new Runnable()
{
@Override
public void run()
{
try {
for (int i = 0; i < elementsPerThread; i++) {
index.add(getRow(timestamp + i, i, dimensionCount));
}
}
catch (Exception e) {
e.printStackTrace();
}
latch.countDown();
}
}
);
}
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
index.add( Assert.assertEquals(dimensionCount, index.getDimensions().size());
new MapBasedInputRow( Assert.assertEquals(elementsPerThread, index.size());
timestamp, Iterator<Row> iterator = index.iterator();
Arrays.asList("diM1", "dIM2"), int curr = 0;
ImmutableMap.<String, Object>of("Dim1", "1", "DiM2", "2", "dim1", "3", "dim2", "4") while (iterator.hasNext()) {
) Row row = iterator.next();
); Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch());
return index; Assert.assertEquals(Float.valueOf(threadCount), row.getFloatMetric("count"));
curr++;
}
Assert.assertEquals(elementsPerThread, curr);
} }
} }