1) make chat handler resource work again

2) add more default configs
3) make examples work again
This commit is contained in:
fjy 2013-10-02 14:22:39 -07:00
parent 216bed9c36
commit bc8db7daa5
42 changed files with 335 additions and 260 deletions

View File

@ -54,6 +54,6 @@ public class CassandraDruidModule implements DruidModule
.addBinding("c*")
.to(CassandraDataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.pusher", CassandraDataSegmentConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", CassandraDataSegmentConfig.class);
}
}

View File

@ -215,7 +215,7 @@ public class DbConnector
public void createConfigTable()
{
if (config.get().isCreateTables()) {
createRuleTable(dbi, dbTables.get().getConfigTable());
createConfigTable(dbi, dbTables.get().getConfigTable());
}
}

View File

@ -33,8 +33,9 @@ public class DbTablesConfig
return new DbTablesConfig(base, null, null, null, null, null, null);
}
@JsonProperty
@NotNull
private static String defaultBase = "druid";
@JsonProperty("base")
private final String base;
@JsonProperty("segments")
@ -66,7 +67,7 @@ public class DbTablesConfig
@JsonProperty("taskLock") String taskLockTable
)
{
this.base = base;
this.base = (base == null) ? defaultBase : base;
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.rulesTable = makeTableName(rulesTable, "rules");
this.configTable = makeTableName(configTable, "config");

View File

@ -68,8 +68,8 @@ druid.processing.numThreads=3
druid.computation.buffer.size=100000000
# S3 dest for realtime indexer
druid.pusher.s3.bucket=
druid.pusher.s3.baseKey=
druid.storage.s3.bucket=
druid.storage.s3.baseKey=
druid.bard.cache.sizeInBytes=40000000
druid.master.merger.service=blah_blah
@ -86,8 +86,8 @@ These properties are for connecting with S3 and using it to pull down segments.
|--------|-----------|-------|
|`com.metamx.aws.accessKey`|The access key to use to access S3.|none|
|`com.metamx.aws.secretKey`|The secret key to use to access S3.|none|
|`druid.pusher.s3.bucket`|The bucket to store segments, this is used by Realtime and the Indexing service.|none|
|`druid.pusher.s3.baseKey`|The base key to use when storing segments, this is used by Realtime and the Indexing service|none|
|`druid.storage.s3.bucket`|The bucket to store segments, this is used by Realtime and the Indexing service.|none|
|`druid.storage.s3.baseKey`|The base key to use when storing segments, this is used by Realtime and the Indexing service|none|
### JDBC connection

View File

@ -14,8 +14,8 @@ S3 configuration parameters are
```
com.metamx.aws.accessKey=<S3 access key>
com.metamx.aws.secretKey=<S3 secret_key>
druid.pusher.s3.bucket=<bucket to store in>
druid.pusher.s3.baseKey=<base key prefix to use, i.e. what directory>
druid.storage.s3.bucket=<bucket to store in>
druid.storage.s3.baseKey=<base key prefix to use, i.e. what directory>
```
## HDFS
@ -25,8 +25,8 @@ As of 0.4.0, HDFS can be used for storage of segments as well.
In order to use hdfs for deep storage, you need to set the following configuration on your realtime nodes.
```
druid.pusher.hdfs=true
druid.pusher.hdfs.storageDirectory=<directory for storing segments>
druid.storage.hdfs=true
druid.storage.hdfs.storageDirectory=<directory for storing segments>
```
If you are using the Hadoop indexer, set your output directory to be a location on Hadoop and it will work
@ -39,10 +39,10 @@ A local mount can be used for storage of segments as well. This allows you to u
In order to use a local mount for deep storage, you need to set the following configuration on your realtime nodes.
```
druid.pusher.local=true
druid.pusher.local.storageDirectory=<directory for storing segments>
druid.storage.local=true
druid.storage.local.storageDirectory=<directory for storing segments>
```
Note that you should generally set `druid.pusher.local.storageDirectory` to something different from `druid.paths.indexCache`.
Note that you should generally set `druid.storage.local.storageDirectory` to something different from `druid.paths.indexCache`.
If you are using the Hadoop indexer in local mode, then just give it a local file as your output directory and it will work.

View File

@ -81,7 +81,7 @@ Instructions for booting a Zookeeper and then Kafka cluster are available [here]
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.storage.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
@ -262,7 +262,7 @@ If you've already setup a realtime node, be aware that although you can run mult
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.storage.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
@ -316,7 +316,7 @@ If you've already setup a realtime node, be aware that although you can run mult
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.storage.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
@ -334,8 +334,8 @@ If you've already setup a realtime node, be aware that although you can run mult
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
# Setup local storage mode
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true
druid.storage.local.storageDirectory=/tmp/druid/localStorage
druid.storage.local=true
```
2. Launch the compute node:

View File

@ -32,7 +32,7 @@ Before we start querying druid, we're going to finish setting up a complete clus
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.storage.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
@ -49,8 +49,8 @@ Before we start querying druid, we're going to finish setting up a complete clus
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true
druid.storage.local.storageDirectory=/tmp/druid/localStorage
druid.storage.local=true
# thread pool size for servicing queries
druid.client.http.connections=30

View File

@ -63,4 +63,4 @@ DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
echo "Running command:"
(set -x; java ${JAVA_ARGS} -classpath ${DRUID_CP} io.druid.cli.Main druid server realtimeStandalone)
(set -x; java ${JAVA_ARGS} -classpath ${DRUID_CP} io.druid.cli.Main example realtime)

View File

@ -22,9 +22,9 @@ First create the schema above. (I use a new keyspace called `druid`)
Then, add the following properties to your properties file to enable a Cassandra
backend.
druid.pusher.cassandra=true
druid.pusher.cassandra.host=localhost:9160
druid.pusher.cassandra.keyspace=druid
druid.storage.cassandra=true
druid.storage.cassandra.host=localhost:9160
druid.storage.cassandra.keyspace=druid
Use the `druid-development@googlegroups.com` mailing list if you have questions,
or feel free to reach out directly: `bone@alumni.brown.edu`.

View File

@ -1,15 +1,5 @@
druid.host=127.0.0.1:8080
druid.port=8080
druid.host=localhost
druid.service=broker
druid.port=8080
# logging
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# zk
druid.zk.service.host=localhost
druid.zk.paths.base=/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
# thread pool size for servicing queries
druid.client.http.connections=10
druid.zk.service.host=localhost

View File

@ -1,28 +0,0 @@
druid.host=127.0.0.1:8081
druid.port=8081
druid.service=compute
# logging
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# zk
druid.zk.service.host=localhost
druid.zk.paths.base=/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
# processing
druid.processing.buffer.sizeBytes=10000000
# aws (dummy user)
com.metamx.aws.accessKey=AKIAIMKECRUYKDQGR6YQ
com.metamx.aws.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
# server
druid.server.maxSize=100000000

View File

@ -0,0 +1,12 @@
druid.host=localhost
druid.service=master
druid.port=8082
druid.zk.service.host=localhost
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.s3.secretKey=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
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd

View File

@ -0,0 +1,15 @@
druid.host=localhost
druid.service=compute
druid.port=8081
druid.zk.service.host=localhost
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=100000000
druid.processing.buffer.sizeBytes=10000000
druid.segmentCache.infoPath=/tmp/druid/segmentInfoCache
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]

View File

@ -1,27 +0,0 @@
druid.host=127.0.0.1:8082
druid.port=8082
druid.service=master
# logging
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# zk
druid.zk.service.host=localhost
druid.zk.paths.base=/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
# aws (dummy user)
com.metamx.aws.accessKey=AKIAIMKECRUYKDQGR6YQ
com.metamx.aws.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
# db
druid.database.segmentTable=segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.database.ruleTable=rules
druid.database.configTable=config
# master runtime configs
druid.master.startDelay=PT60S

View File

@ -1,29 +1,61 @@
[{
"schema" : { "dataSource":"druidtest",
"aggregators":[ {"type":"count", "name":"impressions"},
{"type":"doubleSum","name":"wp","fieldName":"wp"}],
"indexGranularity":"minute",
"shardSpec" : { "type": "none" } },
"config" : { "maxRowsInMemory" : 500000,
"intermediatePersistPeriod" : "PT10m" },
"firehose" : { "type" : "kafka-0.7.2",
"consumerProps" : { "zk.connect" : "localhost:2181",
"zk.connectiontimeout.ms" : "15000",
"zk.sessiontimeout.ms" : "15000",
"zk.synctime.ms" : "5000",
"groupid" : "topic-pixel-local",
"fetch.size" : "1048586",
"autooffset.reset" : "largest",
"autocommit.enable" : "false" },
"feed" : "druidtest",
"parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"] } },
"plumber" : { "type" : "realtime",
"windowPeriod" : "PT10m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": {"type": "messageTime"} }
}]
[
{
"schema": {
"dataSource": "druidtest",
"aggregators": [
{
"type": "count",
"name": "impressions"
},
{
"type": "doubleSum",
"name": "wp",
"fieldName": "wp"
}
],
"indexGranularity": "minute",
"shardSpec": {
"type": "none"
}
},
"config": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "topic-pixel-local",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "druidtest",
"parser": {
"timestampSpec": {
"column": "utcdt",
"format": "iso"
},
"data": {
"format": "json"
},
"dimensionExclusions": [
"wp"
]
}
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
}
}
}
]

View File

@ -1,37 +1,16 @@
druid.host=127.0.0.1:8083
druid.port=8083
druid.host=localhost
druid.service=realtime
druid.port=8083
# logging
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# zk
druid.zk.service.host=localhost
druid.zk.paths.base=/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
# processing
druid.processing.buffer.sizeBytes=10000000
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.s3.secretKey=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
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
# schema
druid.realtime.specFile=config/realtime/realtime.spec
# aws (dummy user)
com.metamx.aws.accessKey=AKIAIMKECRUYKDQGR6YQ
com.metamx.aws.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
# db
druid.database.segmentTable=segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.database.ruleTable=rules
druid.database.configTable=config
# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# handoff
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true
druid.processing.buffer.sizeBytes=10000000

View File

@ -54,12 +54,6 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider
this.handlers = Maps.newConcurrentMap();
}
@Override
public String getType()
{
return "eventReceiving";
}
@Override
public void register(final String service, ChatHandler handler)
{

View File

@ -24,7 +24,6 @@ import com.google.inject.Inject;
import io.druid.indexing.common.index.ChatHandler;
import io.druid.indexing.common.index.ChatHandlerProvider;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
@ -40,7 +39,6 @@ public class ChatHandlerResource
this.handlers = handlers;
}
@POST
@Path("/chat/{id}")
public Object doTaskChat(
@PathParam("id") String handlerId

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
/**
*/
public class QueryConfig
{
@JsonProperty
private Period chunkPeriod = Period.months(1);
public Period getChunkPeriod()
{
return chunkPeriod;
}
}

View File

@ -20,10 +20,11 @@
package io.druid.query.groupby;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.QueryConfig;
/**
*/
public class GroupByQueryConfig
public class GroupByQueryConfig extends QueryConfig
{
@JsonProperty
private boolean singleThreaded = false;

View File

@ -37,6 +37,7 @@ import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@ -210,4 +211,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
return TYPE_REFERENCE;
}
@Override
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
{
return new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod());
}
}

View File

@ -53,7 +53,6 @@ import io.druid.query.search.search.SearchQueryConfig;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -257,7 +256,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(QueryRunner<Result<SearchResultValue>> runner)
{
return new SearchThresholdAdjustingQueryRunner(
new IntervalChunkingQueryRunner<Result<SearchResultValue>>(runner, Period.months(1)),
new IntervalChunkingQueryRunner<Result<SearchResultValue>>(runner, config.getChunkPeriod()),
config
);
}

View File

@ -20,12 +20,13 @@
package io.druid.query.search.search;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.QueryConfig;
import javax.validation.constraints.Min;
/**
*/
public class SearchQueryConfig
public class SearchQueryConfig extends QueryConfig
{
@JsonProperty
@Min(1)

View File

@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
@ -35,6 +36,7 @@ import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -47,7 +49,6 @@ import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -68,6 +69,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE =
new TypeReference<Result<TimeseriesResultValue>>() {};
private final QueryConfig config;
@Inject
public TimeseriesQueryQueryToolChest(QueryConfig config)
{
this.config = config;
}
@Override
public QueryRunner<Result<TimeseriesResultValue>> mergeResults(QueryRunner<Result<TimeseriesResultValue>> queryRunner)
{
@ -251,7 +260,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override
public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
{
return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(runner, Period.months(1));
return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(runner, config.getChunkPeriod());
}
public Ordering<Result<TimeseriesResultValue>> getOrdering()

View File

@ -19,10 +19,12 @@
package io.druid.query.timeseries;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -37,13 +39,31 @@ import java.util.concurrent.ExecutorService;
public class TimeseriesQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeseriesResultValue>, TimeseriesQuery>
{
private static final TimeseriesQueryQueryToolChest toolChest = new TimeseriesQueryQueryToolChest();
private static final TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
public static TimeseriesQueryRunnerFactory create()
{
return new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine()
);
}
private final TimeseriesQueryQueryToolChest toolChest;
private final TimeseriesQueryEngine engine;
@Inject
public TimeseriesQueryRunnerFactory(
TimeseriesQueryQueryToolChest toolChest,
TimeseriesQueryEngine engine
)
{
this.toolChest = toolChest;
this.engine = engine;
}
@Override
public QueryRunner<Result<TimeseriesResultValue>> createRunner(final Segment segment)
{
return new TimeseriesQueryRunner(segment);
return new TimeseriesQueryRunner(engine, segment.asStorageAdapter());
}
@Override
@ -64,11 +84,13 @@ public class TimeseriesQueryRunnerFactory
private static class TimeseriesQueryRunner implements QueryRunner<Result<TimeseriesResultValue>>
{
private final TimeseriesQueryEngine engine;
private final StorageAdapter adapter;
public TimeseriesQueryRunner(Segment segment)
private TimeseriesQueryRunner(TimeseriesQueryEngine engine, StorageAdapter adapter)
{
this.adapter = segment.asStorageAdapter();
this.engine = engine;
this.adapter = adapter;
}
@Override

View File

@ -87,7 +87,7 @@ public class TimeseriesQueryRunnerBonusTest
private static List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
{
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
final QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
final QueryRunner<Result<TimeseriesResultValue>> runner = makeQueryRunner(
factory,
new IncrementalIndexSegment(index)

View File

@ -60,7 +60,9 @@ public class TimeseriesQueryRunnerTest
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(new TimeseriesQueryRunnerFactory());
return QueryRunnerTestHelper.makeQueryRunners(
TimeseriesQueryRunnerFactory.create()
);
}
private final QueryRunner runner;
@ -81,7 +83,10 @@ public class TimeseriesQueryRunnerTest
.granularity(QueryRunnerTestHelper.dayGran)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.asList(QueryRunnerTestHelper.rowsCount, QueryRunnerTestHelper.indexDoubleSum)
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexDoubleSum
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
@ -271,7 +276,13 @@ public class TimeseriesQueryRunnerTest
)
)
)
.granularity(new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Los_Angeles")))
.granularity(
new PeriodGranularity(
new Period("P1D"),
null,
DateTimeZone.forID("America/Los_Angeles")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
@ -375,50 +386,56 @@ public class TimeseriesQueryRunnerTest
@Test
public void testTimeseriesGranularityNotAlignedOnSegmentBoundariesWithFilter()
{
TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.filters(QueryRunnerTestHelper.providerDimension, "spot", "upfront", "total_market")
.granularity(new PeriodGranularity(new Period("P7D"), null, DateTimeZone.forID("America/Los_Angeles")))
.intervals(
Arrays.asList(
new Interval(
"2011-01-12T00:00:00.000-08:00/2011-01-20T00:00:00.000-08:00"
)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.build();
{
TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.filters(QueryRunnerTestHelper.providerDimension, "spot", "upfront", "total_market")
.granularity(
new PeriodGranularity(
new Period("P7D"),
null,
DateTimeZone.forID("America/Los_Angeles")
)
)
.intervals(
Arrays.asList(
new Interval(
"2011-01-12T00:00:00.000-08:00/2011-01-20T00:00:00.000-08:00"
)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-01-06T00:00:00.000-08:00", DateTimeZone.forID("America/Los_Angeles")),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6071L)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-01-13T00:00:00.000-08:00", DateTimeZone.forID("America/Los_Angeles")),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 91L, "idx", 33382L)
)
)
);
List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-01-06T00:00:00.000-08:00", DateTimeZone.forID("America/Los_Angeles")),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6071L)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-01-13T00:00:00.000-08:00", DateTimeZone.forID("America/Los_Angeles")),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 91L, "idx", 33382L)
)
)
);
Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
runner.run(query1),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
}
Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
runner.run(query1),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
}
@Test
public void testTimeseriesWithVaryingGranWithFilter()
@ -588,7 +605,12 @@ public class TimeseriesQueryRunnerTest
.granularity(QueryRunnerTestHelper.dayGran)
.filters(new RegexDimFilter(QueryRunnerTestHelper.providerDimension, "^.p.*$")) // spot and upfront
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.rowsCount, QueryRunnerTestHelper.indexLongSum))
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
@ -1341,7 +1363,11 @@ public class TimeseriesQueryRunnerTest
.value("spot")
.build(),
Druids.newOrDimFilterBuilder()
.fields(QueryRunnerTestHelper.qualityDimension, "automotive", "business")
.fields(
QueryRunnerTestHelper.qualityDimension,
"automotive",
"business"
)
.build()
)
)

View File

@ -434,7 +434,7 @@ public class SpatialFilterBonusTest
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -516,7 +516,7 @@ public class SpatialFilterBonusTest
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()

View File

@ -464,7 +464,7 @@ public class SpatialFilterTest
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -546,7 +546,7 @@ public class SpatialFilterTest
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()

View File

@ -50,7 +50,7 @@ public class DataSegmentPusherPullerModule implements Module
bindDeepStorageHdfs(binder);
PolyBind.createChoice(
binder, "druid.pusher.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class)
binder, "druid.storage.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class)
);
}
@ -65,7 +65,7 @@ public class DataSegmentPusherPullerModule implements Module
.addBinding("local")
.to(LocalDataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.pusher", LocalDataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class);
}
private static void bindDeepStorageS3(Binder binder)
@ -79,7 +79,7 @@ public class DataSegmentPusherPullerModule implements Module
.addBinding("s3")
.to(S3DataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.pusher", S3DataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
}
private static void bindDeepStorageHdfs(Binder binder)
@ -95,6 +95,6 @@ public class DataSegmentPusherPullerModule implements Module
.addBinding("hdfs")
.to(HdfsDataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.pusher", HdfsDataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class);
}
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryToolChest;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
@ -63,6 +64,7 @@ public class QueryToolChestModule implements Module
binder.bind(entry.getValue()).in(LazySingleton.class);
}
JsonConfigProvider.bind(binder, "druid.query", QueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
}

View File

@ -28,7 +28,7 @@ import java.io.File;
public class LocalDataSegmentPusherConfig
{
@JsonProperty
public File storageDirectory;
public File storageDirectory = new File("/tmp/druid/localStorage");
public File getStorageDirectory()
{

View File

@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class CuratorDiscoveryConfig
{
@JsonProperty
private String path = null;
private String path = "/druid/discovery";
public String getPath()
{

View File

@ -25,7 +25,10 @@ import org.skife.config.Config;
public abstract class ZkPathsConfig
{
@Config("druid.zk.paths.base")
public abstract String getZkBasePath();
public String getZkBasePath()
{
return "druid";
}
@Config("druid.zk.paths.propertiesPath")
public String getPropertiesPath()

View File

@ -27,14 +27,14 @@ import java.util.Set;
/**
*/
public class MergerWhitelist
public class DatasourceWhitelist
{
public static final String CONFIG_KEY = "merger.whitelist";
public static final String CONFIG_KEY = "master.whitelist";
private final Set<String> dataSources;
@JsonCreator
public MergerWhitelist(Set<String> dataSources)
public DatasourceWhitelist(Set<String> dataSources)
{
this.dataSources = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
this.dataSources.addAll(dataSources);

View File

@ -486,7 +486,7 @@ public class DruidMaster
masterRunnables.add(
Pair.of(
new MasterIndexingServiceRunnable(
makeIndexingServiceHelpers(configManager.watch(MergerWhitelist.CONFIG_KEY, MergerWhitelist.class))
makeIndexingServiceHelpers(configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class))
),
config.getMasterSegmentMergerPeriod()
)
@ -559,7 +559,7 @@ public class DruidMaster
}
}
private List<DruidMasterHelper> makeIndexingServiceHelpers(final AtomicReference<MergerWhitelist> whitelistRef)
private List<DruidMasterHelper> makeIndexingServiceHelpers(final AtomicReference<DatasourceWhitelist> whitelistRef)
{
List<DruidMasterHelper> helpers = Lists.newArrayList();
@ -597,11 +597,11 @@ public class DruidMaster
public static class DruidMasterVersionConverter implements DruidMasterHelper
{
private final IndexingServiceClient indexingServiceClient;
private final AtomicReference<MergerWhitelist> whitelistRef;
private final AtomicReference<DatasourceWhitelist> whitelistRef;
public DruidMasterVersionConverter(
IndexingServiceClient indexingServiceClient,
AtomicReference<MergerWhitelist> whitelistRef
AtomicReference<DatasourceWhitelist> whitelistRef
)
{
this.indexingServiceClient = indexingServiceClient;
@ -611,7 +611,7 @@ public class DruidMaster
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
MergerWhitelist whitelist = whitelistRef.get();
DatasourceWhitelist whitelist = whitelistRef.get();
for (DataSegment dataSegment : params.getAvailableSegments()) {
if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) {

View File

@ -31,7 +31,7 @@ public abstract class DruidMasterConfig
public abstract String getHost();
@Config("druid.master.startDelay")
@Default("PT600s")
@Default("PT60s")
public abstract Duration getMasterStartDelay();
@Config("druid.master.period")

View File

@ -52,11 +52,11 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
private static final Logger log = new Logger(DruidMasterSegmentMerger.class);
private final IndexingServiceClient indexingServiceClient;
private final AtomicReference<MergerWhitelist> whiteListRef;
private final AtomicReference<DatasourceWhitelist> whiteListRef;
public DruidMasterSegmentMerger(
IndexingServiceClient indexingServiceClient,
AtomicReference<MergerWhitelist> whitelistRef
AtomicReference<DatasourceWhitelist> whitelistRef
)
{
this.indexingServiceClient = indexingServiceClient;
@ -66,7 +66,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
MergerWhitelist whitelist = whiteListRef.get();
DatasourceWhitelist whitelist = whiteListRef.get();
MasterStats stats = new MasterStats();
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = Maps.newHashMap();

View File

@ -455,7 +455,7 @@ public class DruidMasterSegmentMergerTest
}
};
final AtomicReference<MergerWhitelist> whitelistRef = new AtomicReference<MergerWhitelist>(null);
final AtomicReference<DatasourceWhitelist> whitelistRef = new AtomicReference<DatasourceWhitelist>(null);
final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef);
final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder()
.withAvailableSegments(ImmutableSet.copyOf(segments))

View File

@ -27,10 +27,9 @@ import io.airlift.command.Command;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.ServerView;
import io.druid.guice.NoopSegmentPublisherProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.RealtimeModule;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
@ -64,11 +63,10 @@ public class CliRealtimeExample extends ServerRunnable
@Override
public void configure(Binder binder)
{
binder.bind(SegmentPublisher.class).toProvider(NoopSegmentPublisherProvider.class);
binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class);
binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class);
binder.bind(InventoryView.class).to(NoopInventoryView.class);
binder.bind(ServerView.class).to(NoopServerView.class);
binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class).in(LazySingleton.class);
binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class).in(LazySingleton.class);
binder.bind(InventoryView.class).to(NoopInventoryView.class).in(LazySingleton.class);
binder.bind(ServerView.class).to(NoopServerView.class).in(LazySingleton.class);
}
}
);

View File

@ -58,12 +58,16 @@ public class ConvertProperties implements Runnable
new Rename("druid.database.user", "druid.db.connector.user"),
new Rename("druid.database.password", "druid.db.connector.password"),
new Rename("com.metamx.emitter", "druid.emitter"),
new Rename("com.metamx.emitter.logging", "druid.emitter.logging"),
new Rename("com.metamx.emitter.logging.level", "druid.emitter.logging.logLevel"),
new Rename("com.metamx.emitter.http", "druid.emitter.http"),
new Rename("com.metamx.emitter.http.url", "druid.emitter.http.url"),
new Rename("com.metamx.druid.emitter.period", "druid.emitter.emissionPeriod"),
new IndexCacheConverter(),
new Rename("druid.paths.segmentInfoCache", "druid.segmentCache.infoPath"),
new Rename("com.metamx.aws.accessKey", "druid.s3.accessKey"),
new Rename("com.metamx.aws.secretKey", "druid.s3.secretKey"),
new Rename("druid.bard.maxIntervalDuration", "druid.query.chunkDuration"),
new PrefixRename("druid.bard.cache", "druid.broker.cache"),
new Rename("druid.client.http.connections", "druid.broker.http.numConnections"),
new Rename("com.metamx.query.groupBy.maxResults", "druid.query.groupBy.maxResults"),
@ -103,10 +107,11 @@ public class ConvertProperties implements Runnable
new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"),
new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"),
new Rename("druid.master.merger.on", "druid.master.merge.on"),
new PrefixRename("druid.pusher", "druid.storage"),
new DataSegmentPusherDefaultConverter(),
new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"),
new Rename("druid.pusher.cassandra.host", "druid.pusher.host"),
new Rename("druid.pusher.cassandra.keySpace", "druid.pusher.keySpace")
new Rename("druid.pusher.hdfs.storageDirectory", "druid.storage.storageDirectory"),
new Rename("druid.pusher.cassandra.host", "druid.storage.host"),
new Rename("druid.pusher.cassandra.keySpace", "druid.storage.keySpace")
);
@Option(name = "-f", title = "file", description = "The properties file to convert", required = true)

View File

@ -54,16 +54,16 @@ public class DataSegmentPusherDefaultConverter implements PropertyConverter
}
if (type != null) {
return ImmutableMap.of("druid.pusher.type", type);
return ImmutableMap.of("druid.storage.type", type);
}
// It's an s3 property, which means we need to set the type and convert the other values.
Map<String, String> retVal = Maps.newHashMap();
retVal.put("druid.pusher.type", type);
retVal.putAll(new Rename("druid.pusher.s3.bucket", "druid.pusher.bucket").convert(props));
retVal.putAll(new Rename("druid.pusher.s3.baseKey", "druid.pusher.baseKey").convert(props));
retVal.putAll(new Rename("druid.pusher.s3.disableAcl", "druid.pusher.disableAcl").convert(props));
retVal.putAll(new Rename("druid.pusher.s3.bucket", "druid.storage.bucket").convert(props));
retVal.putAll(new Rename("druid.pusher.s3.baseKey", "druid.storage.baseKey").convert(props));
retVal.putAll(new Rename("druid.pusher.s3.disableAcl", "druid.storage.disableAcl").convert(props));
return retVal;
}