mirror of https://github.com/apache/druid.git
finished merging into druid-0.7.x; derby not working (to be fixed)
This commit is contained in:
parent 43cc6283d3
commit 6a641621b2
@@ -34,13 +34,13 @@ import java.util.PriorityQueue;
/**
* An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
* that the input Iterators are provided in order. That is, it places an extra restriction in the input iterators.
* <p/>
*
* Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
* in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
* be provided in the order of the first element of each iterator.
* <p/>
*
* If this doesn't make sense, check out OrderedMergeIteratorTest.testScrewsUpOnOutOfOrderBeginningOfList()
* <p/>
*
* It places this extra restriction on the input data in order to implement an optimization that allows it to
* remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
*/

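As an aside on the restriction described in the javadoc above: the merge can only stay lazy if each input iterator's first element is no smaller than that of the iterator before it. A minimal, self-contained sketch of that precondition (the class below is illustrative only and not part of this commit):

```java
import java.util.Arrays;
import java.util.List;

public class OrderedInputCheck
{
  // Returns true when the inputs are supplied in order of their first elements,
  // which is the precondition OrderedMergeIterator relies on to remain lazy.
  public static boolean firstElementsAreOrdered(List<List<Integer>> inputs)
  {
    Integer previousFirst = null;
    for (List<Integer> input : inputs) {
      if (input.isEmpty()) {
        continue;
      }
      Integer first = input.get(0);
      if (previousFirst != null && first < previousFirst) {
        return false; // out-of-order beginning of list, the case exercised by the test named above
      }
      previousFirst = first;
    }
    return true;
  }

  public static void main(String[] args)
  {
    // [1,3,5] then [2,4,6] is fine; [2,4,6] then [1,3,5] violates the restriction.
    System.out.println(firstElementsAreOrdered(Arrays.asList(Arrays.asList(1, 3, 5), Arrays.asList(2, 4, 6)))); // true
    System.out.println(firstElementsAreOrdered(Arrays.asList(Arrays.asList(2, 4, 6), Arrays.asList(1, 3, 5)))); // false
  }
}
```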
@@ -22,7 +22,6 @@ package io.druid.collections;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;

@@ -38,13 +37,13 @@ import java.util.PriorityQueue;
/**
* An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
* that the input Iterators are provided in order. That is, it places an extra restriction in the input iterators.
* <p/>
*
* Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
* in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
* be provided in the order of the first element of each iterator.
* <p/>
*
* If this doesn't make sense, check out OrderedMergeSequenceTest.testScrewsUpOnOutOfOrderBeginningOfList()
* <p/>
*
* It places this extra restriction on the input data in order to implement an optimization that allows it to
* remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
*/

@@ -48,7 +48,7 @@ public class ConfigManager
private final Object lock = new Object();
private boolean started = false;

private final MetadataDbConnector metadataDbConnector;
private final MetadataDbConnector dbConnector;
private final Supplier<ConfigManagerConfig> config;

private final ScheduledExecutorService exec;

@@ -58,9 +58,9 @@ public class ConfigManager
private volatile ConfigManager.PollingCallable poller;

@Inject
public ConfigManager(MetadataDbConnector metadataDbConnector, Supplier<MetadataTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
public ConfigManager(MetadataDbConnector dbConnector, Supplier<MetadataTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{
this.metadataDbConnector = metadataDbConnector;
this.dbConnector = dbConnector;
this.config = config;

this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");

@@ -105,7 +105,7 @@ public class ConfigManager
{
for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
try {
if (entry.getValue().swapIfNew(metadataDbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
log.info("New value for key[%s] seen.", entry.getKey());
}
}

@@ -137,7 +137,7 @@ public class ConfigManager
// Multiple of these callables can be submitted at the same time, but the callables themselves
// are executed serially, so double check that it hasn't already been populated.
if (!watchedConfigs.containsKey(key)) {
byte[] value = metadataDbConnector.lookup(configTable, "name", "payload", key);
byte[] value = dbConnector.lookup(configTable, "name", "payload", key);
ConfigHolder<T> holder = new ConfigHolder<T>(value, serde);
watchedConfigs.put(key, holder);
}

@@ -181,7 +181,7 @@ public class ConfigManager
@Override
public Boolean call() throws Exception
{
metadataDbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);

final ConfigHolder configHolder = watchedConfigs.get(key);
if (configHolder != null) {

@@ -262,4 +262,4 @@ public class SerializerUtils

return retVal;
}
}
}

@@ -43,7 +43,7 @@ public class JacksonConfigManagerModule implements Module

@Provides @ManageLifecycle
public ConfigManager getConfigManager(
final MetadataDbConnector metadataDbConnector,
final MetadataDbConnector dbConnector,
final Supplier<MetadataTablesConfig> dbTables,
final Supplier<ConfigManagerConfig> config,
final Lifecycle lifecycle

@@ -55,7 +55,7 @@ public class JacksonConfigManagerModule implements Module
@Override
public void start() throws Exception
{
metadataDbConnector.createConfigTable();
dbConnector.createConfigTable();
}

@Override

@@ -66,6 +66,6 @@ public class JacksonConfigManagerModule implements Module
}
);

return new ConfigManager(metadataDbConnector, dbTables, config);
return new ConfigManager(dbConnector, dbTables, config);
}
}

@@ -42,17 +42,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
* <p/>
*
* It associates a jodatime Interval and a generically-typed version with the object that is being stored.
* <p/>
*
* In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated
* with a timeline entry remains unchanged when chunking occurs.
* <p/>
*
* After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most
* recent objects (according to the version) that match the given interval. The intent is that objects represent
* a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look
* at in order to get a correct answer about that time period.
* <p/>
*
* The findOvershadowed() method returns a list of objects that will never be returned by a call to lookup() because
* they are overshadowed by some other object. This can be used in conjunction with the add() and remove() methods
* to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if

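The add()/lookup()/findOvershadowed() contract described above can be illustrated with a toy sketch. This is not the Druid API (which also deals with interval chunking, partition chunks, and real version comparators); class and method names below are stand-ins chosen to mirror the javadoc, and versions are compared as plain strings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the idea above: for each (identical) interval, lookup() returns only the
// highest version, and findOvershadowed() returns everything that lookup() can no longer see.
public class ToyTimeline
{
  private final Map<String, TreeMap<String, String>> entries = new TreeMap<>(); // interval -> version -> object

  public void add(String interval, String version, String object)
  {
    entries.computeIfAbsent(interval, k -> new TreeMap<>()).put(version, object);
  }

  public List<String> lookup(String interval)
  {
    TreeMap<String, String> versions = entries.get(interval);
    return versions == null ? List.of() : List.of(versions.lastEntry().getValue());
  }

  public List<String> findOvershadowed()
  {
    List<String> overshadowed = new ArrayList<>();
    for (TreeMap<String, String> versions : entries.values()) {
      // every version strictly below the newest one is overshadowed
      overshadowed.addAll(versions.headMap(versions.lastKey()).values());
    }
    return overshadowed;
  }

  public static void main(String[] args)
  {
    ToyTimeline timeline = new ToyTimeline();
    timeline.add("2014-01-01/2014-01-02", "v1", "old segment");
    timeline.add("2014-01-01/2014-01-02", "v2", "new segment");
    System.out.println(timeline.lookup("2014-01-01/2014-01-02")); // [new segment]
    System.out.println(timeline.findOvershadowed());              // [old segment]
  }
}
```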
@@ -28,8 +28,6 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.SequenceTestHelper;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.TestSequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import junit.framework.Assert;
import org.junit.Test;

@@ -17,7 +17,7 @@
},
"pathSpec" : {
"type" : "static",
"paths" : "examples/bin/examples/indexing/wikipedia_data.json"
"paths" : "examples/indexing/wikipedia_data.json"
},
"targetPartitionSize" : 5000000,
"rollupSpec" : {

@@ -24,7 +24,7 @@
}],
"firehose" : {
"type" : "local",
"baseDir" : "examples/bin/examples/indexing",
"baseDir" : "examples/indexing/",
"filter" : "wikipedia_data.json",
"parser" : {
"timestampSpec" : {

@ -81,13 +81,13 @@ import java.util.Map;
|
|||
/**
|
||||
* Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
|
||||
* choosing the best dimension that satisfies the criteria:
|
||||
* <p/>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Must have exactly one value per row.</li>
|
||||
* <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
|
||||
* put all those rows in the same partition, and that partition may be much larger than the target size.</li>
|
||||
* </ul>
|
||||
* <p/>
|
||||
*
|
||||
* "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of
|
||||
* segment size relative to the target.
|
||||
*/
|
||||
|
|
|
@ -45,9 +45,9 @@ public class HadoopDruidIndexerJob implements Jobby
|
|||
this.config = config;
|
||||
|
||||
if (config.isUpdaterJobSpecSet()) {
|
||||
this.dbUpdaterJob = new DbUpdaterJob(config);
|
||||
dbUpdaterJob = new DbUpdaterJob(config);
|
||||
} else {
|
||||
this.dbUpdaterJob = null;
|
||||
dbUpdaterJob = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,12 +23,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.db.MetadataDbConnectorConfig;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.indexer.partitions.PartitionsSpec;
|
||||
import io.druid.indexer.partitions.RandomPartitionsSpec;
|
||||
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
|
||||
import io.druid.indexer.updater.DbUpdaterJobSpec;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.indexer.partitions;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import io.druid.indexer.HadoopDruidIndexerConfig;
|
||||
import io.druid.indexer.HadoopDruidIndexerConfigTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.indexer.partitions;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import io.druid.indexer.HadoopDruidIndexerConfig;
|
||||
import io.druid.indexer.HadoopDruidIndexerConfigTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
|
|
@ -28,7 +28,7 @@ import com.google.common.base.Preconditions;
|
|||
/**
|
||||
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
|
||||
* ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
|
||||
* <p/>
|
||||
*
|
||||
* TaskStatus objects are immutable.
|
||||
*/
|
||||
public class TaskStatus
|
||||
|
@ -101,6 +101,8 @@ public class TaskStatus
|
|||
/**
|
||||
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
|
||||
* isSuccess, or isFailure will be true at any one time.
|
||||
*
|
||||
* @return whether the task is runnable.
|
||||
*/
|
||||
@JsonIgnore
|
||||
public boolean isRunnable()
|
||||
|
@ -110,6 +112,8 @@ public class TaskStatus
|
|||
|
||||
/**
|
||||
* Inverse of {@link #isRunnable}.
|
||||
*
|
||||
* @return whether the task is complete.
|
||||
*/
|
||||
@JsonIgnore
|
||||
public boolean isComplete()
|
||||
|
@ -120,6 +124,8 @@ public class TaskStatus
|
|||
/**
|
||||
* Returned by tasks when they spawn subtasks. Exactly one of isRunnable, isSuccess, or isFailure will
|
||||
* be true at any one time.
|
||||
*
|
||||
* @return whether the task succeeded.
|
||||
*/
|
||||
@JsonIgnore
|
||||
public boolean isSuccess()
|
||||
|
@ -130,6 +136,8 @@ public class TaskStatus
|
|||
/**
|
||||
* Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isSuccess, or
|
||||
* isFailure will be true at any one time.
|
||||
*
|
||||
* @return whether the task failed
|
||||
*/
|
||||
@JsonIgnore
|
||||
public boolean isFailure()
|
||||
|
|
|
@ -29,7 +29,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
|
|||
import io.druid.indexing.common.config.TaskConfig;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.loading.DataSegmentArchiver;
|
||||
import io.druid.segment.loading.DataSegmentKiller;
|
||||
import io.druid.segment.loading.DataSegmentMover;
|
||||
|
|
|
@ -81,7 +81,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
|
|||
{
|
||||
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
|
||||
|
||||
final Set<DataSegment> retVal = toolbox.getIndexerMetadataCoordinator().announceHistoricalSegments(segments);
|
||||
final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
|
||||
|
||||
// Emit metrics
|
||||
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
|
||||
|
|
|
@ -68,7 +68,7 @@ public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
|
|||
@Override
|
||||
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
|
||||
{
|
||||
return toolbox.getIndexerMetadataCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
|
||||
return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -68,7 +68,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
|
|||
@Override
|
||||
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
|
||||
{
|
||||
return toolbox.getIndexerMetadataCoordinator().getUsedSegmentsForInterval(dataSource, interval);
|
||||
return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
|
|||
) throws IOException
|
||||
{
|
||||
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
|
||||
toolbox.getIndexerMetadataCoordinator().updateSegmentMetadata(segments);
|
||||
toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
|
||||
|
||||
// Emit metrics
|
||||
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
|
||||
|
|
|
@ -59,7 +59,7 @@ public class SegmentNukeAction implements TaskAction<Void>
|
|||
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
|
||||
{
|
||||
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
|
||||
toolbox.getIndexerMetadataCoordinator().deleteSegments(segments);
|
||||
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
|
||||
|
||||
// Emit metrics
|
||||
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
|
||||
|
|
|
@ -37,18 +37,18 @@ import java.util.Set;
|
|||
public class TaskActionToolbox
|
||||
{
|
||||
private final TaskLockbox taskLockbox;
|
||||
private final IndexerMetadataCoordinator indexerMetadataCoordinator;
|
||||
private final IndexerMetadataCoordinator indexerDBCoordinator;
|
||||
private final ServiceEmitter emitter;
|
||||
|
||||
@Inject
|
||||
public TaskActionToolbox(
|
||||
TaskLockbox taskLockbox,
|
||||
IndexerMetadataCoordinator indexerMetadataCoordinator,
|
||||
IndexerMetadataCoordinator indexerDBCoordinator,
|
||||
ServiceEmitter emitter
|
||||
)
|
||||
{
|
||||
this.taskLockbox = taskLockbox;
|
||||
this.indexerMetadataCoordinator = indexerMetadataCoordinator;
|
||||
this.indexerDBCoordinator = indexerDBCoordinator;
|
||||
this.emitter = emitter;
|
||||
}
|
||||
|
||||
|
@ -57,9 +57,9 @@ public class TaskActionToolbox
|
|||
return taskLockbox;
|
||||
}
|
||||
|
||||
public IndexerMetadataCoordinator getIndexerMetadataCoordinator()
|
||||
public IndexerMetadataCoordinator getIndexerDBCoordinator()
|
||||
{
|
||||
return indexerMetadataCoordinator;
|
||||
return indexerDBCoordinator;
|
||||
}
|
||||
|
||||
public ServiceEmitter getEmitter()
|
||||
|
|
|
@ -107,6 +107,12 @@ public abstract class AbstractTask implements Task
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClasspathPrefix()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -118,7 +124,10 @@ public abstract class AbstractTask implements Task
|
|||
}
|
||||
|
||||
/**
|
||||
* Start helper methods *
|
||||
* Start helper methods
|
||||
*
|
||||
* @param objects objects to join
|
||||
* @return string of joined objects
|
||||
*/
|
||||
public static String joinId(Object... objects)
|
||||
{
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
@ -31,7 +30,6 @@ import io.druid.granularity.QueryGranularity;
|
|||
import io.druid.indexing.common.TaskLock;
|
||||
import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.TaskToolbox;
|
||||
import io.druid.indexing.common.actions.SegmentInsertAction;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.IndexMerger;
|
||||
import io.druid.segment.IndexableAdapter;
|
||||
|
|
|
@ -31,6 +31,8 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
|
||||
import io.druid.indexer.HadoopDruidIndexerConfig;
|
||||
import io.druid.indexer.HadoopDruidIndexerJob;
|
||||
|
@ -42,9 +44,7 @@ import io.druid.indexing.common.TaskToolbox;
|
|||
import io.druid.indexing.common.actions.LockAcquireAction;
|
||||
import io.druid.indexing.common.actions.LockTryAcquireAction;
|
||||
import io.druid.indexing.common.actions.TaskActionClient;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.tesla.aether.internal.DefaultTeslaAether;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -79,11 +79,13 @@ public class HadoopIndexTask extends AbstractTask
|
|||
private final HadoopIngestionSpec spec;
|
||||
@JsonIgnore
|
||||
private final List<String> hadoopDependencyCoordinates;
|
||||
@JsonIgnore
|
||||
private final String classpathPrefix;
|
||||
|
||||
/**
|
||||
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
|
||||
* for creating Druid index segments. It may be modified.
|
||||
* <p/>
|
||||
*
|
||||
* Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the
|
||||
* job does not push a list of published segments the database. Instead, we will use the method
|
||||
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
|
||||
|
@ -96,7 +98,8 @@ public class HadoopIndexTask extends AbstractTask
|
|||
@JsonProperty("spec") HadoopIngestionSpec spec,
|
||||
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
|
||||
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
|
||||
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
|
||||
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
|
||||
@JsonProperty("classpathPrefix") String classpathPrefix
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -123,6 +126,8 @@ public class HadoopIndexTask extends AbstractTask
|
|||
// Will be defaulted to something at runtime, based on taskConfig.
|
||||
this.hadoopDependencyCoordinates = null;
|
||||
}
|
||||
|
||||
this.classpathPrefix = classpathPrefix;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -159,6 +164,13 @@ public class HadoopIndexTask extends AbstractTask
|
|||
return hadoopDependencyCoordinates;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public String getClasspathPrefix()
|
||||
{
|
||||
return classpathPrefix;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
|
@ -316,4 +328,4 @@ public class HadoopIndexTask extends AbstractTask
|
|||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -244,8 +244,6 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
|
||||
|
||||
// Load data
|
||||
System.out.println("PARSER IS");
|
||||
System.out.println(ingestionSchema.getDataSchema().getParser());
|
||||
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
|
||||
while (firehose.hasMore()) {
|
||||
final InputRow inputRow = firehose.nextRow();
|
||||
|
|
|
@ -41,7 +41,6 @@ import com.metamx.emitter.service.ServiceMetricEvent;
|
|||
import io.druid.indexing.common.TaskLock;
|
||||
import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.TaskToolbox;
|
||||
import io.druid.indexing.common.actions.SegmentInsertAction;
|
||||
import io.druid.indexing.common.actions.SegmentListUsedAction;
|
||||
import io.druid.indexing.common.actions.TaskActionClient;
|
||||
import io.druid.segment.IndexIO;
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.indexing.common.TaskLock;
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.indexing.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
@ -44,7 +43,6 @@ import io.druid.query.QueryRunner;
|
|||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeIOConfig;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
|
@ -115,7 +113,8 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
@JsonProperty("windowPeriod") Period windowPeriod,
|
||||
@JsonProperty("maxPendingPersists") int maxPendingPersists,
|
||||
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
|
||||
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -142,7 +141,7 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
windowPeriod,
|
||||
null,
|
||||
null,
|
||||
rejectionPolicyFactory,
|
||||
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
|
||||
maxPendingPersists,
|
||||
spec.getShardSpec()
|
||||
),
|
||||
|
@ -315,6 +314,7 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
0
|
||||
);
|
||||
|
||||
|
|
|
@ -59,23 +59,27 @@ public interface Task
|
|||
{
|
||||
/**
|
||||
* Returns ID of this task. Must be unique across all tasks ever created.
|
||||
* @return task ID
|
||||
*/
|
||||
public String getId();
|
||||
|
||||
/**
|
||||
* Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
|
||||
* a common convention is to set group ID equal to task ID.
|
||||
* @return task group ID
|
||||
*/
|
||||
public String getGroupId();
|
||||
|
||||
/**
|
||||
* Returns a {@link io.druid.indexing.common.task.TaskResource} for this task. Task resources define specific
|
||||
* worker requirements a task may require.
|
||||
* @return {@link io.druid.indexing.common.task.TaskResource} for this task
|
||||
*/
|
||||
public TaskResource getTaskResource();
|
||||
|
||||
/**
|
||||
* Returns a descriptive label for this task type. Used for metrics emission and logging.
|
||||
* @return task type label
|
||||
*/
|
||||
public String getType();
|
||||
|
||||
|
@ -95,9 +99,17 @@ public interface Task
|
|||
/**
|
||||
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
|
||||
* should return null.
|
||||
* @param <T> query result type
|
||||
* @return query runners for this task
|
||||
*/
|
||||
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
|
||||
|
||||
/**
|
||||
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
|
||||
* extra classpath should be prepended, this should return null or the empty string.
|
||||
*/
|
||||
public String getClasspathPrefix();
|
||||
|
||||
/**
|
||||
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
|
||||
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
|
||||
|
@ -122,7 +134,7 @@ public interface Task
|
|||
*
|
||||
* @return Some kind of finished status (isRunnable must be false).
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws Exception if this task failed
|
||||
*/
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception;
|
||||
}
|
||||
|
|
|
@ -43,6 +43,8 @@ public class TaskResource
|
|||
* Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same
|
||||
* worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the
|
||||
* task ID.
|
||||
*
|
||||
* @return task availability group
|
||||
*/
|
||||
@JsonProperty
|
||||
public String getAvailabilityGroup()
|
||||
|
@ -52,7 +54,7 @@ public class TaskResource
|
|||
|
||||
|
||||
/**
|
||||
* Returns the number of worker slots this task will take.
|
||||
* @return the number of worker slots this task will take
|
||||
*/
|
||||
@JsonProperty
|
||||
public int getRequiredCapacity()
|
||||
|
|
|
@ -161,10 +161,19 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
|
||||
final List<String> command = Lists.newArrayList();
|
||||
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
|
||||
final String taskClasspath;
|
||||
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
|
||||
taskClasspath = Joiner.on(File.pathSeparator).join(
|
||||
task.getClasspathPrefix(),
|
||||
config.getClasspath()
|
||||
);
|
||||
} else {
|
||||
taskClasspath = config.getClasspath();
|
||||
}
|
||||
|
||||
command.add(config.getJavaCommand());
|
||||
command.add("-cp");
|
||||
command.add(config.getClasspath());
|
||||
command.add(taskClasspath);
|
||||
|
||||
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
|
||||
|
||||
|
@ -450,4 +459,4 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
closer.register(process.getOutputStream());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import io.druid.tasklogs.TaskLogPusher;
|
|||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
*/
|
||||
*/
|
||||
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
|
||||
{
|
||||
private final ForkingTaskRunnerConfig config;
|
||||
|
@ -66,4 +66,4 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
|
|||
{
|
||||
return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,13 +88,13 @@ import java.util.concurrent.TimeUnit;
|
|||
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
|
||||
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
|
||||
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
|
||||
* <p/>
|
||||
*
|
||||
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
|
||||
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
|
||||
* For example, {@link io.druid.indexing.overlord.scaling.ResourceManagementScheduler} can take care of these duties.
|
||||
* <p/>
|
||||
*
|
||||
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
|
||||
* <p/>
|
||||
*
|
||||
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
|
||||
*/
|
||||
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
||||
|
@ -848,4 +848,4 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
// Notify interested parties
|
||||
taskRunnerWorkItem.setResult(taskStatus);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import io.druid.server.initialization.ZkPathsConfig;
|
|||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
/**
|
||||
*/
|
||||
*/
|
||||
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
|
||||
{
|
||||
private final CuratorFramework curator;
|
||||
|
@ -74,4 +74,4 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
|
|||
httpClient
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -161,6 +161,12 @@ public class TaskLockbox
|
|||
/**
|
||||
* Acquires a lock on behalf of a task. Blocks until the lock is acquired. Throws an exception if the lock
|
||||
* cannot be acquired.
|
||||
*
|
||||
* @param task task to acquire lock for
|
||||
* @param interval interval to lock
|
||||
* @return acquired TaskLock
|
||||
*
|
||||
* @throws java.lang.InterruptedException if the lock cannot be acquired
|
||||
*/
|
||||
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
|
||||
{
|
||||
|
@ -296,6 +302,7 @@ public class TaskLockbox
|
|||
* Return the currently-active locks for some task.
|
||||
*
|
||||
* @param task task for which to locate locks
|
||||
* @return currently-active locks for the given task
|
||||
*/
|
||||
public List<TaskLock> findLocksForTask(final Task task)
|
||||
{
|
||||
|
|
|
@ -287,4 +287,4 @@ public class TaskMaster
|
|||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,11 +56,11 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||
|
||||
/**
|
||||
* Interface between task producers and the task runner.
|
||||
* <p/>
|
||||
*
|
||||
* This object accepts tasks from producers using {@link #add} and manages delivery of these tasks to a
|
||||
* {@link TaskRunner}. Tasks will run in a mostly-FIFO order, with deviations when the natural next task is not ready
|
||||
* in time (based on its {@link Task#isReady} method).
|
||||
* <p/>
|
||||
*
|
||||
* For persistence, we save all new tasks and task status changes using a {@link TaskStorage} object.
|
||||
*/
|
||||
public class TaskQueue
|
||||
|
@ -292,6 +292,7 @@ public class TaskQueue
|
|||
* @param task task to add
|
||||
*
|
||||
* @return true
|
||||
* @throws TaskExistsException if the task already exists
|
||||
*/
|
||||
public boolean add(final Task task) throws TaskExistsException
|
||||
{
|
||||
|
@ -513,4 +514,4 @@ public class TaskQueue
|
|||
giant.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,8 @@ public interface TaskRunner
|
|||
/**
|
||||
* Inform the task runner it can clean up any resources associated with a task. This implies shutdown of any
|
||||
* currently-running tasks.
|
||||
*
|
||||
* @param taskid task ID to clean up resources for
|
||||
*/
|
||||
public void shutdown(String taskid);
|
||||
|
||||
|
@ -52,4 +54,4 @@ public interface TaskRunner
|
|||
public Collection<? extends TaskRunnerWorkItem> getKnownTasks();
|
||||
|
||||
public Collection<ZkWorker> getWorkers();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,24 +32,33 @@ public interface TaskStorage
|
|||
/**
|
||||
* Adds a task to the storage facility with a particular status.
|
||||
*
|
||||
* @param task task to add
|
||||
* @param status task status
|
||||
* @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
|
||||
*/
|
||||
public void insert(Task task, TaskStatus status) throws TaskExistsException;
|
||||
|
||||
/**
|
||||
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle
|
||||
* is not respected (absent -> RUNNING -> SUCCESS/FAILURE).
|
||||
* is not respected (absent -> RUNNING -> SUCCESS/FAILURE).
|
||||
*
|
||||
* @param status task status
|
||||
*/
|
||||
public void setStatus(TaskStatus status);
|
||||
|
||||
/**
|
||||
* Persists lock state in the storage facility.
|
||||
* @param taskid task ID
|
||||
* @param taskLock lock state
|
||||
*/
|
||||
public void addLock(String taskid, TaskLock taskLock);
|
||||
|
||||
/**
|
||||
* Removes lock state from the storage facility. It is harmless to keep old locks in the storage facility, but
|
||||
* this method can help reclaim wasted space.
|
||||
*
|
||||
* @param taskid task ID
|
||||
* @param taskLock lock state
|
||||
*/
|
||||
public void removeLock(String taskid, TaskLock taskLock);
|
||||
|
||||
|
@ -58,28 +67,44 @@ public interface TaskStorage
|
|||
* absentee Optional.
|
||||
*
|
||||
* NOTE: This method really feels like it should be combined with {@link #getStatus}. Expect that in the future.
|
||||
*
|
||||
* @param taskid task ID
|
||||
* @return optional task
|
||||
*/
|
||||
public Optional<Task> getTask(String taskid);
|
||||
|
||||
/**
|
||||
* Returns task status as stored in the storage facility. If the task ID does not exist, this will return
|
||||
* an absentee Optional.
|
||||
*
|
||||
* @param taskid task ID
|
||||
* @return task status
|
||||
*/
|
||||
public Optional<TaskStatus> getStatus(String taskid);
|
||||
|
||||
/**
|
||||
* Add an action taken by a task to the audit log.
|
||||
*
|
||||
* @param task task to record action for
|
||||
* @param taskAction task action to record
|
||||
*
|
||||
* @param <T> task action return type
|
||||
*/
|
||||
public <T> void addAuditLog(Task task, TaskAction<T> taskAction);
|
||||
|
||||
/**
|
||||
* Returns all actions taken by a task.
|
||||
*
|
||||
* @param taskid task ID
|
||||
* @return list of task actions
|
||||
*/
|
||||
public List<TaskAction> getAuditLogs(String taskid);
|
||||
|
||||
/**
|
||||
* Returns a list of currently running or pending tasks as stored in the storage facility. No particular order
|
||||
* is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
|
||||
*
|
||||
* @return list of active tasks
|
||||
*/
|
||||
public List<Task> getActiveTasks();
|
||||
|
||||
|
@ -87,11 +112,16 @@ public interface TaskStorage
|
|||
* Returns a list of recently finished task statuses as stored in the storage facility. No particular order
|
||||
* is guaranteed, but implementations are encouraged to return tasks in descending order of creation. No particular
|
||||
* standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing.
|
||||
*
|
||||
* @return list of recently finished tasks
|
||||
*/
|
||||
public List<TaskStatus> getRecentlyFinishedTaskStatuses();
|
||||
|
||||
/**
|
||||
* Returns a list of locks for a particular task.
|
||||
*
|
||||
* @param taskid task ID
|
||||
* @return list of TaskLocks for the given task
|
||||
*/
|
||||
public List<TaskLock> getLocks(String taskid);
|
||||
}
|
||||
|
|
|
@ -19,9 +19,7 @@
|
|||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.indexing.common.TaskStatus;
|
||||
|
@ -30,7 +28,6 @@ import io.druid.indexing.common.actions.TaskAction;
|
|||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -73,6 +70,9 @@ public class TaskStorageQueryAdapter
|
|||
* This method is useful when you want to figure out all of the things a single task spawned. It does pose issues
|
||||
* with the result set perhaps growing boundlessly and we do not do anything to protect against that. Use at your
|
||||
* own risk and know that at some point, we might adjust this to actually enforce some sort of limits.
|
||||
*
|
||||
* @param taskid task ID
|
||||
* @return set of segments created by the specified task
|
||||
*/
|
||||
public Set<DataSegment> getInsertedSegments(final String taskid)
|
||||
{
|
||||
|
|
|
@ -86,19 +86,19 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
|
|||
runningItems.add(taskRunnerWorkItem);
|
||||
Futures.addCallback(
|
||||
statusFuture, new FutureCallback<TaskStatus>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(TaskStatus result)
|
||||
{
|
||||
runningItems.remove(taskRunnerWorkItem);
|
||||
}
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(TaskStatus result)
|
||||
{
|
||||
runningItems.remove(taskRunnerWorkItem);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t)
|
||||
{
|
||||
runningItems.remove(taskRunnerWorkItem);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t)
|
||||
{
|
||||
runningItems.remove(taskRunnerWorkItem);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return statusFuture;
|
||||
|
@ -252,4 +252,4 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -523,4 +523,4 @@ public class OverlordResource
|
|||
return data;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.metamx.common.lifecycle.LifecycleStop;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.granularity.PeriodGranularity;
|
||||
import io.druid.indexing.overlord.RemoteTaskRunner;
|
||||
import io.druid.indexing.overlord.TaskRunner;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Period;
|
||||
|
@ -33,7 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
|
||||
/**
|
||||
* The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed.
|
||||
* It uses a {@link io.druid.indexing.overlord.RemoteTaskRunner.TaskRunner} to return all pending tasks in the system and the status of the worker nodes in
|
||||
* It uses a {@link TaskRunner} to return all pending tasks in the system and the status of the worker nodes in
|
||||
* the system.
|
||||
* The ResourceManagementScheduler does not contain the logic to decide whether provision or termination should actually
|
||||
* occur. That decision is made in the {@link ResourceManagementStrategy}.
|
||||
|
|
|
@ -35,6 +35,9 @@ public interface EC2UserData<T extends EC2UserData>
|
|||
/**
|
||||
* Return a copy of this instance with a different worker version. If no changes are needed (possibly because the
|
||||
* user data does not depend on the worker version) then it is OK to return "this".
|
||||
*
|
||||
* @param version worker version
|
||||
* @return instance with the specified version
|
||||
*/
|
||||
public EC2UserData<T> withVersion(String version);
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ import java.util.concurrent.ExecutorService;
|
|||
/**
|
||||
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
|
||||
* created that waits for new tasks. Tasks are executed as soon as they are seen.
|
||||
* <p/>
|
||||
*
|
||||
* The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
|
||||
* realtime index tasks.
|
||||
*/
|
||||
|
@ -197,4 +197,4 @@ public class WorkerTaskMonitor
|
|||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,4 +170,4 @@ public class ExecutorLifecycle
|
|||
{
|
||||
parentMonitorExec.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
|
|||
null,
|
||||
1,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
this.status = status;
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.indexing.common;
|
||||
|
||||
import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.indexing.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
@ -207,6 +206,7 @@ public class TaskSerdeTest
|
|||
new Period("PT10M"),
|
||||
1,
|
||||
Granularity.HOUR,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
@ -426,7 +426,8 @@ public class TaskSerdeTest
|
|||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
null,
|
||||
"blah"
|
||||
);
|
||||
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
@ -441,5 +442,7 @@ public class TaskSerdeTest
|
|||
task.getSpec().getTuningConfig().getJobProperties(),
|
||||
task2.getSpec().getTuningConfig().getJobProperties()
|
||||
);
|
||||
Assert.assertEquals("blah", task.getClasspathPrefix());
|
||||
Assert.assertEquals("blah", task2.getClasspathPrefix());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,13 +44,12 @@ import io.druid.data.input.MapBasedInputRow;
|
|||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.db.IndexerSQLMetadataCoordinator;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.indexing.common.TestUtils;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.indexing.common.SegmentLoaderFactory;
|
||||
import io.druid.indexing.common.TaskLock;
|
||||
import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.TaskToolbox;
|
||||
import io.druid.indexing.common.TaskToolboxFactory;
|
||||
import io.druid.indexing.common.TestUtils;
|
||||
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
|
||||
import io.druid.indexing.common.actions.LockListAction;
|
||||
import io.druid.indexing.common.actions.SegmentInsertAction;
|
||||
|
@ -67,6 +66,7 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
|
|||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.segment.loading.DataSegmentArchiver;
|
||||
import io.druid.segment.loading.DataSegmentKiller;
|
||||
import io.druid.segment.loading.DataSegmentMover;
|
||||
|
@ -101,7 +101,7 @@ public class TaskLifecycleTest
|
|||
private TaskLockbox tl = null;
|
||||
private TaskQueue tq = null;
|
||||
private TaskRunner tr = null;
|
||||
private MockIndexerMetadataCoordinator mdc = null;
|
||||
private MockIndexerDBCoordinator mdc = null;
|
||||
private TaskActionClientFactory tac = null;
|
||||
private TaskToolboxFactory tb = null;
|
||||
TaskStorageQueryAdapter tsqa = null;
|
||||
|
@ -510,12 +510,12 @@ public class TaskLifecycleTest
|
|||
return retVal;
|
||||
}
|
||||
|
||||
private static class MockIndexerMetadataCoordinator extends IndexerSQLMetadataCoordinator
|
||||
private static class MockIndexerDBCoordinator extends IndexerSQLMetadataCoordinator
|
||||
{
|
||||
final private Set<DataSegment> published = Sets.newHashSet();
|
||||
final private Set<DataSegment> nuked = Sets.newHashSet();
|
||||
|
||||
private MockIndexerMetadataCoordinator()
|
||||
private MockIndexerDBCoordinator()
|
||||
{
|
||||
super(null, null, null);
|
||||
}
|
||||
|
@ -562,9 +562,9 @@ public class TaskLifecycleTest
|
|||
}
|
||||
}
|
||||
|
||||
private static MockIndexerMetadataCoordinator newMockMDC()
|
||||
private static MockIndexerDBCoordinator newMockMDC()
|
||||
{
|
||||
return new MockIndexerMetadataCoordinator();
|
||||
return new MockIndexerDBCoordinator();
|
||||
}
|
||||
|
||||
private static ServiceEmitter newMockEmitter()
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.indexing.overlord.scaling;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
|
|
@ -49,6 +49,7 @@ public class TaskAnnouncementTest
|
|||
new Period("PT10M"),
|
||||
1,
|
||||
Granularity.HOUR,
|
||||
null,
|
||||
null
|
||||
);
|
||||
final TaskStatus status = TaskStatus.running(task.getId());
|
||||
|
|
|
@@ -70,10 +70,14 @@ public class DurationGranularity extends BaseQueryGranularity
}

@Override
public long truncate(long t)
public long truncate(final long t)
{
final long duration = getDurationMillis();
return t - t % duration + origin;
long offset = t % duration - origin % duration;
if(offset < 0) {
offset += duration;
}
return t - offset;
}

@Override

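The hunk above replaces the origin-unaware truncation with modulo arithmetic that also corrects a negative offset before aligning the timestamp to a bucket boundary. A standalone sketch of the new arithmetic, using arbitrary example constants:

```java
public class TruncateSketch
{
  // Mirrors the new DurationGranularity.truncate() arithmetic from the hunk above:
  // align t to the bucket boundary defined by duration and origin, adjusting when the
  // raw modulo difference comes out negative.
  static long truncate(long t, long duration, long origin)
  {
    long offset = t % duration - origin % duration;
    if (offset < 0) {
      offset += duration;
    }
    return t - offset;
  }

  public static void main(String[] args)
  {
    long hour = 3_600_000L;
    long origin = 900_000L; // buckets start 15 minutes past each hour (arbitrary example)
    System.out.println(truncate(3_600_000L, hour, origin)); // 900000, falls back to the previous boundary
    System.out.println(truncate(5_000_000L, hour, origin)); // 4500000
  }
}
```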
@ -302,14 +302,17 @@ public class PeriodGranularity extends BaseQueryGranularity
|
|||
return current;
|
||||
}
|
||||
|
||||
private long truncateMillisPeriod(long t)
|
||||
private long truncateMillisPeriod(final long t)
|
||||
{
|
||||
// toStandardDuration assumes days are always 24h, and hours are always 60 minutes,
|
||||
// which may not always be the case, e.g if there are daylight saving changes.
|
||||
if(chronology.days().isPrecise() && chronology.hours().isPrecise()) {
|
||||
if (chronology.days().isPrecise() && chronology.hours().isPrecise()) {
|
||||
final long millis = period.toStandardDuration().getMillis();
|
||||
t -= t % millis + origin % millis;
|
||||
return t;
|
||||
long offset = t % millis - origin % millis;
|
||||
if(offset < 0) {
|
||||
offset += millis;
|
||||
}
|
||||
return t - offset;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -23,8 +23,6 @@ import com.google.inject.Binder;
|
|||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import com.metamx.common.config.Config;
|
||||
import io.druid.guice.JsonConfigurator;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import org.skife.config.ConfigurationObjectFactory;
|
||||
|
||||
import javax.validation.Validation;
|
||||
|
|
|
@ -24,13 +24,6 @@ import com.google.inject.Binder;
|
|||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.util.Modules;
|
||||
import io.druid.guice.ConfigModule;
|
||||
import io.druid.guice.DruidGuiceExtensions;
|
||||
import io.druid.guice.DruidSecondaryModule;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.PropertiesModule;
|
||||
import io.druid.jackson.JacksonModule;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
|
|
@ -102,6 +102,7 @@ public abstract class BaseQuery<T> implements Query<T>
|
|||
return duration;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
|
|
|
@ -20,12 +20,8 @@
|
|||
package io.druid.query;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.guava.Yielder;
|
||||
import com.metamx.common.guava.Yielders;
|
||||
import com.metamx.common.guava.YieldingAccumulator;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
|
|
@ -48,14 +48,14 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
/**
|
||||
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
|
||||
* <p/>
|
||||
*
|
||||
* When using this, it is important to make sure that the list of QueryRunners provided is fully flattened.
|
||||
* If, for example, you were to pass a list of a Chained QueryRunner (A) and a non-chained QueryRunner (B). Imagine
|
||||
* A has 2 QueryRunner chained together (Aa and Ab), the fact that the Queryables are run in parallel on an
|
||||
* executor would mean that the Queryables are actually processed in the order
|
||||
* <p/>
|
||||
* A -> B -> Aa -> Ab
|
||||
* <p/>
|
||||
*
|
||||
* <pre>A -> B -> Aa -> Ab</pre>
|
||||
*
|
||||
* That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B
|
||||
* must be fully cached in memory before the results for Aa and Ab are computed.
|
||||
*/
|
||||
|
|
|
@ -42,6 +42,6 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
|
|||
@Config(value = "${base_path}.columnCache.sizeBytes")
|
||||
public int columnCacheSizeBytes()
|
||||
{
|
||||
return 1024 * 1024;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,9 +59,9 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for AndDimFilter.
|
||||
* <p/>
|
||||
*
|
||||
* Required: fields() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder()
|
||||
|
@ -105,9 +105,9 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for OrDimFilter.
|
||||
* <p/>
|
||||
*
|
||||
* Required: fields() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* OrDimFilter orDimFilter = Druids.newOrDimFilterBuilder()
|
||||
|
@ -160,9 +160,9 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for NotDimFilter.
|
||||
* <p/>
|
||||
*
|
||||
* Required: field() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* NotDimFilter notDimFilter = Druids.newNotDimFilterBuilder()
|
||||
|
@ -206,9 +206,9 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for SelectorDimFilter.
|
||||
* <p/>
|
||||
*
|
||||
* Required: dimension() and value() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* Selector selDimFilter = Druids.newSelectorDimFilterBuilder()
|
||||
|
@ -285,10 +285,10 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for TimeseriesQuery.
|
||||
* <p/>
|
||||
*
|
||||
* Required: dataSource(), intervals(), and aggregators() must be called before build()
|
||||
* Optional: filters(), granularity(), postAggregators(), and context() can be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
|
@ -483,11 +483,11 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for SearchQuery.
|
||||
* <p/>
|
||||
*
|
||||
* Required: dataSource(), intervals(), dimensions() and query() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Optional: filters(), granularity(), and context() can be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* SearchQuery query = Druids.newSearchQueryBuilder()
|
||||
|
@ -678,9 +678,9 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for TimeBoundaryQuery.
|
||||
* <p/>
|
||||
*
|
||||
* Required: dataSource() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* TimeBoundaryQuery query = new MaxTimeQueryBuilder()
|
||||
|
@ -774,12 +774,12 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for Result.
|
||||
* <p/>
|
||||
*
|
||||
* Required: timestamp() and value() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* Result<T> result = Druids.newResultBuilder()
|
||||
* Result<T> result = Druids.newResultBuilder()
|
||||
* .timestamp(egDateTime)
|
||||
* .value(egValue)
|
||||
* .build();
|
||||
|
@ -840,9 +840,9 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for SegmentMetadataQuery.
|
||||
* <p/>
|
||||
*
|
||||
* Required: dataSource(), intervals() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
|
||||
|
@ -948,9 +948,9 @@ public class Druids
|
|||
|
||||
/**
|
||||
* A Builder for SelectQuery.
|
||||
* <p/>
|
||||
*
|
||||
* Required: dataSource(), intervals() must be called before build()
|
||||
* <p/>
|
||||
*
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* SelectQuery query = new SelectQueryBuilder()
|
||||
|
|
|
@@ -21,7 +21,6 @@
package io.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("table")

@@ -35,10 +35,13 @@ import java.util.Map;
*/
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private static final String DEFAULT_METRIC_NAME = "query/time";

private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryRunner<T> queryRunner;
private final long creationTime;
private final String metricName;

public MetricsEmittingQueryRunner(
ServiceEmitter emitter,

@@ -46,25 +49,38 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
QueryRunner<T> queryRunner
)
{
this(emitter, builderFn, queryRunner, -1);
this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME);
}

public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
long creationTime
long creationTime,
String metricName
)
{
this.emitter = emitter;
this.builderFn = builderFn;
this.queryRunner = queryRunner;
this.creationTime = creationTime;
this.metricName = metricName;
}

public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
String metricName
)
{
this(emitter, builderFn, queryRunner, -1, metricName);
}

public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
{
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis());
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis(), metricName);
}

@Override
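For context, a hedged sketch of how a caller might combine the new metricName constructor with withWaitMeasuredFromNow(); emitter, builderFn and baseRunner are assumed to already exist, and the metric name is illustrative:

    QueryRunner<T> timedRunner =
        new MetricsEmittingQueryRunner<T>(emitter, builderFn, baseRunner, "query/segment/time")
            .withWaitMeasuredFromNow();
    // "query/segment/time" replaces the previously hard-coded "query/time" for this runner;
    // the "query/wait" metric is emitted only because withWaitMeasuredFromNow() sets creationTime.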
@@ -99,9 +115,9 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
finally {
long timeTaken = System.currentTimeMillis() - startTime;

emitter.emit(builder.build("query/time", timeTaken));
emitter.emit(builder.build(metricName, timeTaken));

if(creationTime > 0) {
if (creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
}

@@ -175,12 +191,13 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
}

long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
emitter.emit(builder.build(metricName, timeTaken));

if (creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
} finally {
}
finally {
yielder.close();
}
}
@@ -29,4 +29,4 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
public interface PostProcessingOperator<T>
{
public QueryRunner<T> postProcess(QueryRunner<T> baseQueryRunner);
}
}

@@ -92,4 +92,4 @@ public interface Query<T>
public String getId();

Query<T> withDataSource(DataSource dataSource);
}
}

@@ -22,7 +22,6 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;

@@ -29,6 +29,8 @@ public interface QuerySegmentWalker
* Gets the Queryable for a given interval, the Queryable returned can be any version(s) or partitionNumber(s)
* such that it represents the interval.
*
* @param <T> query result type
* @param query the query to find a Queryable for
* @param intervals the intervals to find a Queryable for
* @return a Queryable object that represents the interval
*/

@@ -36,8 +38,10 @@ public interface QuerySegmentWalker

/**
* Gets the Queryable for a given list of SegmentSpecs.
* exist.
*
* @param <T> the query result type
* @param query the query to return a Queryable for
* @param specs the list of SegmentSpecs to find a Queryable for
* @return the Queryable object with the given SegmentSpecs
*/
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs);
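A brief hedged sketch of the two lookup styles this interface documents; walker, query and descriptors are assumed to exist and are not part of the diff:

    // By interval: the walker chooses whichever segment versions/partitions cover the intervals.
    QueryRunner<T> byInterval = walker.getQueryRunnerForIntervals(query, query.getIntervals());
    // By explicit SegmentDescriptors: the caller pins the exact segments to query.
    QueryRunner<T> bySegments = walker.getQueryRunnerForSegments(query, descriptors);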
@@ -23,7 +23,6 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;

@@ -158,4 +158,4 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
tOffset += start;
return tOffset - t;
}
}
}

@@ -26,9 +26,9 @@ import java.util.List;

/**
* Processing related interface
* <p/>
*
* An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
* <p/>
*
* This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
* without making any assumptions about how they are pulling values out of the base data. That is, the data is
* provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how
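To make the factory/selector split concrete, a hedged sketch under stated assumptions: factory, columnSelectorFactory and cursor are assumed to exist, and factorize() is the usual way an AggregatorFactory produces an Aggregator from a ColumnSelectorFactory:

    Aggregator agg = factory.factorize(columnSelectorFactory);
    while (!cursor.isDone()) {
      agg.aggregate();   // the Aggregator reads values through the selector, not from raw rows
      cursor.advance();
    }
    float total = agg.getFloat();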
@@ -20,7 +20,6 @@
package io.druid.query.aggregation;

import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;

import java.util.HashSet;

@@ -37,6 +36,8 @@ public class AggregatorUtil
* such that all the dependencies of any given aggregator should occur before that aggregator.
* See AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example.
* @param postAggName name of the postAgg on which dependency is to be calculated
*
* @return the list of dependent postAggregators
*/
public static List<PostAggregator> pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String postAggName)
{

@@ -136,6 +136,8 @@ public class Histogram
* Returns a visual representation of a histogram object.
* Initially returns an array of just the min. and max. values
* but can also support the addition of quantiles.
*
* @return a visual representation of this histogram
*/
public HistogramVisual asVisual() {
float[] visualCounts = new float[bins.length - 2];
@@ -32,8 +32,6 @@ import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.NativeArray;
import org.mozilla.javascript.Scriptable;
import org.mozilla.javascript.ScriptableObject;

import javax.annotation.Nullable;

@@ -41,7 +39,6 @@ import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

@@ -39,7 +39,6 @@ import org.apache.commons.codec.binary.Base64;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

@@ -233,6 +232,40 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
return HyperLogLogCollector.makeLatestCollector();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;

if (byRow != that.byRow) {
return false;
}
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}

return true;
}

@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (byRow ? 1 : 0);
return result;
}

@Override
public String toString()
{
@@ -28,18 +28,20 @@ import java.nio.ByteBuffer;

/**
* Implements the HyperLogLog cardinality estimator described in:
* <p/>
*
* http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
* <p/>
*
* Run this code to see a simple indication of expected errors based on different m values:
* <p/>
* for (int i = 1; i < 20; ++i) {
* System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
* }
* <p/>
*
* <code>
* for (int i = 1; i < 20; ++i) {
* System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
* }
* </code>
*
* This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that
* only one thread is ever calling methods on it.
* <p/>
*
* If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior
*/
public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
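The error-rule loop quoted in that javadoc, pulled out as a small standalone program for anyone who wants to run it; this is just the 104 / sqrt(m) heuristic from the comment, nothing here is new API:

    public class HllErrorTable
    {
      public static void main(String[] args)
      {
        // Expected relative error for m = 2 << i registers, per the javadoc's rule of thumb.
        for (int i = 1; i < 20; ++i) {
          System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
        }
      }
    }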
@@ -195,6 +197,13 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return applyCorrection(e, zeroCount);
}

/**
* Checks if the payload for the given ByteBuffer is sparse or not.
* The given buffer must be positioned at getPayloadBytePosition() prior to calling isSparse
*
* @param buffer
* @return
*/
private static boolean isSparse(ByteBuffer buffer)
{
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;

@@ -495,13 +504,32 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return false;
}

HyperLogLogCollector collector = (HyperLogLogCollector) o;
ByteBuffer otherBuffer = ((HyperLogLogCollector) o).storageBuffer;

if (storageBuffer != null ? !storageBuffer.equals(collector.storageBuffer) : collector.storageBuffer != null) {
if (storageBuffer != null ? false : otherBuffer != null) {
return false;
}

return true;
if(storageBuffer == null && otherBuffer == null) {
return true;
}

final ByteBuffer denseStorageBuffer;
if(storageBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer);
denseCollector.convertToDenseStorage();
denseStorageBuffer = denseCollector.storageBuffer;
} else {
denseStorageBuffer = storageBuffer;
}

if(otherBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer);
otherCollector.convertToDenseStorage();
otherBuffer = otherCollector.storageBuffer;
}

return denseStorageBuffer.equals(otherBuffer);
}

@Override
@@ -127,7 +127,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery;
try {
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(query.getContext());
}
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");

@@ -193,7 +193,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new OrderedMergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public Sequence<Row> mergeSequencesUnordered(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}

private Ordering<Row> getOrdering()
{
return Ordering.<Row>natural().nullsFirst();
}

@Override

@@ -257,6 +268,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
return new CacheStrategy<Row, Object, GroupByQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();

@Override
public byte[] computeCacheKey(GroupByQuery query)
{

@@ -342,14 +355,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery

DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());

Iterator<AggregatorFactory> aggsIter = aggs.iterator();

Map<String, Object> event = jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
);

while (aggsIter.hasNext()) {
final AggregatorFactory factory = aggsIter.next();
Object agg = event.get(factory.getName());
if (agg != null) {
event.put(factory.getName(), factory.deserialize(agg));
}
}

return new MapBasedRow(
timestamp,
(Map<String, Object>) jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
)
event
);
}
};

@@ -358,7 +383,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
};
}
@@ -24,13 +24,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;
import io.druid.query.Result;

import java.nio.ByteBuffer;
import java.util.Arrays;

/**
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
*/
public class GreaterThanHavingSpec implements HavingSpec

@@ -23,13 +23,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;
import io.druid.query.Result;

import java.nio.ByteBuffer;
import java.util.Arrays;

/**
* The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value",
* The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value",
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
*/
public class LessThanHavingSpec implements HavingSpec

@@ -22,7 +22,6 @@ package io.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.Row;
import io.druid.query.Result;

import java.nio.ByteBuffer;

@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Row;
import io.druid.query.Result;

import java.nio.ByteBuffer;
import java.util.List;

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import io.druid.data.input.Row;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;

@@ -23,7 +23,6 @@ import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.metamx.common.guava.Sequence;
import io.druid.data.input.Row;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;

@@ -186,7 +186,7 @@ public class OrderByColumnSpec
final byte[] dimensionBytes = dimension.getBytes(Charsets.UTF_8);
final byte[] directionBytes = direction.name().getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(dimensionBytes.length + dimensionBytes.length)
return ByteBuffer.allocate(dimensionBytes.length + directionBytes.length)
.put(dimensionBytes)
.put(directionBytes)
.array();
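The sizing fix above is easy to miss, so here is a short hedged illustration of why it matters; the dimension and direction strings are made up, not taken from the diff:

    byte[] dimensionBytes = "quality".getBytes(Charsets.UTF_8);    // 7 bytes
    byte[] directionBytes = "DESCENDING".getBytes(Charsets.UTF_8); // 10 bytes
    // Old sizing: dimensionBytes.length + dimensionBytes.length = 14, so the second put()
    // would overflow the buffer. New sizing: 7 + 10 = 17, exactly the bytes being written.
    byte[] cacheKey = ByteBuffer.allocate(dimensionBytes.length + directionBytes.length)
        .put(dimensionBytes)
        .put(directionBytes)
        .array();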
@@ -146,7 +146,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new OrderedMergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public Sequence<SegmentAnalysis> mergeSequencesUnordered(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}

@Override

@@ -112,7 +112,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
@Override
public Sequence<Result<SearchResultValue>> mergeSequences(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<SearchResultValue>>(getOrdering(), seqOfSequences);
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public Sequence<Result<SearchResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}

@Override

@@ -114,7 +114,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
@Override
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<SelectResultValue>>(getOrdering(), seqOfSequences);
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public Sequence<Result<SelectResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}

@Override

@@ -19,14 +19,11 @@

package io.druid.query.select;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;

@@ -22,7 +22,6 @@ package io.druid.query.timeboundary;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.query.BaseQuery;

@@ -42,7 +42,6 @@ import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

@@ -115,6 +114,12 @@ public class TimeBoundaryQueryQueryToolChest
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
{

@@ -146,9 +151,10 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
return ByteBuffer.allocate(2)
final byte[] cacheKey = query.getCacheKey();
return ByteBuffer.allocate(1 + cacheKey.length)
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.put(cacheKey)
.array();
}
@@ -112,7 +112,13 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}

@Override

@@ -21,7 +21,6 @@ package io.druid.query.topn;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Comparators;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

@@ -24,8 +24,6 @@ import io.druid.query.Result;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;

import java.util.Comparator;

public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
{
private final TopNQuery query;

@@ -39,12 +39,12 @@ import java.util.Map;

/**
* A Builder for TopNQuery.
* <p/>
*
* Required: dataSource(), intervals(), metric() and threshold() must be called before build()
* Additional requirement for numeric metric sorts: aggregators() must be called before build()
* <p/>
*
* Optional: filters(), granularity(), postAggregators() and context() can be called before build()
* <p/>
*
* Usage example:
* <pre><code>
* TopNQuery query = new TopNQueryBuilder()

@@ -72,7 +72,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};

private final TopNQueryConfig config;

@Inject

@@ -128,7 +127,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<TopNResultValue>>(getOrdering(), seqOfSequences);
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}

@Override
public Sequence<Result<TopNResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}

@Override

@@ -20,11 +20,8 @@

package io.druid.query.topn;

import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Iterator;
import java.util.List;

/**
*/

@@ -42,7 +42,7 @@ public interface DimensionSelector
*
* Value cardinality would be 2.
*
* @return
* @return the value cardinality
*/
public int getValueCardinality();

@@ -57,26 +57,26 @@ public interface DimensionSelector
*
* getRow() would return
*
* getRow(0) => [0 1]
* getRow(1) => [0]
* getRow(2) => [0 1]
* getRow(3) => [1]
* getRow(0) => [0 1]
* getRow(1) => [0]
* getRow(2) => [0 1]
* getRow(3) => [1]
*
* and then lookupName would return:
*
* lookupName(0) => A
* lookupName(1) => B
* lookupName(0) => A
* lookupName(1) => B
*
* @param id
* @return
* @param id id to lookup the field name for
* @return the field name for the given id
*/
public String lookupName(int id);

/**
* The ID is the int id value of the field.
*
* @param name
* @return
* @param name field name to look up the id for
* @return the id for the given field name
*/
public int lookupId(String name);
}
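A hedged sketch of how the three methods documented above are typically used together; selector is assumed to be a DimensionSelector positioned at the current cursor row, and IndexedInts is assumed to be the row type getRow() returns in this codebase:

    IndexedInts row = selector.getRow();
    for (int i = 0; i < row.size(); i++) {
      int id = row.get(i);
      String value = selector.lookupName(id);  // e.g. 0 => "A", 1 => "B" per the example above
      // ... and lookupId() maps back the other way: selector.lookupId("A") == 0
    }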
@@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;

@@ -51,7 +50,6 @@ import io.druid.common.utils.JodaUtils;
import io.druid.common.utils.SerializerUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.ToLowerCaseAggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedLongsSupplierSerializer;
import io.druid.segment.data.CompressedObjectStrategy;

@@ -115,7 +113,8 @@ public class IndexMerger
* @param dataInterval the Interval that the data represents
* @param outDir the directory to persist the data to
*
* @throws java.io.IOException
* @return the index output directory
* @throws java.io.IOException if an IO error occurs persisting the index
*/
public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir) throws IOException
{
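A minimal hedged example of the persist() call documented above; index and dataInterval are assumed to already exist, and the output path is illustrative:

    File outDir = new File("/tmp/druid/example-segment");           // illustrative path
    File segmentDir = IndexMerger.persist(index, dataInterval, outDir);
    // segmentDir is the index output directory noted in the @return tag above.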
@@ -33,7 +33,7 @@ public interface QueryableIndex extends ColumnSelector

/**
* The close method shouldn't actually be here as this is nasty. We will adjust it in the future.
* @throws java.io.IOException
* @throws java.io.IOException if an exception was thrown closing the index
*/
@Deprecated
public void close() throws IOException;

@@ -138,7 +138,10 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
Interval actualInterval = interval;

final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
final Interval dataInterval = new Interval(
getMinTime().getMillis(),
gran.next(gran.truncate(getMaxTime().getMillis()))
);

if (!actualInterval.overlaps(dataInterval)) {
return Sequences.empty();
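A hedged worked example of what the gran.next(gran.truncate(...)) change does to the data interval's end; the times are illustrative and gran is assumed to bucket by day:

    // Suppose getMaxTime() is 2014-03-05T14:30Z and gran buckets by day.
    long truncated = gran.truncate(maxTimeMillis); // 2014-03-05T00:00Z, start of the max time's bucket
    long end       = gran.next(truncated);         // 2014-03-06T00:00Z, first instant after that bucket
    // The old code applied gran.next() to the raw max time, which appears to have produced an end
    // (2014-03-06T14:30Z here) that was neither aligned to a bucket boundary nor minimal.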