diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index b82ff95cf4b..2d3b03c0680 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -39,7 +39,7 @@ import org.joda.time.Interval; import java.io.File; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -60,34 +60,47 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask private final DataSegment segment; private final IndexSpec indexSpec; private final boolean force; + private final boolean validate; /** * Create a segment converter task to convert a segment to the most recent version including the specified indexSpec + * * @param dataSource The datasource to which this update should be applied - * @param interval The interval in the datasource which to apply the update to - * @param indexSpec The IndexSpec to use in the updated segments - * @param force Force an update, even if the task thinks it doesn't need to update. + * @param interval The interval in the datasource which to apply the update to + * @param indexSpec The IndexSpec to use in the updated segments + * @param force Force an update, even if the task thinks it doesn't need to update. + * @param validate Validate the new segment compared to the old segment on a row by row basis + * * @return A SegmentConverterTask for the datasource's interval with the indexSpec specified. */ - public static ConvertSegmentTask create(String dataSource, Interval interval, IndexSpec indexSpec, boolean force) + public static ConvertSegmentTask create( + String dataSource, + Interval interval, + IndexSpec indexSpec, + boolean force, + boolean validate + ) { final String id = makeId(dataSource, interval); - return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force); + return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force, validate); } /** * Create a task to update the segment specified to the most recent binary version with the specified indexSpec - * @param segment The segment to which this update should be applied + * + * @param segment The segment to which this update should be applied * @param indexSpec The IndexSpec to use in the updated segments - * @param force Force an update, even if the task thinks it doesn't need to update. + * @param force Force an update, even if the task thinks it doesn't need to update. + * @param validate Validate the new segment compared to the old segment on a row by row basis + * * @return A SegmentConverterTask for the segment with the indexSpec specified. */ - public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force) + public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate) { final Interval interval = segment.getInterval(); final String dataSource = segment.getDataSource(); final String id = makeId(dataSource, interval); - return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force); + return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force, validate); } private static String makeId(String dataSource, Interval interval) @@ -105,18 +118,20 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask @JsonProperty("interval") Interval interval, @JsonProperty("segment") DataSegment segment, @JsonProperty("indexSpec") IndexSpec indexSpec, - @JsonProperty("force") Boolean force + @JsonProperty("force") Boolean force, + @JsonProperty("validate") Boolean validate ) { final boolean isForce = force == null ? false : force; + final boolean isValidate = validate == null ? true : validate; if (id == null) { if (segment == null) { - return create(dataSource, interval, indexSpec, isForce); + return create(dataSource, interval, indexSpec, isForce, isValidate); } else { - return create(segment, indexSpec, isForce); + return create(segment, indexSpec, isForce, isValidate); } } - return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce); + return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce, isValidate); } private ConvertSegmentTask( @@ -126,22 +141,32 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask Interval interval, DataSegment segment, IndexSpec indexSpec, - boolean force + boolean force, + boolean validate ) { super(id, groupId, dataSource, interval); this.segment = segment; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.force = force; + this.validate = validate; } @JsonProperty - public boolean isForce(){ + public boolean isForce() + { return force; } @JsonProperty - public IndexSpec getIndexSpec(){ + public boolean isValidate() + { + return validate; + } + + @JsonProperty + public IndexSpec getIndexSpec() + { return indexSpec; } @@ -177,10 +202,14 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask { final Integer segmentVersion = segment.getBinaryVersion(); if (!CURR_VERSION_INTEGER.equals(segmentVersion)) { - return new SubTask(getGroupId(), segment, indexSpec, force); - } else if(force){ - log.info("Segment[%s] already at version[%s], forcing conversion", segment.getIdentifier(), segmentVersion); - return new SubTask(getGroupId(), segment, indexSpec, force); + return new SubTask(getGroupId(), segment, indexSpec, force, validate); + } else if (force) { + log.info( + "Segment[%s] already at version[%s], forcing conversion", + segment.getIdentifier(), + segmentVersion + ); + return new SubTask(getGroupId(), segment, indexSpec, force, validate); } else { log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion); return null; @@ -198,7 +227,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask } } else { log.info("I'm in a subless mood."); - convertSegment(toolbox, segment, indexSpec, force); + convertSegment(toolbox, segment, indexSpec, force, validate); } return success(); } @@ -228,13 +257,15 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask private final DataSegment segment; private final IndexSpec indexSpec; private final boolean force; + private final boolean validate; @JsonCreator public SubTask( @JsonProperty("groupId") String groupId, @JsonProperty("segment") DataSegment segment, @JsonProperty("indexSpec") IndexSpec indexSpec, - @JsonProperty("force") Boolean force + @JsonProperty("force") Boolean force, + @JsonProperty("validate") Boolean validate ) { super( @@ -252,10 +283,18 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask this.segment = segment; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.force = force == null ? false : force; + this.validate = validate == null ? true : validate; } @JsonProperty - public boolean isForce(){ + public boolean isValidate() + { + return validate; + } + + @JsonProperty + public boolean isForce() + { return force; } @@ -275,12 +314,18 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask public TaskStatus run(TaskToolbox toolbox) throws Exception { log.info("Subs are good! Italian BMT and Meatball are probably my favorite."); - convertSegment(toolbox, segment, indexSpec, force); + convertSegment(toolbox, segment, indexSpec, force, validate); return success(); } } - private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec, boolean force) + private static void convertSegment( + TaskToolbox toolbox, + final DataSegment segment, + IndexSpec indexSpec, + boolean force, + boolean validate + ) throws SegmentLoadingException, IOException { log.info("Converting segment[%s]", segment); @@ -299,11 +344,11 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask } } - final Map localSegments = toolbox.fetchSegments(Arrays.asList(segment)); + final Map localSegments = toolbox.fetchSegments(Collections.singletonList(segment)); final File location = localSegments.get(segment); final File outLocation = new File(location, "v9_out"); - if (IndexIO.convertSegment(location, outLocation, indexSpec, force)) { + if (IndexIO.convertSegment(location, outLocation, indexSpec, force, validate)) { final int outVersion = IndexIO.getVersionFromDir(outLocation); // Appending to the version makes a new version that inherits most comparability parameters of the original diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java index ab90b9d9e55..f172d2c3778 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java @@ -39,7 +39,7 @@ public class ConvertSegmentTaskTest DefaultObjectMapper jsonMapper = new DefaultObjectMapper(); - ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false); + ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false, true); Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); Assert.assertEquals(task, task2); @@ -56,7 +56,7 @@ public class ConvertSegmentTaskTest 102937 ); - task = ConvertSegmentTask.create(segment, null, false); + task = ConvertSegmentTask.create(segment, null, false, true); task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); Assert.assertEquals(task, task2); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index fa66769af97..76702c8de62 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -164,7 +164,8 @@ public class TaskSerdeTest final ConvertSegmentTask task = ConvertSegmentTask.create( DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), null, - false + false, + true ); final String json = jsonMapper.writeValueAsString(task); @@ -189,7 +190,8 @@ public class TaskSerdeTest "myGroupId", DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), indexSpec, - false + false, + true ); final String json = jsonMapper.writeValueAsString(task); @@ -370,8 +372,9 @@ public class TaskSerdeTest 0, 12345L ), - indexSpec - , false + indexSpec, + false, + true ); final String json = jsonMapper.writeValueAsString(task); final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class); @@ -394,8 +397,9 @@ public class TaskSerdeTest ); final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create( segment, - new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed") - , false + new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed"), + false, + true ); final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal); final Task task = jsonMapper.readValue(json, Task.class); diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index d737e732f7e..036b5bf0d8d 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -198,14 +198,13 @@ public class IndexIO public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException { - return convertSegment(toConvert, converted, indexSpec, false); + return convertSegment(toConvert, converted, indexSpec, false, true); } - public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent) + public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent, boolean validate) throws IOException { final int version = SegmentUtils.getVersionFromDir(toConvert); - switch (version) { case 1: case 2: @@ -231,7 +230,9 @@ public class IndexIO default: if (forceIfCurrent) { IndexMaker.convert(toConvert, converted, indexSpec); - DefaultIndexIOHandler.validateTwoSegments(toConvert, converted); + if(validate){ + DefaultIndexIOHandler.validateTwoSegments(toConvert, converted); + } return true; } else { log.info("Version[%s], skipping.", version);