mirror of https://github.com/apache/druid.git
Merge remote-tracking branch 'origin/master' into druid-845
This commit is contained in:
commit
9a9238a801
|
@ -82,7 +82,7 @@ Data segment announcers are used to announce segments.
|
|||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.announcer.type`|Choices: legacy or batch. The type of data segment announcer to use.|legacy|
|
||||
|`druid.announcer.type`|Choices: legacy or batch. The type of data segment announcer to use.|batch|
|
||||
|
||||
##### Single Data Segment Announcer
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ Returns dimension values transformed using the given [DimExtractionFn](#toc_3)
|
|||
|
||||
## DimExtractionFn
|
||||
|
||||
`DimExtractionFn`s define the transformation applied to each dimenion value
|
||||
`DimExtractionFn`s define the transformation applied to each dimension value
|
||||
|
||||
### RegexDimExtractionFn
|
||||
|
||||
|
|
|
@ -1024,6 +1024,48 @@ public class GroupByQueryRunnerTest
|
|||
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupByWithOrderLimit4()
|
||||
{
|
||||
GroupByQuery query = new GroupByQuery.Builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
.setGranularity(QueryRunnerTestHelper.allGran)
|
||||
.setDimensions(
|
||||
Arrays.<DimensionSpec>asList(
|
||||
new DefaultDimensionSpec(
|
||||
QueryRunnerTestHelper.marketDimension,
|
||||
QueryRunnerTestHelper.marketDimension
|
||||
)
|
||||
)
|
||||
)
|
||||
.setInterval(QueryRunnerTestHelper.fullOnInterval)
|
||||
.setLimitSpec(
|
||||
new DefaultLimitSpec(
|
||||
Lists.newArrayList(
|
||||
new OrderByColumnSpec(
|
||||
QueryRunnerTestHelper.marketDimension,
|
||||
OrderByColumnSpec.Direction.DESCENDING
|
||||
)
|
||||
), 3
|
||||
)
|
||||
)
|
||||
.setAggregatorSpecs(
|
||||
Lists.<AggregatorFactory>newArrayList(
|
||||
QueryRunnerTestHelper.rowsCount
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
List<Row> expectedResults = Arrays.asList(
|
||||
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "market", "upfront", "rows", 186L),
|
||||
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "market", "total_market", "rows", 186L),
|
||||
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "market", "spot", "rows", 837L)
|
||||
);
|
||||
|
||||
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHavingSpec()
|
||||
{
|
||||
|
|
|
@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
|
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.inject.Provider;
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class)
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerViewProvider.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class),
|
||||
@JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class)
|
||||
|
|
|
@ -25,7 +25,7 @@ import com.google.inject.Provider;
|
|||
|
||||
/**
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class)
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class),
|
||||
@JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class)
|
||||
|
|
|
@ -22,7 +22,6 @@ package io.druid.segment.loading;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -40,13 +39,6 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
|
|||
return IndexIO.loadIndex(parentDir);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn(e, "Got exception!!!! Going to delete parentDir[%s]", parentDir);
|
||||
try {
|
||||
FileUtils.deleteDirectory(parentDir);
|
||||
}
|
||||
catch (IOException e2) {
|
||||
log.error(e, "Problem deleting parentDir[%s]", parentDir);
|
||||
}
|
||||
throw new SegmentLoadingException(e, "%s", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,8 @@ public class OmniSegmentLoader implements SegmentLoader
|
|||
|
||||
private final List<StorageLocation> locations;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
@Inject
|
||||
public OmniSegmentLoader(
|
||||
Map<String, DataSegmentPuller> pullers,
|
||||
|
@ -118,16 +120,33 @@ public class OmniSegmentLoader implements SegmentLoader
|
|||
}
|
||||
|
||||
File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||
if (!storageDir.mkdirs()) {
|
||||
log.debug("Unable to make parent file[%s]", storageDir);
|
||||
|
||||
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
|
||||
// the parent directories of the segment are removed
|
||||
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
|
||||
synchronized (lock) {
|
||||
if (!storageDir.mkdirs()) {
|
||||
log.debug("Unable to make parent file[%s]", storageDir);
|
||||
}
|
||||
try {
|
||||
downloadStartMarker.createNewFile();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir);
|
||||
}
|
||||
}
|
||||
|
||||
getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir);
|
||||
|
||||
if (!downloadStartMarker.delete()) {
|
||||
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
|
||||
}
|
||||
|
||||
|
||||
loc.addSegment(segment);
|
||||
|
||||
retVal = storageDir;
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||
}
|
||||
|
||||
|
@ -151,9 +170,10 @@ public class OmniSegmentLoader implements SegmentLoader
|
|||
}
|
||||
|
||||
try {
|
||||
// Druid creates folders of the form dataSource/interval/version/partitionNum.
|
||||
// We need to clean up all these directories if they are all empty.
|
||||
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||
log.info("Deleting directory[%s]", cacheFile);
|
||||
FileUtils.deleteDirectory(cacheFile);
|
||||
cleanupCacheFiles(loc.getPath(), cacheFile);
|
||||
loc.removeSegment(segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
@ -172,4 +192,25 @@ public class OmniSegmentLoader implements SegmentLoader
|
|||
|
||||
return loader;
|
||||
}
|
||||
|
||||
public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException
|
||||
{
|
||||
if (cacheFile.equals(baseFile)) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (lock) {
|
||||
log.info("Deleting directory[%s]", cacheFile);
|
||||
try {
|
||||
FileUtils.deleteDirectory(cacheFile);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Unable to remove file[%s]", cacheFile);
|
||||
}
|
||||
}
|
||||
|
||||
if (cacheFile.getParentFile() != null && cacheFile.getParentFile().listFiles().length == 0) {
|
||||
cleanupCacheFiles(baseFile, cacheFile.getParentFile());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue