This commit is contained in:
fjy 2015-06-29 10:49:30 -07:00
commit e42d5ac785
36 changed files with 843 additions and 83 deletions

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
public class ConciseComplementBenchmark
{
// Number of rows to read, the test will read random rows
@Param({"1000", "10000", "100000", "1000000", "1000000"})
int emptyRows;
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void uncompressed(Blackhole blackhole)
{
final ImmutableConciseSet set = ImmutableConciseSet.complement(null, emptyRows);
blackhole.consume(set);
assert (emptyRows == set.size());
}
}

View File

@ -68,7 +68,7 @@ The indexing service also uses its own set of paths. These configs can be includ
If `druid.zk.paths.base` and `druid.zk.paths.indexer.base` are both set, and none of the other `druid.zk.paths.*` or `druid.zk.paths.indexer.*` values are set, then the other properties will be evaluated relative to their respective `base`.
For example, if `druid.zk.paths.base` is set to `/druid1` and `druid.zk.paths.indexer.base` is set to `/druid2` then `druid.zk.paths.announcementsPath` will default to `/druid1/announcements` while `druid.zk.paths.indexer.announcementsPath` will default to `/druid2/announcements`.
The following path is used service discovery and are **not** affected by `druid.zk.paths.base` and **must** be specified separately.
The following path is used for service discovery. It is **not** affected by `druid.zk.paths.base` and **must** be specified separately.
|Property|Description|Default|
|--------|-----------|-------|

View File

@ -95,7 +95,7 @@ The overlord can dynamically change worker behavior.
The JSON object can be submitted to the overlord via a POST request at:
```
http://<COORDINATOR_IP>:<port>/druid/indexer/v1/worker
http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker
```
Optional Header Parameters for auditing the config change can also be specified.
@ -153,7 +153,7 @@ Issuing a GET request at the same URL will return the current worker config spec
To view the audit history of worker config issue a GET request to the URL -
```
http://<COORDINATOR_IP>:<port>/druid/indexer/v1/worker/history?interval=<interval>
http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker/history?interval=<interval>
```
default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in overlord runtime.properties.

View File

@ -39,7 +39,7 @@ This separation allows each node to only care about what it is best at. By separ
The following diagram shows how queries and data flow through this architecture, and which nodes (and external dependencies, discussed below) are involved:
<img src="../img/druid-dataflow-3.png" width="800"/>
<img src="../../img/druid-dataflow-3.png" width="800"/>
All nodes can be run in some highly available fashion, either as symmetric peers in a share-nothing cluster or as hot-swap failover nodes.
@ -51,7 +51,7 @@ Aside from these nodes, there are 3 external dependencies to the system:
The following diagram illustrates the cluster's management layer, showing how certain nodes and dependencies help manage the cluster by tracking and exchanging metadata:
<img src="../img/druid-manage-1.png" width="800"/>
<img src="../../img/druid-manage-1.png" width="800"/>
### Segments and Data Storage

View File

@ -13,7 +13,7 @@ Overlords and middle managers may run on the same node or across multiple nodes
Indexing Service Overview
-------------------------
![Indexing Service](../img/indexing_service.png "Indexing Service")
![Indexing Service](../../img/indexing_service.png "Indexing Service")
<!--
Preamble

View File

@ -19,7 +19,7 @@ Segment Propagation
The segment propagation diagram for real-time data ingestion can be seen below:
![Segment Propagation](../img/segmentPropagation.png "Segment Propagation")
![Segment Propagation](../../img/segmentPropagation.png "Segment Propagation")
You can read about the various components shown in this diagram under the Architecture section (see the menu on the right). Note that some of the names are now outdated.

View File

@ -4,7 +4,125 @@ layout: doc_page
Segments
========
Druid segments contain data for a time interval, stored as separate columns. Dimensions (string columns) have inverted indexes associated with them for each dimension value. Metric columns are LZ4 compressed.
Druid stores its index in *segment files*, which are partitioned by
time. In a basic setup, one segment file is created for each time
interval, where the time inteval is configurable in the
`segmentGranularity` parameter of the `granularitySpec`, which is
documented [here](../ingestion/batch-ingestion.html). For druid to
operate well under heavy query load, it is important for the segment
file size to be within the recommended range of 300mb-700mb. If your
segment files are larger than this range, then consider either
changing the the granularity of the time interval or partitioning your
data and tweaking the `targetPartitionSize` in your `partitioningSpec`
(a good starting point for this parameter is 5 million rows). See the
sharding section below and the 'Partitioning specification' section of
the [Batch ingestion](../ingestion/batch-ingestion.html) documentation
for more information.
### A segment file's core data structures
Here we describe the internal structure of segment files, which is
essentially *columnar*: the data for each column is laid out in
separate data structures. By storing each column separately, Druid can
decrease query latency by scanning only those columns actually needed
for a query. There are three basic column types: the timestamp
column, dimension columns, and metric columns, as illustrated in the
image below:
![Druid column types](../../img/druid-column-types.png "Druid Column Types")
The timestamp and metric columns are simple: behind the scenes each of
these is an array of integer or floating point values compressed with
LZ4. Once a query knows which rows it needs to select, it simply
decompresses these, pulls out the relevant rows, and applies the
desired aggregation operator. As with all columns, if a query doesnt
require a column, then that columns data is just skipped over.
Dimensions columns are different because they support filter and
group-by operations, so each dimension requires the following
three data structures:
1. A dictionary that maps values (which are always treated as strings) to integer IDs,
2. For each distinct value in the column, a bitmap that indicates which rows contain that value, and
3. A list of the columns values, encoded using the dictionary in 1.
Why these three data structures? The dictionary simply maps string
values to integer ids so that the values in 2 and 3 can be
represented compactly. The bitmaps in 2 -- also known as *inverted
indexes* allow for quick filtering operations (specifically, bitmaps
are convenient for quickly applying AND and OR operators). Finally,
the list of values in 3 are needed for *group by* and *TopN*
queries. In other words, queries that solely aggregate metrics based
on filters do not need to touch the list of dimension values stored in
3.
To get a concrete sense of these data structures, consider the page
column from the example data above. The three data structures that
represent this dimension are illustrated in the diagram below.
```
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}
2: Column data
[0,
0,
1,
1]
3: Bitmaps - one for each unique value of the column
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,0,1,1]
```
Note that the bitmap is different from the first two data structures:
whereas the first two grow linearly in the size of the data (in the
worst case), the size of the bitmap section is the product of data
size * column cardinality. Compression will help us here though
because we know that each row will have only non-zero entry in a only
a single bitmap. This means that high cardinality columns will have
extremely sparse, and therefore highly compressible, bitmaps. Druid
exploits this using compression algorithms that are specially suited
for bitmaps, such as roaring bitmap compression.
### Multi-value columns
If a data source makes use of multi-value columns, then the data
structures within the segment files look a bit different. Let's
imagine that in the example above, the second row were tagged with
both the 'Ke$ha' *and* 'Justin Bieber' topics. In this case, the three
data structures would now look as follows:
```
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}
2: Column data
[0,
[0,1], <--Row value of multi-value column can have array of values
1,
1]
3: Bitmaps - one for each unique value
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,1,1,1]
^
|
|
Multi-value column has multiple non-zero entries
```
Note the changes to the second row in the column data and the Ke$ha
bitmap. If a row has more than one value for a column, its entry in
the 'column data' is an array of values. Additionally, a row with *n*
values in a column columns will have *n* non-zero valued entries in
that column's bitmaps.
Naming Convention
-----------------
@ -17,7 +135,7 @@ datasource_intervalStart_intervalEnd_version_partitionNum
Segment Components
------------------
A segment is comprised of several files, listed below.
Behind the scenes, a segment is comprised of several files, listed below.
* `version.bin`
@ -52,4 +170,10 @@ Sharding Data to Create Segments
### Sharding Data by Dimension
If the cumulative total number of rows for the different values of a given column exceed some configurable threshold, multiple segments representing the same time interval for the same datasource may be created. These segments will contain some partition number as part of their identifier. Sharding by dimension reduces some of the the costs associated with operations over high cardinality dimensions.
If the cumulative total number of rows for the different values of a
given column exceed some configurable threshold, multiple segments
representing the same time interval for the same datasource may be
created. These segments will contain some partition number as part of
their identifier. Sharding by dimension reduces some of the the costs
associated with operations over high cardinality dimensions. For more
information on sharding, see the ingestion documentat

View File

@ -4,4 +4,4 @@ layout: doc_page
# Integrating Druid With Other Technologies
This page discusses how we can integrate druid with other technologies. Event streams can be stored in a distributed queue like Kafka, then it can be streamed to a distributed realtime computation system like Twitter Storm / Samza and then it can be feed into Druid via Tranquility plugin. With Tranquility, Middlemanager & Peons will act as a realtime node and they handle realtime queries, segment handoff and realtime indexing.
<img src="../img/druid-production.png" width="800"/>
<img src="../../img/druid-production.png" width="800"/>

View File

@ -38,7 +38,58 @@ See [Examples](../tutorials/examples.html). This firehose creates a stream of ra
#### RabbitMqFirehose
This firehose ingests events from a define rabbit-mq queue.
<br>
**Note:** Add **amqp-client-3.2.1.jar** to lib directory of druid to use this firehose.
<br>
A sample spec for rabbitmq firehose:
```json
"firehose" : {
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"port": "5672",
"username": "test-dude",
"password": "test-word",
"virtualHost": "test-vhost",
"uri": "amqp://mqserver:1234/vhost",
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false",
"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"]
}
}
```
|property|description|Default|required?|
|--------|-----------|---------|
|host|The hostname of the RabbitMQ broker to connect to|localhost|no|
|port|The port number to connect to on the RabbitMQ broker|5672|no|
|username|The username to use to connect to RabbitMQ|guest|no|
|password|The password to use to connect to RabbitMQ|guest|no|
|virtualHost|The virtual host to connect to|/|no|
|uri|The URI string to use to connect to RabbitMQ| |no|
|exchange|The exchange to connect to| |yes|
|queue|The queue to connect to or create| |yes|
|routingKey|The routing key to use to bind the queue to the exchange| |yes|
|durable|Whether the queue should be durable|false|no|
|exclusive|Whether the queue should be exclusive|false|no|
|autoDelete|Whether the queue should auto-delete on disconnect|false|no|
|maxRetries|The max number of reconnection retry attempts| |yes|
|retryIntervalSeconds|The reconnection interval| |yes|
|maxDurationSeconds|The max duration of trying to reconnect| |yes|
#### LocalFirehose
This Firehose can be used to read the data from files on local disk.

View File

@ -251,7 +251,6 @@ The indexing service can also run real-time tasks. These tasks effectively trans
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}

View File

@ -29,7 +29,6 @@ Available Metrics
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/node/time`|Milliseconds taken to query individual historical/realtime nodes.|id, status, server.|< 1s|
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk.|id, status, chunkInterval (if interval chunking is enabled).|< 1s|
### Historical

View File

@ -92,7 +92,7 @@ All JavaScript functions must return numerical values.
```json
{
"type": "javascript",
"name": "sum(log(x)/y) + 10",
"name": "sum(log(x)*y) + 10",
"fieldNames": ["x", "y"],
"fnAggregate" : "function(current, a, b) { return current + (Math.log(a) * b); }",
"fnCombine" : "function(partialA, partialB) { return partialA + partialB; }",
@ -137,11 +137,11 @@ SELECT COUNT(DISTINCT(value)) FROM (
#### Cardinality by row
When setting `byRow` to `true` it computes the cardinality by row, i.e. the cardinality of distinct dimension combinations
When setting `byRow` to `true` it computes the cardinality by row, i.e. the cardinality of distinct dimension combinations.
This is equivalent to something akin to
```sql
SELECT COUNT(*) FROM ( SELECT DIM1, DIM2, DIM3 FROM <datasource> GROUP BY DIM1, DIM2, DIM3
SELECT COUNT(*) FROM ( SELECT DIM1, DIM2, DIM3 FROM <datasource> GROUP BY DIM1, DIM2, DIM3 )
```
**Example**

View File

@ -56,7 +56,7 @@ There are 7 main parts to a timeseries query:
|postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no|
|context|See [Context](../querying/query-context.html)|no|
To pull it all together, the above query would return 2 data points, one for each day between 2012-01-01 and 2012-01-03, from the "sample\_datasource" table. Each data point would be the (long) sum of sample\_fieldName1, the (double) sum of sample\_fieldName2 and the (double) the result of sample\_fieldName1 divided by sample\_fieldName2 for the filter set. The output looks like this:
To pull it all together, the above query would return 2 data points, one for each day between 2012-01-01 and 2012-01-03, from the "sample\_datasource" table. Each data point would be the (long) sum of sample\_fieldName1, the (double) sum of sample\_fieldName2 and the (double) result of sample\_fieldName1 divided by sample\_fieldName2 for the filter set. The output looks like this:
```json
[

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

View File

@ -81,6 +81,10 @@ public class HdfsStorageDruidModule implements DruidModule
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
final Configuration conf = new Configuration();
// Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies.
conf.setClassLoader(getClass().getClassLoader());
if (props != null) {
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {

View File

@ -518,7 +518,7 @@ public class JobHelper
public static Path prependFSIfNullScheme(FileSystem fs, Path path)
{
if (path.toUri().getScheme() == null) {
path = new Path(fs.getUri().toString(), String.format("./%s", path));
path = fs.makeQualified(path);
}
return path;
}

View File

@ -128,7 +128,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
<version>0.1.5</version>
<version>0.1.6</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -431,7 +431,7 @@
<dependency>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<version>2.11.4</version>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>

View File

@ -44,7 +44,6 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.selector.QueryableDruidServer;
@ -53,7 +52,6 @@ import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@ -69,7 +67,6 @@ import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -94,7 +91,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
private final ObjectMapper objectMapper;
private final CacheConfig cacheConfig;
private final ListeningExecutorService backgroundExecutorService;
private final ServiceEmitter emitter;
@Inject
public CachingClusteredClient(
@ -103,8 +99,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
Cache cache,
@Smile ObjectMapper objectMapper,
@BackgroundCaching ExecutorService backgroundExecutorService,
CacheConfig cacheConfig,
ServiceEmitter emitter
CacheConfig cacheConfig
)
{
this.warehouse = warehouse;
@ -113,7 +108,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
this.objectMapper = objectMapper;
this.cacheConfig = cacheConfig;
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
this.emitter = emitter;
serverView.registerSegmentCallback(
Executors.newFixedThreadPool(
@ -332,20 +326,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final DruidServer server = entry.getKey();
final List<SegmentDescriptor> descriptors = entry.getValue();
final QueryRunner clientQueryable = new MetricsEmittingQueryRunner(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
serverView.getQueryRunner(server),
"query/node/time",
ImmutableMap.of("server",server.getName())
);
final QueryRunner clientQueryable = serverView.getQueryRunner(server);
if (clientQueryable == null) {
log.error("WTF!? server[%s] doesn't have a client Queryable?", server);

View File

@ -17,16 +17,19 @@
package io.druid.client.cache;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm;
import net.spy.memcached.FailureMode;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture;
@ -49,6 +52,23 @@ public class MemcachedCache implements Cache
{
private static final Logger log = new Logger(MemcachedCache.class);
final static HashAlgorithm MURMUR3_128 = new HashAlgorithm()
{
final HashFunction fn = Hashing.murmur3_128();
@Override
public long hash(String k)
{
return fn.hashString(k, Charsets.UTF_8).asLong();
}
@Override
public String toString()
{
return fn.toString();
}
};
public static MemcachedCache create(final MemcachedCacheConfig config)
{
try {
@ -67,18 +87,22 @@ public class MemcachedCache implements Cache
return new MemcachedCache(
new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true)
.setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder)
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.setReadBufferSize(config.getReadBufferSize())
.setOpQueueFactory(opQueueFactory)
.build(),
new MemcachedCustomConnectionFactoryBuilder()
// 1000 repetitions gives us good distribution with murmur3_128
// (approx < 5% difference in counts across nodes, with 5 cache nodes)
.setKetamaNodeRepetitions(1000)
.setHashAlg(MURMUR3_128)
.setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true)
.setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder)
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.setReadBufferSize(config.getReadBufferSize())
.setOpQueueFactory(opQueueFactory)
.build(),
AddrUtil.getAddresses(config.getHosts())
),
config

View File

@ -0,0 +1,197 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import net.spy.memcached.ArrayModNodeLocator;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.FailureMode;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.KetamaNodeLocator;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.metrics.MetricCollector;
import net.spy.memcached.metrics.MetricType;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
class MemcachedCustomConnectionFactoryBuilder extends ConnectionFactoryBuilder
{
private int repetitions = new DefaultKetamaNodeLocatorConfiguration().getNodeRepetitions();
public MemcachedCustomConnectionFactoryBuilder setKetamaNodeRepetitions(int repetitions)
{
this.repetitions = repetitions;
return this;
}
// borrowed from ConnectionFactoryBuilder to allow setting number of repetitions for KetamaNodeLocator
@Override
public ConnectionFactory build()
{
return new DefaultConnectionFactory() {
@Override
public NodeLocator createLocator(List<MemcachedNode> nodes) {
switch (locator) {
case ARRAY_MOD:
return new ArrayModNodeLocator(nodes, getHashAlg());
case CONSISTENT:
return new KetamaNodeLocator(
nodes,
getHashAlg(),
new DefaultKetamaNodeLocatorConfiguration()
{
@Override
public int getNodeRepetitions()
{
return repetitions;
}
}
);
default:
throw new IllegalStateException("Unhandled locator type: " + locator);
}
}
@Override
public BlockingQueue<Operation> createOperationQueue() {
return opQueueFactory == null ? super.createOperationQueue()
: opQueueFactory.create();
}
@Override
public BlockingQueue<Operation> createReadOperationQueue() {
return readQueueFactory == null ? super.createReadOperationQueue()
: readQueueFactory.create();
}
@Override
public BlockingQueue<Operation> createWriteOperationQueue() {
return writeQueueFactory == null ? super.createReadOperationQueue()
: writeQueueFactory.create();
}
@Override
public Transcoder<Object> getDefaultTranscoder() {
return transcoder == null ? super.getDefaultTranscoder() : transcoder;
}
@Override
public FailureMode getFailureMode() {
return failureMode == null ? super.getFailureMode() : failureMode;
}
@Override
public HashAlgorithm getHashAlg() {
return hashAlg == null ? super.getHashAlg() : hashAlg;
}
public Collection<ConnectionObserver> getInitialObservers() {
return initialObservers;
}
@Override
public OperationFactory getOperationFactory() {
return opFact == null ? super.getOperationFactory() : opFact;
}
@Override
public long getOperationTimeout() {
return opTimeout == -1 ? super.getOperationTimeout() : opTimeout;
}
@Override
public int getReadBufSize() {
return readBufSize == -1 ? super.getReadBufSize() : readBufSize;
}
@Override
public boolean isDaemon() {
return isDaemon;
}
@Override
public boolean shouldOptimize() {
return shouldOptimize;
}
@Override
public boolean useNagleAlgorithm() {
return useNagle;
}
@Override
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
@Override
public AuthDescriptor getAuthDescriptor() {
return authDescriptor;
}
@Override
public long getOpQueueMaxBlockTime() {
return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime
: super.getOpQueueMaxBlockTime();
}
@Override
public int getTimeoutExceptionThreshold() {
return timeoutExceptionThreshold;
}
@Override
public MetricType enableMetrics() {
return metricType == null ? super.enableMetrics() : metricType;
}
@Override
public MetricCollector getMetricCollector() {
return collector == null ? super.getMetricCollector() : collector;
}
@Override
public ExecutorService getListenerExecutorService() {
return executorService == null ? super.getListenerExecutorService() : executorService;
}
@Override
public boolean isDefaultExecutorService() {
return executorService == null;
}
@Override
public long getAuthWaitTime() {
return authWaitTime;
}
};
}
}

View File

@ -165,16 +165,15 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
@Override
public Optional<EntryType> withHandle(Handle handle) throws Exception
{
byte[] res = handle.createQuery(
String.format("SELECT payload FROM %s WHERE id = :id", entryTable)
)
.bind("id", entryId)
.map(ByteArrayMapper.FIRST)
.first();
return Optional.fromNullable(
jsonMapper.<EntryType>readValue(
handle.createQuery(
String.format("SELECT payload FROM %s WHERE id = :id", entryTable)
)
.bind("id", entryId)
.map(ByteArrayMapper.FIRST)
.first(),
entryType
)
res == null ? null : jsonMapper.<EntryType>readValue(res, entryType)
);
}
}
@ -190,16 +189,15 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
@Override
public Optional<StatusType> withHandle(Handle handle) throws Exception
{
byte[] res = handle.createQuery(
String.format("SELECT status_payload FROM %s WHERE id = :id", entryTable)
)
.bind("id", entryId)
.map(ByteArrayMapper.FIRST)
.first();
return Optional.fromNullable(
jsonMapper.<StatusType>readValue(
handle.createQuery(
String.format("SELECT status_payload FROM %s WHERE id = :id", entryTable)
)
.bind("id", entryId)
.map(ByteArrayMapper.FIRST)
.first(),
statusType
)
res == null ? null : jsonMapper.<StatusType>readValue(res, statusType)
);
}
}

View File

@ -19,6 +19,8 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -36,8 +38,8 @@ public class ForeverLoadRule extends LoadRule
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants
)
{
validateTieredReplicants(tieredReplicants);
this.tieredReplicants = tieredReplicants;
this.tieredReplicants = tieredReplicants == null ? ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS) : tieredReplicants;
validateTieredReplicants(this.tieredReplicants);
}
@Override

View File

@ -19,7 +19,9 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -41,9 +43,9 @@ public class IntervalLoadRule extends LoadRule
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants
)
{
validateTieredReplicants(tieredReplicants);
this.tieredReplicants = tieredReplicants == null ? ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS) : tieredReplicants;
validateTieredReplicants(this.tieredReplicants);
this.interval = interval;
this.tieredReplicants = tieredReplicants;
}
@Override

View File

@ -242,7 +242,11 @@ public abstract class LoadRule implements Rule
}
protected void validateTieredReplicants(Map<String, Integer> tieredReplicants){
if(tieredReplicants.size() == 0)
throw new IAE("A rule with empty tiered replicants is invalid");
for (Map.Entry<String, Integer> entry: tieredReplicants.entrySet()) {
if (entry.getValue() == null)
throw new IAE("Replicant value cannot be empty");
if (entry.getValue() < 0)
throw new IAE("Replicant value [%d] is less than 0, which is not allowed", entry.getValue());
}

View File

@ -19,7 +19,9 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -42,9 +44,9 @@ public class PeriodLoadRule extends LoadRule
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants
)
{
validateTieredReplicants(tieredReplicants);
this.tieredReplicants = tieredReplicants == null ? ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS) : tieredReplicants;
validateTieredReplicants(this.tieredReplicants);
this.period = period;
this.tieredReplicants = tieredReplicants;
}
@Override

View File

@ -217,7 +217,6 @@ public class CachingClusteredClientTest
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
protected ServiceEmitter emitter;
DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
@ -247,7 +246,6 @@ public class CachingClusteredClientTest
timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
serverView = EasyMock.createStrictMock(TimelineServerView.class);
cache = MapCache.create(100000);
emitter = EasyMock.createStrictMock(ServiceEmitter.class);
client = makeClient(MoreExecutors.sameThreadExecutor());
servers = new DruidServer[]{
@ -2097,8 +2095,7 @@ public class CachingClusteredClientTest
{
return true;
}
},
emitter
}
);
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import net.spy.memcached.DefaultHashAlgorithm;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.KetamaNodeLocator;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
import org.apache.commons.codec.digest.DigestUtils;
import org.easymock.EasyMock;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@RunWith(Parameterized.class)
public class CacheDistributionTest
{
public static final int KEY_COUNT = 1_000_000;
@Parameterized.Parameters(name = "repetitions={0}, hash={1}")
public static Iterable<Object[]> data() {
List<HashAlgorithm> hash = ImmutableList.of(
DefaultHashAlgorithm.FNV1A_64_HASH, DefaultHashAlgorithm.KETAMA_HASH, MemcachedCache.MURMUR3_128
);
List<Integer> repetitions = Arrays.asList(160, 500, 1000, 2500, 5000);
Set<List<Object>> values = Sets.cartesianProduct(
Sets.newLinkedHashSet(hash),
Sets.newLinkedHashSet(repetitions)
);
return Iterables.transform(
values, new Function<List<Object>, Object[]>()
{
@Nullable
@Override
public Object[] apply(List<Object> input)
{
return input.toArray();
}
}
);
}
final HashAlgorithm hash;
final int reps;
@BeforeClass
public static void header() {
System.out.printf(
"%25s\t%5s\t%10s\t%10s\t%10s\t%10s\t%10s\t%7s\t%5s\n",
"hash", "reps", "node 1", "node 2", "node 3", "node 4", "node 5", "min/max", "ns"
);
}
public CacheDistributionTest(final HashAlgorithm hash, final int reps)
{
this.hash = hash;
this.reps = reps;
}
// run to get a sense of cache key distribution for different ketama reps / hash functions
@Test
public void testDistribution() throws Exception
{
KetamaNodeLocator locator = new KetamaNodeLocator(
ImmutableList.of(
dummyNode("druid-cache.0001", 11211),
dummyNode("druid-cache.0002", 11211),
dummyNode("druid-cache.0003", 11211),
dummyNode("druid-cache.0004", 11211),
dummyNode("druid-cache.0005", 11211)
),
hash,
new DefaultKetamaNodeLocatorConfiguration()
{
@Override
public int getNodeRepetitions()
{
return reps;
}
}
);
Map<MemcachedNode, AtomicLong> counter = Maps.newHashMap();
long t = 0;
for(int i = 0; i < KEY_COUNT; ++i) {
final String k = DigestUtils.sha1Hex("abc" + i) + ":" + DigestUtils.sha1Hex("xyz" + i);
long t0 = System.nanoTime();
MemcachedNode node = locator.getPrimary(k);
t += System.nanoTime() - t0;
if(counter.containsKey(node)) {
counter.get(node).incrementAndGet();
} else {
counter.put(node, new AtomicLong(1));
}
}
long min = Long.MAX_VALUE;
long max = 0;
System.out.printf("%25s\t%5d\t", hash, reps);
for(AtomicLong count : counter.values()) {
System.out.printf("%10d\t", count.get());
min = Math.min(min, count.get());
max = Math.max(max, count.get());
}
System.out.printf("%7.2f\t%5.0f\n", (double) min / (double) max, (double)t / KEY_COUNT);
}
private static MemcachedNode dummyNode(String host, int port) {
SocketAddress address = new InetSocketAddress(host, port);
MemcachedNode node = EasyMock.createNiceMock(MemcachedNode.class);
EasyMock.expect(node.getSocketAddress()).andReturn(address).anyTimes();
EasyMock.replay(node);
return node;
}
}

View File

@ -118,8 +118,12 @@ public class SQLMetadataStorageActionHandlerTest
handler.getEntry(entryId)
);
Assert.assertEquals(Optional.absent(), handler.getEntry("non_exist_entry"));
Assert.assertEquals(Optional.absent(), handler.getStatus(entryId));
Assert.assertEquals(Optional.absent(), handler.getStatus("non_exist_entry"));
Assert.assertTrue(handler.setStatus(entryId, true, status1));
Assert.assertEquals(
@ -179,6 +183,11 @@ public class SQLMetadataStorageActionHandlerTest
handler.insert(entryId, new DateTime("2014-01-01"), "test", entry, true, status);
Assert.assertEquals(
ImmutableList.of(),
handler.getLogs("non_exist_entry")
);
Assert.assertEquals(
ImmutableMap.of(),
handler.getLocks(entryId)
@ -206,6 +215,11 @@ public class SQLMetadataStorageActionHandlerTest
handler.insert(entryId, new DateTime("2014-01-01"), "test", entry, true, status);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Integer>>of(),
handler.getLocks("non_exist_entry")
);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Integer>>of(),
handler.getLocks(entryId)

View File

@ -0,0 +1,90 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.IAE;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class ForeverLoadRuleTest
{
@Test
public void testSerdeNullTieredReplicants() throws Exception
{
ForeverLoadRule rule = new ForeverLoadRule(
null
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
Assert.assertEquals(rule.getTieredReplicants(), ((ForeverLoadRule)reread).getTieredReplicants());
Assert.assertEquals(ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), rule.getTieredReplicants());
}
@Test
public void testMappingNullTieredReplicants() throws Exception{
String inputJson = "{\n"
+ " \"type\": \"loadForever\"\n"
+ "}";
String expectedJson = " {\n"
+ " \"tieredReplicants\": {\n"
+ " \""+ DruidServer.DEFAULT_TIER +"\": "+ DruidServer.DEFAULT_NUM_REPLICANTS +"\n"
+ " },\n"
+ " \"type\": \"loadForever\"\n"
+ " }";
ObjectMapper jsonMapper = new DefaultObjectMapper();
ForeverLoadRule inputForeverLoadRule = jsonMapper.readValue(inputJson, ForeverLoadRule.class);
ForeverLoadRule expectedForeverLoadRule = jsonMapper.readValue(expectedJson, ForeverLoadRule.class);
Assert.assertEquals(expectedForeverLoadRule.getTieredReplicants(), inputForeverLoadRule.getTieredReplicants());
}
@Test(expected = IAE.class)
public void testEmptyTieredReplicants() throws Exception
{
ForeverLoadRule rule = new ForeverLoadRule(
ImmutableMap.<String, Integer>of()
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
}
@Test(expected = IAE.class)
public void testEmptyReplicantValue() throws Exception
{
// Immutable map does not allow null values
Map<String, Integer> tieredReplicants= new HashMap<>();
tieredReplicants.put("tier", null);
ForeverLoadRule rule = new ForeverLoadRule(
tieredReplicants
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
}
}

View File

@ -21,8 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
/**
@ -42,4 +42,37 @@ import org.junit.Test;
Assert.assertEquals(rule, reread);
}
@Test
public void testSerdeNullTieredReplicants() throws Exception
{
IntervalLoadRule rule = new IntervalLoadRule(
new Interval("0/3000"), null
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
Assert.assertEquals(rule, reread);
Assert.assertEquals(ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), rule.getTieredReplicants());
}
@Test
public void testMappingNullTieredReplicants() throws Exception{
String inputJson = " {\n"
+ " \"interval\": \"0000-01-01T00:00:00.000-05:50:36/3000-01-01T00:00:00.000-06:00\",\n"
+ " \"type\": \"loadByInterval\"\n"
+ " }";
String expectedJson = "{\n"
+ " \"interval\": \"0000-01-01T00:00:00.000-05:50:36/3000-01-01T00:00:00.000-06:00\",\n"
+ " \"tieredReplicants\": {\n"
+ " \""+ DruidServer.DEFAULT_TIER +"\": "+ DruidServer.DEFAULT_NUM_REPLICANTS +"\n"
+ " },\n"
+ " \"type\": \"loadByInterval\"\n"
+ " }";
ObjectMapper jsonMapper = new DefaultObjectMapper();
IntervalLoadRule inputIntervalLoadRule = jsonMapper.readValue(inputJson, IntervalLoadRule.class);
IntervalLoadRule expectedIntervalLoadRule = jsonMapper.readValue(expectedJson, IntervalLoadRule.class);
Assert.assertEquals(expectedIntervalLoadRule, inputIntervalLoadRule);
}
}

View File

@ -17,7 +17,10 @@
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
@ -74,4 +77,39 @@ public class PeriodLoadRuleTest
)
);
}
@Test
public void testSerdeNullTieredReplicants() throws Exception
{
PeriodLoadRule rule = new PeriodLoadRule(
new Period("P1D"), null
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
Assert.assertEquals(rule.getPeriod(), ((PeriodLoadRule)reread).getPeriod());
Assert.assertEquals(rule.getTieredReplicants(), ((PeriodLoadRule)reread).getTieredReplicants());
Assert.assertEquals(ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), rule.getTieredReplicants());
}
@Test
public void testMappingNullTieredReplicants() throws Exception{
String inputJson = "{\n"
+ " \"period\": \"P1D\",\n"
+ " \"type\": \"loadByPeriod\"\n"
+ " }";
String expectedJson = "{\n"
+ " \"period\": \"P1D\",\n"
+ " \"tieredReplicants\": {\n"
+ " \""+ DruidServer.DEFAULT_TIER +"\": "+ DruidServer.DEFAULT_NUM_REPLICANTS +"\n"
+ " },\n"
+ " \"type\": \"loadByPeriod\"\n"
+ " }";
ObjectMapper jsonMapper = new DefaultObjectMapper();
PeriodLoadRule inputPeriodLoadRule = jsonMapper.readValue(inputJson, PeriodLoadRule.class);
PeriodLoadRule expectedPeriodLoadRule = jsonMapper.readValue(expectedJson, PeriodLoadRule.class);
Assert.assertEquals(expectedPeriodLoadRule.getTieredReplicants(), inputPeriodLoadRule.getTieredReplicants());
Assert.assertEquals(expectedPeriodLoadRule.getPeriod(), inputPeriodLoadRule.getPeriod());
}
}