From 29341f98378b16ec1a5d6da6e2bd98cc6432d58b Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 24 Apr 2015 13:07:48 -0700 Subject: [PATCH 1/2] Fix random unit test failure from NoopTask ID collision --- .../src/main/java/io/druid/indexing/common/task/NoopTask.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index e6a32c7ae5e..3b0debf2c13 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -28,6 +28,8 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import org.joda.time.DateTime; +import java.util.UUID; + /** */ public class NoopTask extends AbstractTask @@ -66,7 +68,7 @@ public class NoopTask extends AbstractTask ) { super( - id == null ? String.format("noop_%s", new DateTime()) : id, + id == null ? String.format("noop_%s_%s", new DateTime(), UUID.randomUUID().toString()) : id, "none" ); From 633fdb029e2352b18e2567c383e332888e8e0748 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 24 Apr 2015 13:08:20 -0700 Subject: [PATCH 2/2] Add option to ConvertSegmentTask to skip validation * Validation is enabled by default --- .../common/task/ConvertSegmentTask.java | 101 +++++++++++++----- .../common/task/ConvertSegmentTaskTest.java | 4 +- .../indexing/common/task/TaskSerdeTest.java | 16 +-- .../main/java/io/druid/segment/IndexIO.java | 9 +- 4 files changed, 90 insertions(+), 40 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index b82ff95cf4b..2d3b03c0680 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -39,7 +39,7 @@ import org.joda.time.Interval; import java.io.File; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -60,34 +60,47 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask private final DataSegment segment; private final IndexSpec indexSpec; private final boolean force; + private final boolean validate; /** * Create a segment converter task to convert a segment to the most recent version including the specified indexSpec + * * @param dataSource The datasource to which this update should be applied - * @param interval The interval in the datasource which to apply the update to - * @param indexSpec The IndexSpec to use in the updated segments - * @param force Force an update, even if the task thinks it doesn't need to update. + * @param interval The interval in the datasource which to apply the update to + * @param indexSpec The IndexSpec to use in the updated segments + * @param force Force an update, even if the task thinks it doesn't need to update. + * @param validate Validate the new segment compared to the old segment on a row by row basis + * * @return A SegmentConverterTask for the datasource's interval with the indexSpec specified. */ - public static ConvertSegmentTask create(String dataSource, Interval interval, IndexSpec indexSpec, boolean force) + public static ConvertSegmentTask create( + String dataSource, + Interval interval, + IndexSpec indexSpec, + boolean force, + boolean validate + ) { final String id = makeId(dataSource, interval); - return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force); + return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force, validate); } /** * Create a task to update the segment specified to the most recent binary version with the specified indexSpec - * @param segment The segment to which this update should be applied + * + * @param segment The segment to which this update should be applied * @param indexSpec The IndexSpec to use in the updated segments - * @param force Force an update, even if the task thinks it doesn't need to update. + * @param force Force an update, even if the task thinks it doesn't need to update. + * @param validate Validate the new segment compared to the old segment on a row by row basis + * * @return A SegmentConverterTask for the segment with the indexSpec specified. */ - public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force) + public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate) { final Interval interval = segment.getInterval(); final String dataSource = segment.getDataSource(); final String id = makeId(dataSource, interval); - return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force); + return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force, validate); } private static String makeId(String dataSource, Interval interval) @@ -105,18 +118,20 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask @JsonProperty("interval") Interval interval, @JsonProperty("segment") DataSegment segment, @JsonProperty("indexSpec") IndexSpec indexSpec, - @JsonProperty("force") Boolean force + @JsonProperty("force") Boolean force, + @JsonProperty("validate") Boolean validate ) { final boolean isForce = force == null ? false : force; + final boolean isValidate = validate == null ? true : validate; if (id == null) { if (segment == null) { - return create(dataSource, interval, indexSpec, isForce); + return create(dataSource, interval, indexSpec, isForce, isValidate); } else { - return create(segment, indexSpec, isForce); + return create(segment, indexSpec, isForce, isValidate); } } - return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce); + return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce, isValidate); } private ConvertSegmentTask( @@ -126,22 +141,32 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask Interval interval, DataSegment segment, IndexSpec indexSpec, - boolean force + boolean force, + boolean validate ) { super(id, groupId, dataSource, interval); this.segment = segment; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.force = force; + this.validate = validate; } @JsonProperty - public boolean isForce(){ + public boolean isForce() + { return force; } @JsonProperty - public IndexSpec getIndexSpec(){ + public boolean isValidate() + { + return validate; + } + + @JsonProperty + public IndexSpec getIndexSpec() + { return indexSpec; } @@ -177,10 +202,14 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask { final Integer segmentVersion = segment.getBinaryVersion(); if (!CURR_VERSION_INTEGER.equals(segmentVersion)) { - return new SubTask(getGroupId(), segment, indexSpec, force); - } else if(force){ - log.info("Segment[%s] already at version[%s], forcing conversion", segment.getIdentifier(), segmentVersion); - return new SubTask(getGroupId(), segment, indexSpec, force); + return new SubTask(getGroupId(), segment, indexSpec, force, validate); + } else if (force) { + log.info( + "Segment[%s] already at version[%s], forcing conversion", + segment.getIdentifier(), + segmentVersion + ); + return new SubTask(getGroupId(), segment, indexSpec, force, validate); } else { log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion); return null; @@ -198,7 +227,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask } } else { log.info("I'm in a subless mood."); - convertSegment(toolbox, segment, indexSpec, force); + convertSegment(toolbox, segment, indexSpec, force, validate); } return success(); } @@ -228,13 +257,15 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask private final DataSegment segment; private final IndexSpec indexSpec; private final boolean force; + private final boolean validate; @JsonCreator public SubTask( @JsonProperty("groupId") String groupId, @JsonProperty("segment") DataSegment segment, @JsonProperty("indexSpec") IndexSpec indexSpec, - @JsonProperty("force") Boolean force + @JsonProperty("force") Boolean force, + @JsonProperty("validate") Boolean validate ) { super( @@ -252,10 +283,18 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask this.segment = segment; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.force = force == null ? false : force; + this.validate = validate == null ? true : validate; } @JsonProperty - public boolean isForce(){ + public boolean isValidate() + { + return validate; + } + + @JsonProperty + public boolean isForce() + { return force; } @@ -275,12 +314,18 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask public TaskStatus run(TaskToolbox toolbox) throws Exception { log.info("Subs are good! Italian BMT and Meatball are probably my favorite."); - convertSegment(toolbox, segment, indexSpec, force); + convertSegment(toolbox, segment, indexSpec, force, validate); return success(); } } - private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec, boolean force) + private static void convertSegment( + TaskToolbox toolbox, + final DataSegment segment, + IndexSpec indexSpec, + boolean force, + boolean validate + ) throws SegmentLoadingException, IOException { log.info("Converting segment[%s]", segment); @@ -299,11 +344,11 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask } } - final Map localSegments = toolbox.fetchSegments(Arrays.asList(segment)); + final Map localSegments = toolbox.fetchSegments(Collections.singletonList(segment)); final File location = localSegments.get(segment); final File outLocation = new File(location, "v9_out"); - if (IndexIO.convertSegment(location, outLocation, indexSpec, force)) { + if (IndexIO.convertSegment(location, outLocation, indexSpec, force, validate)) { final int outVersion = IndexIO.getVersionFromDir(outLocation); // Appending to the version makes a new version that inherits most comparability parameters of the original diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java index ab90b9d9e55..f172d2c3778 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java @@ -39,7 +39,7 @@ public class ConvertSegmentTaskTest DefaultObjectMapper jsonMapper = new DefaultObjectMapper(); - ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false); + ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false, true); Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); Assert.assertEquals(task, task2); @@ -56,7 +56,7 @@ public class ConvertSegmentTaskTest 102937 ); - task = ConvertSegmentTask.create(segment, null, false); + task = ConvertSegmentTask.create(segment, null, false, true); task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); Assert.assertEquals(task, task2); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index fa66769af97..76702c8de62 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -164,7 +164,8 @@ public class TaskSerdeTest final ConvertSegmentTask task = ConvertSegmentTask.create( DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), null, - false + false, + true ); final String json = jsonMapper.writeValueAsString(task); @@ -189,7 +190,8 @@ public class TaskSerdeTest "myGroupId", DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), indexSpec, - false + false, + true ); final String json = jsonMapper.writeValueAsString(task); @@ -370,8 +372,9 @@ public class TaskSerdeTest 0, 12345L ), - indexSpec - , false + indexSpec, + false, + true ); final String json = jsonMapper.writeValueAsString(task); final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class); @@ -394,8 +397,9 @@ public class TaskSerdeTest ); final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create( segment, - new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed") - , false + new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed"), + false, + true ); final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal); final Task task = jsonMapper.readValue(json, Task.class); diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index d737e732f7e..036b5bf0d8d 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -198,14 +198,13 @@ public class IndexIO public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException { - return convertSegment(toConvert, converted, indexSpec, false); + return convertSegment(toConvert, converted, indexSpec, false, true); } - public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent) + public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent, boolean validate) throws IOException { final int version = SegmentUtils.getVersionFromDir(toConvert); - switch (version) { case 1: case 2: @@ -231,7 +230,9 @@ public class IndexIO default: if (forceIfCurrent) { IndexMaker.convert(toConvert, converted, indexSpec); - DefaultIndexIOHandler.validateTwoSegments(toConvert, converted); + if(validate){ + DefaultIndexIOHandler.validateTwoSegments(toConvert, converted); + } return true; } else { log.info("Version[%s], skipping.", version);