mirror of https://github.com/apache/druid.git
Merge branch 'master' into onheap-incremental-index
Conflicts: server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
This commit is contained in:
commit
c88de97370
|
@ -33,6 +33,6 @@ The broker module uses several of the default modules in [Configuration](Configu
|
|||
|--------|-----------|-------|
|
||||
|`druid.broker.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|
||||
|`druid.broker.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
|
||||
|`druid.broker.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|
||||
|`druid.broker.cache.hosts`|Comma separated list of Memcached hosts `<host:port>`.|none|
|
||||
|`druid.broker.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|
||||
|`druid.broker.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|
||||
|
|
|
@ -19,6 +19,10 @@ Some great folks have written their own libraries to interact with Druid
|
|||
|
||||
- [RDruid](https://github.com/metamx/RDruid) - Druid connector for R
|
||||
|
||||
#### Node.js
|
||||
|
||||
- [7eggs/node-druid-query](https://github.com/7eggs/node-druid-query) - A Node.js client for Druid
|
||||
|
||||
#### Helper Libraries
|
||||
|
||||
* [madvertise/druid-dumbo](https://github.com/madvertise/druid-dumbo) - Scripts to help generate batch configs for the ingestion of data into Druid
|
||||
|
|
|
@ -21,6 +21,8 @@ package io.druid.segment.realtime;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class FireDepartmentMetrics
|
||||
|
@ -79,4 +81,20 @@ public class FireDepartmentMetrics
|
|||
retVal.rowOutputCount.set(rowOutputCount.get());
|
||||
return retVal;
|
||||
}
|
||||
|
||||
/**
|
||||
* merge other FireDepartmentMetrics, will modify this object's data
|
||||
*
|
||||
* @return this object
|
||||
*/
|
||||
public FireDepartmentMetrics merge(FireDepartmentMetrics other)
|
||||
{
|
||||
Preconditions.checkNotNull(other, "Cannot merge a null FireDepartmentMetrics");
|
||||
FireDepartmentMetrics otherSnapshot = other.snapshot();
|
||||
processedCount.addAndGet(otherSnapshot.processed());
|
||||
thrownAwayCount.addAndGet(otherSnapshot.thrownAway());
|
||||
rowOutputCount.addAndGet(otherSnapshot.rowOutput());
|
||||
unparseableCount.addAndGet(otherSnapshot.unparseable());
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package io.druid.segment.realtime;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -31,6 +33,7 @@ import com.metamx.common.parsers.ParseException;
|
|||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.data.input.Firehose;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.query.FinalizeResultsQueryRunner;
|
||||
import io.druid.query.NoopQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
|
@ -51,8 +54,10 @@ import org.joda.time.Period;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -62,17 +67,24 @@ public class RealtimeManager implements QuerySegmentWalker
|
|||
|
||||
private final List<FireDepartment> fireDepartments;
|
||||
private final QueryRunnerFactoryConglomerate conglomerate;
|
||||
private ExecutorService executorService;
|
||||
|
||||
/**
|
||||
* key=data source name,value=FireChiefs of all partition of that data source
|
||||
*/
|
||||
private final Map<String, List<FireChief>> chiefs;
|
||||
|
||||
private final Map<String, FireChief> chiefs;
|
||||
|
||||
@Inject
|
||||
public RealtimeManager(
|
||||
List<FireDepartment> fireDepartments,
|
||||
QueryRunnerFactoryConglomerate conglomerate
|
||||
QueryRunnerFactoryConglomerate conglomerate,
|
||||
@JacksonInject @Processing ExecutorService executorService
|
||||
)
|
||||
{
|
||||
this.fireDepartments = fireDepartments;
|
||||
this.conglomerate = conglomerate;
|
||||
this.executorService = executorService;
|
||||
|
||||
this.chiefs = Maps.newHashMap();
|
||||
}
|
||||
|
@ -84,7 +96,12 @@ public class RealtimeManager implements QuerySegmentWalker
|
|||
DataSchema schema = fireDepartment.getDataSchema();
|
||||
|
||||
final FireChief chief = new FireChief(fireDepartment);
|
||||
chiefs.put(schema.getDataSource(), chief);
|
||||
List<FireChief> chiefs = this.chiefs.get(schema.getDataSource());
|
||||
if (chiefs == null) {
|
||||
chiefs = new ArrayList<RealtimeManager.FireChief>();
|
||||
this.chiefs.put(schema.getDataSource(), chiefs);
|
||||
}
|
||||
chiefs.add(chief);
|
||||
|
||||
chief.setName(String.format("chief-%s", schema.getDataSource()));
|
||||
chief.setDaemon(true);
|
||||
|
@ -96,34 +113,56 @@ public class RealtimeManager implements QuerySegmentWalker
|
|||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
for (FireChief chief : chiefs.values()) {
|
||||
CloseQuietly.close(chief);
|
||||
for (Iterable<FireChief> chiefs : this.chiefs.values()) {
|
||||
for (FireChief chief : chiefs) {
|
||||
CloseQuietly.close(chief);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public FireDepartmentMetrics getMetrics(String datasource)
|
||||
{
|
||||
FireChief chief = chiefs.get(datasource);
|
||||
if (chief == null) {
|
||||
{
|
||||
List<FireChief> chiefs = this.chiefs.get(datasource);
|
||||
if (chiefs == null) {
|
||||
return null;
|
||||
}
|
||||
return chief.getMetrics();
|
||||
FireDepartmentMetrics snapshot = null;
|
||||
for (FireChief chief : chiefs) {
|
||||
if (snapshot == null) {
|
||||
snapshot = chief.getMetrics().snapshot();
|
||||
} else {
|
||||
snapshot.merge(chief.getMetrics());
|
||||
}
|
||||
}
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
||||
{
|
||||
final FireChief chief = chiefs.get(getDataSourceName(query));
|
||||
|
||||
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
|
||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
|
||||
{
|
||||
return getQueryRunnerForSegments(query, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
|
||||
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
|
||||
{
|
||||
final FireChief chief = chiefs.get(getDataSourceName(query));
|
||||
|
||||
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
|
||||
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
|
||||
Iterable<FireChief> chiefsOfDataSource = chiefs.get(getDataSourceName(query));
|
||||
return chiefsOfDataSource == null ? new NoopQueryRunner<T>() : factory.getToolchest().mergeResults(
|
||||
factory.mergeRunners(
|
||||
executorService,
|
||||
Iterables.transform(
|
||||
chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>()
|
||||
{
|
||||
@Override
|
||||
public QueryRunner<T> apply(FireChief input)
|
||||
{
|
||||
return input.getQueryRunner(query);
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private <T> String getDataSourceName(Query<T> query)
|
||||
|
|
|
@ -65,11 +65,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
|
|||
public EventReceiverFirehoseFactory(
|
||||
@JsonProperty("serviceName") String serviceName,
|
||||
@JsonProperty("bufferSize") Integer bufferSize,
|
||||
@JsonProperty("parser") InputRowParser parser,
|
||||
@JacksonInject ChatHandlerProvider chatHandlerProvider
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(parser, "parser");
|
||||
Preconditions.checkNotNull(serviceName, "serviceName");
|
||||
|
||||
this.serviceName = serviceName;
|
||||
|
|
|
@ -120,6 +120,7 @@ public class RealtimeManagerTest
|
|||
tuningConfig
|
||||
)
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue