diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md
index 9aae64dcf51..91beca02f25 100644
--- a/docs/content/Configuration.md
+++ b/docs/content/Configuration.md
@@ -82,7 +82,7 @@ Data segment announcers are used to announce segments.
 |Property|Description|Default|
 |--------|-----------|-------|
-|`druid.announcer.type`|Choices: legacy or batch. The type of data segment announcer to use.|legacy|
+|`druid.announcer.type`|Choices: legacy or batch. The type of data segment announcer to use.|batch|
 
 ##### Single Data Segment Announcer
diff --git a/docs/content/DimensionSpecs.md b/docs/content/DimensionSpecs.md
index c9cb351343e..a799e5f762b 100644
--- a/docs/content/DimensionSpecs.md
+++ b/docs/content/DimensionSpecs.md
@@ -31,7 +31,7 @@ Returns dimension values transformed using the given [DimExtractionFn](#toc_3)
 
 ## DimExtractionFn
 
-`DimExtractionFn`s define the transformation applied to each dimenion value
+`DimExtractionFn`s define the transformation applied to each dimension value
 
 ### RegexDimExtractionFn
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 2babeadb78a..f2075502e5d 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -1024,6 +1024,48 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
   }
 
+  @Test
+  public void testGroupByWithOrderLimit4()
+  {
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .setDimensions(
+            Arrays.<DimensionSpec>asList(
+                new DefaultDimensionSpec(
+                    QueryRunnerTestHelper.marketDimension,
+                    QueryRunnerTestHelper.marketDimension
+                )
+            )
+        )
+        .setInterval(QueryRunnerTestHelper.fullOnInterval)
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                Lists.newArrayList(
+                    new OrderByColumnSpec(
+                        QueryRunnerTestHelper.marketDimension,
+                        OrderByColumnSpec.Direction.DESCENDING
+                    )
+                ), 3
+            )
+        )
+        .setAggregatorSpecs(
+            Lists.<AggregatorFactory>newArrayList(
+                QueryRunnerTestHelper.rowsCount
+            )
+        )
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "market", "upfront", "rows", 186L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "market", "total_market", "rows", 186L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "market", "spot", "rows", 837L)
+    );
+
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
+  }
+
   @Test
   public void testHavingSpec()
   {
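The new test pins down `DefaultLimitSpec` behavior: rows are ordered by the `market` dimension value, descending, then truncated to the limit of 3, which is why `upfront` sorts ahead of `total_market` and `spot` in `expectedResults`. A minimal standalone sketch of that sort-then-truncate shape in plain Java, using hypothetical data rather than Druid's actual limit-spec machinery:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderLimitSketch
{
  public static void main(String[] args)
  {
    // Hypothetical rows keyed by market, with the row counts the test expects.
    List<Map.Entry<String, Long>> rows = List.of(
        Map.entry("spot", 837L),
        Map.entry("total_market", 186L),
        Map.entry("upfront", 186L)
    );

    // Sort by dimension value descending, then keep the first 3 rows:
    // the same shape as DefaultLimitSpec(orderBy = market DESC, limit = 3).
    List<Map.Entry<String, Long>> limited = rows.stream()
        .sorted(Map.Entry.<String, Long>comparingByKey(Comparator.reverseOrder()))
        .limit(3)
        .collect(Collectors.toList());

    // Prints upfront, total_market, spot: the order asserted in expectedResults.
    limited.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```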
diff --git a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredServerViewProvider.java
index f9b20106ba2..4570c7e48cb 100644
--- a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java
+++ b/server/src/main/java/io/druid/client/FilteredServerViewProvider.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.inject.Provider;
 
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerViewProvider.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class),
     @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class)
diff --git a/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java
index fa48fba4661..f506d03148a 100644
--- a/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java
+++ b/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java
@@ -25,7 +25,7 @@ import com.google.inject.Provider;
 
 /**
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class),
     @JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class)
diff --git a/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java b/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java
index 717b0b513a0..81dd8d3136f 100644
--- a/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java
+++ b/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java
@@ -22,7 +22,6 @@ package io.druid.segment.loading;
 import com.metamx.common.logger.Logger;
 import io.druid.segment.IndexIO;
 import io.druid.segment.QueryableIndex;
-import org.apache.commons.io.FileUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,13 +39,6 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
       return IndexIO.loadIndex(parentDir);
     }
     catch (IOException e) {
-      log.warn(e, "Got exception!!!! Going to delete parentDir[%s]", parentDir);
-      try {
-        FileUtils.deleteDirectory(parentDir);
-      }
-      catch (IOException e2) {
-        log.error(e, "Problem deleting parentDir[%s]", parentDir);
-      }
       throw new SegmentLoadingException(e, "%s", e.getMessage());
     }
   }
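The two `defaultImpl` changes above are what actually flip the default from `legacy` to `batch`: with Jackson polymorphic typing, `defaultImpl` names the class instantiated when the `type` property is absent from the serialized config. A self-contained sketch of that fallback, using hypothetical `View`/`Legacy`/`Batch` stand-ins rather than the Druid provider classes:

```java
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DefaultImplSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Batch.class)
  @JsonSubTypes(value = {
      @JsonSubTypes.Type(name = "legacy", value = Legacy.class),
      @JsonSubTypes.Type(name = "batch", value = Batch.class)
  })
  interface View {}

  static class Legacy implements View {}

  static class Batch implements View {}

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // "type" present: the named subtype wins.
    System.out.println(mapper.readValue("{\"type\": \"legacy\"}", View.class).getClass().getSimpleName()); // Legacy
    // "type" absent: Jackson falls back to defaultImpl, so configs that never
    // set a type now get the batch implementation.
    System.out.println(mapper.readValue("{}", View.class).getClass().getSimpleName()); // Batch
  }
}
```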
Going to delete parentDir[%s]", parentDir); - try { - FileUtils.deleteDirectory(parentDir); - } - catch (IOException e2) { - log.error(e, "Problem deleting parentDir[%s]", parentDir); - } throw new SegmentLoadingException(e, "%s", e.getMessage()); } } diff --git a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java index bfec5093392..b1c3162c100 100644 --- a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java @@ -48,6 +48,8 @@ public class OmniSegmentLoader implements SegmentLoader private final List locations; + private final Object lock = new Object(); + @Inject public OmniSegmentLoader( Map pullers, @@ -118,16 +120,33 @@ public class OmniSegmentLoader implements SegmentLoader } File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - if (!storageDir.mkdirs()) { - log.debug("Unable to make parent file[%s]", storageDir); + + // We use a marker to prevent the case where a segment is downloaded, but before the download completes, + // the parent directories of the segment are removed + final File downloadStartMarker = new File(storageDir, "downloadStartMarker"); + synchronized (lock) { + if (!storageDir.mkdirs()) { + log.debug("Unable to make parent file[%s]", storageDir); + } + try { + downloadStartMarker.createNewFile(); + } + catch (IOException e) { + throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir); + } } getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir); + + if (!downloadStartMarker.delete()) { + throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir); + } + + loc.addSegment(segment); retVal = storageDir; - } - else { + } else { retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); } @@ -151,9 +170,10 @@ public class OmniSegmentLoader implements SegmentLoader } try { + // Druid creates folders of the form dataSource/interval/version/partitionNum. + // We need to clean up all these directories if they are all empty. File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - log.info("Deleting directory[%s]", cacheFile); - FileUtils.deleteDirectory(cacheFile); + cleanupCacheFiles(loc.getPath(), cacheFile); loc.removeSegment(segment); } catch (IOException e) { @@ -172,4 +192,25 @@ public class OmniSegmentLoader implements SegmentLoader return loader; } + + public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException + { + if (cacheFile.equals(baseFile)) { + return; + } + + synchronized (lock) { + log.info("Deleting directory[%s]", cacheFile); + try { + FileUtils.deleteDirectory(cacheFile); + } + catch (Exception e) { + log.error("Unable to remove file[%s]", cacheFile); + } + } + + if (cacheFile.getParentFile() != null && cacheFile.getParentFile().listFiles().length == 0) { + cleanupCacheFiles(baseFile, cacheFile.getParentFile()); + } + } }