Merge branch 'master' into onheap-incremental-index

commit 6e03a6245f by nishantmonu51, 2014-12-05 10:40:28 +05:30
14 changed files with 82 additions and 20 deletions

View File

@@ -14,6 +14,10 @@ druid.metadata.storage.connector.password=diurd
 druid.storage.type=local
 druid.storage.storage.storageDirectory=/tmp/druid/localStorage

+# Cache (we use a simple 10mb heap-based local cache on the broker)
+druid.cache.type=local
+druid.cache.sizeInBytes=10000000
+
 # Indexing service discovery
 druid.selectors.indexing.serviceName=overlord

View File

@@ -1,6 +1,9 @@
 druid.host=localhost
-druid.service=broker
 druid.port=8080
+druid.service=broker

+druid.broker.cache.useCache=true
+druid.broker.cache.populateCache=true
+
 # Bump these up only for faster nested groupBy
 druid.processing.buffer.sizeBytes=100000000

View File

@@ -1,5 +1,7 @@
 druid.host=localhost
-druid.service=coordinator
 druid.port=8082
+druid.service=coordinator

+# The coordinator begins assignment operations after the start delay.
+# We override the default here to start things up faster for examples.
 druid.coordinator.startDelay=PT70s

View File

@@ -1,8 +1,9 @@
 druid.host=localhost
-druid.service=historical
 druid.port=8081
+druid.service=historical

-# Change these to make Druid faster
+# We can only 1 scan segment in parallel with these configs.
+# Our intermediate buffer is also very small so longer topNs will be slow.
 druid.processing.buffer.sizeBytes=100000000
 druid.processing.numThreads=1

View File

@@ -2,6 +2,7 @@ druid.host=localhost
 druid.port=8080
 druid.service=overlord

+# Run the overlord in local mode with a single peon to execute tasks
 druid.indexer.queue.startDelay=PT0M
 druid.indexer.runner.javaOpts="-server -Xmx256m"
 druid.indexer.fork.property.druid.processing.numThreads=1

View File

@@ -1,10 +1,12 @@
 druid.host=localhost
-druid.service=realtime
 druid.port=8083
+druid.service=realtime

-# Change this config to metadata to hand off to the rest of the Druid cluster
+# Change this config to 'metadata' to hand off to the rest of the Druid cluster
 druid.publish.type=noop

+# We can only 1 scan segment in parallel with these configs.
+# Our intermediate buffer is also very small so longer topNs will be slow.
 druid.processing.buffer.sizeBytes=100000000
 druid.processing.numThreads=1

View File

@@ -19,6 +19,7 @@
 package io.druid.indexer;

+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
@@ -45,6 +46,11 @@ public class HadoopDruidIndexerJob implements Jobby
     config.verify();
     this.config = config;

+    Preconditions.checkArgument(
+        !config.isUpdaterJobSpecSet() || handler != null,
+        "MetadataStorageUpdaterJobHandler must not be null if ioConfig.metadataUpdateSpec is specified."
+    );
+
     if (config.isUpdaterJobSpecSet()) {
       metadataStorageUpdaterJob = new MetadataStorageUpdaterJob(
           config,
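
The guard uses Guava's Preconditions.checkArgument, which throws IllegalArgumentException when its condition is false. A minimal self-contained sketch of the contract this establishes (GuardSketch and its construct helper are hypothetical stand-ins, not Druid code):

    import com.google.common.base.Preconditions;

    public class GuardSketch
    {
      // Mirrors the check added above: a null handler is acceptable only
      // when no metadataUpdateSpec is configured.
      static void construct(boolean updaterJobSpecSet, Object handler)
      {
        Preconditions.checkArgument(
            !updaterJobSpecSet || handler != null,
            "MetadataStorageUpdaterJobHandler must not be null if ioConfig.metadataUpdateSpec is specified."
        );
      }

      public static void main(String[] args)
      {
        construct(false, null);        // fine: no update spec, handler not needed
        construct(true, new Object()); // fine: spec present, handler supplied
        construct(true, null);         // throws IllegalArgumentException
      }
    }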

View File

@@ -290,10 +290,16 @@ public class HadoopIndexTask extends AbstractTask
           .withTuningConfig(theSchema.getTuningConfig().withVersion(version))
       );

-      HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(
-          config,
-          injector.getInstance(MetadataStorageUpdaterJobHandler.class)
-      );
+      // MetadataStorageUpdaterJobHandler is only needed when running standalone without indexing service
+      // In that case the whatever runs the Hadoop Index Task must ensure MetadataStorageUpdaterJobHandler
+      // can be injected based on the configuration given in config.getSchema().getIOConfig().getMetadataUpdateSpec()
+      final MetadataStorageUpdaterJobHandler maybeHandler;
+      if (config.isUpdaterJobSpecSet()) {
+        maybeHandler = injector.getInstance(MetadataStorageUpdaterJobHandler.class);
+      } else {
+        maybeHandler = null;
+      }
+      HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config, maybeHandler);

       log.info("Starting a hadoop index generator job...");
       if (job.run()) {
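
The task now asks the injector for the handler only when config.isUpdaterJobSpecSet() is true, so standalone runs without a metadataUpdateSpec never need that binding at all. A sketch of the same lazy-resolution pattern with hypothetical stand-in types:

    public class OptionalHandlerSketch
    {
      interface Handler {}

      // Stand-in for injector.getInstance(MetadataStorageUpdaterJobHandler.class)
      static Handler resolveFromInjector()
      {
        return new Handler() {};
      }

      // Only touch the injector when configuration actually calls for the
      // dependency; otherwise pass null, which the new constructor guard permits.
      static Handler maybeHandler(boolean updaterJobSpecSet)
      {
        return updaterJobSpecSet ? resolveFromInjector() : null;
      }
    }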

View File

@@ -234,7 +234,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
       log.info("Logging task %s output to: %s", task.getId(), logFile);
       boolean runFailed = true;

-      try (final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput()) {
+      try (final OutputStream toLogfile = Files.asByteSink(logFile).openBufferedStream()) {
         ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
         final int statusCode = processHolder.process.waitFor();
         log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());

View File

@@ -31,10 +31,10 @@ public class CacheConfig
   public static final String POPULATE_CACHE = "populateCache";

   @JsonProperty
-  private boolean useCache = true;
+  private boolean useCache = false;

   @JsonProperty
-  private boolean populateCache = true;
+  private boolean populateCache = false;

   @JsonProperty
   private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);

View File

@@ -1902,6 +1902,18 @@ public class CachingClusteredClientTest
         jsonMapper,
         new CacheConfig()
         {
+          @Override
+          public boolean isPopulateCache()
+          {
+            return true;
+          }
+
+          @Override
+          public boolean isUseCache()
+          {
+            return true;
+          }
+
           @Override
           public boolean isQueryCacheable(Query query)
           {

View File

@@ -130,7 +130,19 @@ public class CachingQueryRunnerTest
           }
         },
         new CacheConfig()
+        {
+          @Override
+          public boolean isPopulateCache()
+          {
+            return true;
+          }
+          @Override
+          public boolean isUseCache()
+          {
+            return true;
+          }
+        }
     );

     TopNQuery query = builder.build();
@@ -220,6 +232,19 @@
           }
         },
         new CacheConfig()
+        {
+          @Override
+          public boolean isPopulateCache()
+          {
+            return true;
+          }
+
+          @Override
+          public boolean isUseCache()
+          {
+            return true;
+          }
+        }
     );

     HashMap<String, Object> context = new HashMap<String, Object>();

View File

@@ -90,7 +90,7 @@ public class CliBroker extends ServerRunnable
     binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
     binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
-    JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
+    JsonConfigProvider.bind(binder, "druid.cache", CacheProvider.class);
     JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
     JsonConfigProvider.bind(binder, "druid.broker.select", TierSelectorStrategy.class);
     JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom", CustomTierSelectorStrategyConfig.class);

View File

@@ -88,7 +88,7 @@ public class CliHistorical extends ServerRunnable
     LifecycleModule.register(binder, ZkCoordinator.class);
     binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
-    JsonConfigProvider.bind(binder, "druid.historical.cache", CacheProvider.class);
+    JsonConfigProvider.bind(binder, "druid.cache", CacheProvider.class);
     JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
     MetricsModule.register(binder, CacheMonitor.class);
   }
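
Together with the CliBroker change above, this splits the binding in two: the cache implementation (CacheProvider, i.e. druid.cache.type and druid.cache.sizeInBytes) now lives under the shared druid.cache prefix, while each node type keeps its own behavior flags (CacheConfig) under druid.broker.cache / druid.historical.cache. A sketch of the resulting module shape, assuming Druid's Guice helpers are on the classpath (the import paths and module name are assumptions):

    import com.google.inject.Binder;
    import com.google.inject.Module;
    import io.druid.client.cache.Cache;
    import io.druid.client.cache.CacheConfig;
    import io.druid.client.cache.CacheProvider;
    import io.druid.guice.JsonConfigProvider;
    import io.druid.guice.ManageLifecycle;

    public class HistoricalCacheBindings implements Module
    {
      @Override
      public void configure(Binder binder)
      {
        binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
        // Which cache implementation and how big: shared across node types.
        JsonConfigProvider.bind(binder, "druid.cache", CacheProvider.class);
        // Whether this node uses/populates it: configured per node type.
        JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
      }
    }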