mirror of https://github.com/apache/druid.git
Merge branch 'master' into rejigger-cache
Conflicts: pom.xml
commit de8cb55260
@@ -4,6 +4,8 @@ layout: doc_page
Production Cluster Configuration
================================

__This configuration is an example of what a production cluster could look like. Many other hardware combinations are possible! Cheaper hardware is absolutely possible.__

This production Druid cluster assumes that MySQL and Zookeeper are already set up. The deep storage used in the examples is S3, and memcached is used as a distributed cache.

The nodes that respond to queries (Historical, Broker, and Middle Manager nodes) will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and depends on the types of queries, the query load, and the schema. Historical daemons should have a heap size of at least 1GB per core for normal usage, but they can be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes also use RAM for caching, so they do more than just route queries. SSDs are highly recommended for Historical nodes when not all data can be loaded into available memory.
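As a rough illustration of the sizing and caching guidance above: an 8-core Historical node would get roughly an 8GB heap (1GB per core), with memcached as the distributed cache and S3 as deep storage. The property names and values below are assumptions for illustration only, are not part of this commit, and vary between Druid versions; consult the configuration documentation for the release in use.

```properties
# Hypothetical Historical runtime.properties fragment -- names and values are
# illustrative assumptions, not taken from this commit.

# JVM sizing for an 8-core machine, following the "1GB heap per core" guidance:
#   -server -Xms8g -Xmx8g -XX:MaxDirectMemorySize=16g

druid.host=#{HISTORICAL_HOST}
druid.port=8081

# memcached as the distributed cache
druid.cache.type=memcached
druid.cache.hosts=#{MEMCACHED_HOST}:11211

# S3 as deep storage
druid.storage.type=s3
druid.s3.accessKey=#{ACCESS_KEY}
druid.s3.secretKey=#{SECRET_KEY}
```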
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Maps;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -30,27 +31,28 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -169,6 +171,8 @@ public class HadoopDruidIndexerConfig
private volatile HadoopIngestionSpec schema;
private volatile PathSpec pathSpec;
private volatile ColumnConfig columnConfig;
private volatile Map<DateTime,ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();

@JsonCreator
public HadoopDruidIndexerConfig(
@@ -178,6 +182,29 @@ public class HadoopDruidIndexerConfig
this.columnConfig = columnConfig;
this.schema = schema;
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : schema.getTuningConfig().getShardSpecs().entrySet()) {
if (entry.getValue() == null || entry.getValue().isEmpty()) {
continue;
}
final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
shardSpecLookups.put(
entry.getKey(), actualSpec.getLookup(
Lists.transform(
entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
{
@Override
public ShardSpec apply(HadoopyShardSpec input)
{
return input.getActualSpec();
}
}
)
)
);
for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
}
}
}

@JsonProperty
@@ -306,25 +333,17 @@ public class HadoopDruidIndexerConfig
return Optional.absent();
}

final List<HadoopyShardSpec> shards = schema.getTuningConfig().getShardSpecs().get(timeBucket.get().getStart());
if (shards == null || shards.isEmpty()) {
return Optional.absent();
}
final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(inputRow);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);

for (final HadoopyShardSpec hadoopyShardSpec : shards) {
final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
if (actualSpec.isInChunk(inputRow)) {
return Optional.of(
new Bucket(
hadoopyShardSpec.getShardNum(),
timeBucket.get().getStart(),
actualSpec.getPartitionNum()
)
);
}
}
return Optional.of(
new Bucket(
hadoopyShardSpec.getShardNum(),
timeBucket.get().getStart(),
actualSpec.getPartitionNum()
)
);

throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
}

public Optional<Set<Interval>> getSegmentGranularIntervals()
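Taken together, the two hunks above replace the per-row scan over HadoopyShardSpecs with lookups that are built once per time bucket in the constructor and consulted in getBucket. Below is a standalone, simplified sketch of that pattern; the types are illustrative stand-ins, not Druid's classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the lookup pattern introduced above: build one lookup per time
 * bucket up front, then route each row through it instead of scanning every
 * shard spec per row.
 */
public class ShardLookupSketch
{
  /** Stand-in for ShardSpec: decides whether a row belongs to this shard. */
  interface Shard
  {
    boolean isInChunk(String row);
  }

  /** Stand-in for ShardSpecLookup: routes a row directly to its shard. */
  interface ShardLookup
  {
    Shard getShardSpec(String row);
  }

  /** Precompute one lookup per bucket key (the commit keys these by DateTime). */
  static Map<String, ShardLookup> buildLookups(Map<String, List<Shard>> shardsPerBucket)
  {
    final Map<String, ShardLookup> lookups = new HashMap<>();
    for (Map.Entry<String, List<Shard>> entry : shardsPerBucket.entrySet()) {
      final List<Shard> shards = entry.getValue();
      if (shards == null || shards.isEmpty()) {
        continue;
      }
      // A scan-based lookup; implementations such as the hash-based spec can
      // replace this with a direct index computation.
      lookups.put(entry.getKey(), row -> {
        for (Shard shard : shards) {
          if (shard.isInChunk(row)) {
            return shard;
          }
        }
        throw new IllegalStateException("row doesn't fit in any shard: " + row);
      });
    }
    return lookups;
  }

  public static void main(String[] args)
  {
    final List<Shard> shards = List.of(
        row -> row.hashCode() % 2 == 0,   // "even" shard
        row -> row.hashCode() % 2 != 0    // "odd" shard
    );
    final Map<String, List<Shard>> shardsPerBucket = new HashMap<>();
    shardsPerBucket.put("2014-01-01", shards);

    final Map<String, ShardLookup> lookups = buildLookups(shardsPerBucket);
    // At indexing time, per-row routing is a map get plus one lookup call.
    Shard shard = lookups.get("2014-01-01").getShardSpec("some-row");
    System.out.println("routed to: " + shard);
  }
}
```

The commit's second map (ShardSpec to HadoopyShardSpec) plays the same role for recovering the shard number once the spec has been chosen.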
pom.xml
@@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.5</metamx.java-util.version>
<apache.curator.version>2.5.0</apache.curator.version>
<druid.api.version>0.2.5-SNAPSHOT</druid.api.version>
<druid.api.version>0.2.6-SNAPSHOT</druid.api.version>
</properties>

<modules>
@@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentMover;
@@ -120,23 +121,30 @@ public class S3DataSegmentMover implements DataSegmentMover
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
} else if (s3Client.getObjectDetails(s3Bucket, s3Path)
.getStorageClass()
.equals(S3Object.STORAGE_CLASS_GLACIER)) {
log.warn("Cannot move file[s3://%s/%s] of storage class glacier.");
} else {
log.info(
"Moving file[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
final S3Object target = new S3Object(targetS3Path);
if(!config.getDisableAcl()) {
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, "");
if (list.length == 0) {
// should never happen
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
}
final S3Object s3Object = list[0];
if (s3Object.getStorageClass() != null &&
s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) {
log.warn("Cannot move file[s3://%s/%s] of storage class glacier, skipping.");
} else {
log.info(
"Moving file[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
final S3Object target = new S3Object(targetS3Path);
if (!config.getDisableAcl()) {
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, target, false);
}
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, target, false);
}
} else {
// ensure object exists in target location
@@ -133,15 +133,18 @@ public class S3DataSegmentMoverTest
}

@Override
public StorageObject getObjectDetails(String bucketName, String objectKey) throws ServiceException
public S3Object[] listObjects(String bucketName, String objectKey, String separator)
{
if (isObjectInBucket(bucketName, objectKey)) {
final S3Object object = new S3Object(objectKey);
object.setStorageClass(S3Object.STORAGE_CLASS_STANDARD);
return object;
} else {
return null;
try {
if (isObjectInBucket(bucketName, objectKey)) {
final S3Object object = new S3Object(objectKey);
object.setStorageClass(S3Object.STORAGE_CLASS_STANDARD);
return new S3Object[]{object};
}
} catch (ServiceException e) {
// return empty list
}
return new S3Object[]{};
}

@Override
@@ -38,7 +38,7 @@ public class SegmentLoaderConfig
private boolean deleteOnRemove = true;

@JsonProperty("dropSegmentDelayMillis")
private int dropSegmentDelayMillis = 5 * 60 * 1000; // 5 mins
private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds

@JsonProperty
private File infoDir = null;
@@ -74,4 +74,17 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
'}';
}

@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(InputRow row)
{
int index = Math.abs(hash(row) % getPartitions());
return shardSpecs.get(index);
}
};
}
}
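The hash-based lookup above picks a partition as Math.abs(hash(row) % getPartitions()). Because the modulo is applied before Math.abs, the intermediate value always has magnitude smaller than the partition count, so the resulting index always lands in [0, partitions). A tiny standalone illustration, where String.hashCode stands in for Druid's row hashing:

```java
public class HashRoutingSketch
{
  public static void main(String[] args)
  {
    final int partitions = 3;
    for (String row : new String[]{"alpha", "beta", "gamma", "delta"}) {
      // Same arithmetic as the lookup above: modulo first, then Math.abs.
      int index = Math.abs(row.hashCode() % partitions);
      System.out.println(row + " -> partition " + index);
    }
  }
}
```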
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.InputRow;

import java.util.List;

public class LinearShardSpec implements ShardSpec
{
private int partitionNum;
@@ -42,6 +44,19 @@ public class LinearShardSpec
return partitionNum;
}

@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(InputRow row)
{
return shardSpecs.get(0);
}
};
}

@Override
public <T> PartitionChunk<T> createChunk(T obj) {
return new LinearPartitionChunk<T>(partitionNum, obj);
@@ -25,6 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.InputRow;

import java.util.List;
import java.util.Set;

public class NumberedShardSpec implements ShardSpec
{
@JsonIgnore
@@ -52,6 +55,19 @@ public class NumberedShardSpec
return partitionNum;
}

@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(InputRow row)
{
return shardSpecs.get(0);
}
};
}

@JsonProperty("partitions")
public int getPartitions()
{
@@ -20,6 +20,7 @@
package io.druid.timeline.partition;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;

import java.util.List;
@@ -94,6 +95,24 @@ public class SingleDimensionShardSpec implements ShardSpec
return partitionNum;
}

@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(InputRow row)
{
for (ShardSpec spec : shardSpecs) {
if (spec.isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
}
};
}

public void setPartitionNum(int partitionNum)
{
this.partitionNum = partitionNum;
@@ -35,6 +35,7 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.indexing.overlord.ForkingTaskRunner;
import io.druid.indexing.overlord.TaskRunner;
@@ -75,6 +76,7 @@ public class CliMiddleManager extends ServerRunnable
{
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);

JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);

binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
@@ -44,6 +44,7 @@ import io.druid.guice.PolyBind;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
@@ -114,6 +115,7 @@ public class CliOverlord extends ServerRunnable
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);

binder.bind(TaskMaster.class).in(ManageLifecycle.class);