Merge pull request #1310 from metamx/optionalValidation

Optional validation on ConvertSegmentTask
This commit is contained in:
Xavier Léauté 2015-04-27 11:19:57 -07:00
commit 962997f3f7
5 changed files with 93 additions and 41 deletions

View File

@ -39,7 +39,7 @@ import org.joda.time.Interval;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -60,34 +60,47 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
private final DataSegment segment; private final DataSegment segment;
private final IndexSpec indexSpec; private final IndexSpec indexSpec;
private final boolean force; private final boolean force;
private final boolean validate;
/** /**
* Create a segment converter task to convert a segment to the most recent version including the specified indexSpec * Create a segment converter task to convert a segment to the most recent version including the specified indexSpec
*
* @param dataSource The datasource to which this update should be applied * @param dataSource The datasource to which this update should be applied
* @param interval The interval in the datasource which to apply the update to * @param interval The interval in the datasource which to apply the update to
* @param indexSpec The IndexSpec to use in the updated segments * @param indexSpec The IndexSpec to use in the updated segments
* @param force Force an update, even if the task thinks it doesn't need to update. * @param force Force an update, even if the task thinks it doesn't need to update.
* @param validate Validate the new segment compared to the old segment on a row by row basis
*
* @return A SegmentConverterTask for the datasource's interval with the indexSpec specified. * @return A SegmentConverterTask for the datasource's interval with the indexSpec specified.
*/ */
public static ConvertSegmentTask create(String dataSource, Interval interval, IndexSpec indexSpec, boolean force) public static ConvertSegmentTask create(
String dataSource,
Interval interval,
IndexSpec indexSpec,
boolean force,
boolean validate
)
{ {
final String id = makeId(dataSource, interval); final String id = makeId(dataSource, interval);
return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force); return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force, validate);
} }
/** /**
* Create a task to update the segment specified to the most recent binary version with the specified indexSpec * Create a task to update the segment specified to the most recent binary version with the specified indexSpec
* @param segment The segment to which this update should be applied *
* @param segment The segment to which this update should be applied
* @param indexSpec The IndexSpec to use in the updated segments * @param indexSpec The IndexSpec to use in the updated segments
* @param force Force an update, even if the task thinks it doesn't need to update. * @param force Force an update, even if the task thinks it doesn't need to update.
* @param validate Validate the new segment compared to the old segment on a row by row basis
*
* @return A SegmentConverterTask for the segment with the indexSpec specified. * @return A SegmentConverterTask for the segment with the indexSpec specified.
*/ */
public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force) public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate)
{ {
final Interval interval = segment.getInterval(); final Interval interval = segment.getInterval();
final String dataSource = segment.getDataSource(); final String dataSource = segment.getDataSource();
final String id = makeId(dataSource, interval); final String id = makeId(dataSource, interval);
return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force); return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force, validate);
} }
private static String makeId(String dataSource, Interval interval) private static String makeId(String dataSource, Interval interval)
@ -105,18 +118,20 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
@JsonProperty("interval") Interval interval, @JsonProperty("interval") Interval interval,
@JsonProperty("segment") DataSegment segment, @JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("force") Boolean force @JsonProperty("force") Boolean force,
@JsonProperty("validate") Boolean validate
) )
{ {
final boolean isForce = force == null ? false : force; final boolean isForce = force == null ? false : force;
final boolean isValidate = validate == null ? true : validate;
if (id == null) { if (id == null) {
if (segment == null) { if (segment == null) {
return create(dataSource, interval, indexSpec, isForce); return create(dataSource, interval, indexSpec, isForce, isValidate);
} else { } else {
return create(segment, indexSpec, isForce); return create(segment, indexSpec, isForce, isValidate);
} }
} }
return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce); return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce, isValidate);
} }
private ConvertSegmentTask( private ConvertSegmentTask(
@ -126,22 +141,32 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
Interval interval, Interval interval,
DataSegment segment, DataSegment segment,
IndexSpec indexSpec, IndexSpec indexSpec,
boolean force boolean force,
boolean validate
) )
{ {
super(id, groupId, dataSource, interval); super(id, groupId, dataSource, interval);
this.segment = segment; this.segment = segment;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.force = force; this.force = force;
this.validate = validate;
} }
@JsonProperty @JsonProperty
public boolean isForce(){ public boolean isForce()
{
return force; return force;
} }
@JsonProperty @JsonProperty
public IndexSpec getIndexSpec(){ public boolean isValidate()
{
return validate;
}
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec; return indexSpec;
} }
@ -177,10 +202,14 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
{ {
final Integer segmentVersion = segment.getBinaryVersion(); final Integer segmentVersion = segment.getBinaryVersion();
if (!CURR_VERSION_INTEGER.equals(segmentVersion)) { if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
return new SubTask(getGroupId(), segment, indexSpec, force); return new SubTask(getGroupId(), segment, indexSpec, force, validate);
} else if(force){ } else if (force) {
log.info("Segment[%s] already at version[%s], forcing conversion", segment.getIdentifier(), segmentVersion); log.info(
return new SubTask(getGroupId(), segment, indexSpec, force); "Segment[%s] already at version[%s], forcing conversion",
segment.getIdentifier(),
segmentVersion
);
return new SubTask(getGroupId(), segment, indexSpec, force, validate);
} else { } else {
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion); log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
return null; return null;
@ -198,7 +227,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
} }
} else { } else {
log.info("I'm in a subless mood."); log.info("I'm in a subless mood.");
convertSegment(toolbox, segment, indexSpec, force); convertSegment(toolbox, segment, indexSpec, force, validate);
} }
return success(); return success();
} }
@ -228,13 +257,15 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
private final DataSegment segment; private final DataSegment segment;
private final IndexSpec indexSpec; private final IndexSpec indexSpec;
private final boolean force; private final boolean force;
private final boolean validate;
@JsonCreator @JsonCreator
public SubTask( public SubTask(
@JsonProperty("groupId") String groupId, @JsonProperty("groupId") String groupId,
@JsonProperty("segment") DataSegment segment, @JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("force") Boolean force @JsonProperty("force") Boolean force,
@JsonProperty("validate") Boolean validate
) )
{ {
super( super(
@ -252,10 +283,18 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
this.segment = segment; this.segment = segment;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.force = force == null ? false : force; this.force = force == null ? false : force;
this.validate = validate == null ? true : validate;
} }
@JsonProperty @JsonProperty
public boolean isForce(){ public boolean isValidate()
{
return validate;
}
@JsonProperty
public boolean isForce()
{
return force; return force;
} }
@ -275,12 +314,18 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
log.info("Subs are good! Italian BMT and Meatball are probably my favorite."); log.info("Subs are good! Italian BMT and Meatball are probably my favorite.");
convertSegment(toolbox, segment, indexSpec, force); convertSegment(toolbox, segment, indexSpec, force, validate);
return success(); return success();
} }
} }
private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec, boolean force) private static void convertSegment(
TaskToolbox toolbox,
final DataSegment segment,
IndexSpec indexSpec,
boolean force,
boolean validate
)
throws SegmentLoadingException, IOException throws SegmentLoadingException, IOException
{ {
log.info("Converting segment[%s]", segment); log.info("Converting segment[%s]", segment);
@ -299,11 +344,11 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
} }
} }
final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Arrays.asList(segment)); final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Collections.singletonList(segment));
final File location = localSegments.get(segment); final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out"); final File outLocation = new File(location, "v9_out");
if (IndexIO.convertSegment(location, outLocation, indexSpec, force)) { if (IndexIO.convertSegment(location, outLocation, indexSpec, force, validate)) {
final int outVersion = IndexIO.getVersionFromDir(outLocation); final int outVersion = IndexIO.getVersionFromDir(outLocation);
// Appending to the version makes a new version that inherits most comparability parameters of the original // Appending to the version makes a new version that inherits most comparability parameters of the original

View File

@ -28,6 +28,8 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.util.UUID;
/** /**
*/ */
public class NoopTask extends AbstractTask public class NoopTask extends AbstractTask
@ -66,7 +68,7 @@ public class NoopTask extends AbstractTask
) )
{ {
super( super(
id == null ? String.format("noop_%s", new DateTime()) : id, id == null ? String.format("noop_%s_%s", new DateTime(), UUID.randomUUID().toString()) : id,
"none" "none"
); );

View File

@ -39,7 +39,7 @@ public class ConvertSegmentTaskTest
DefaultObjectMapper jsonMapper = new DefaultObjectMapper(); DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false); ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false, true);
Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2); Assert.assertEquals(task, task2);
@ -56,7 +56,7 @@ public class ConvertSegmentTaskTest
102937 102937
); );
task = ConvertSegmentTask.create(segment, null, false); task = ConvertSegmentTask.create(segment, null, false, true);
task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2); Assert.assertEquals(task, task2);

View File

@ -164,7 +164,8 @@ public class TaskSerdeTest
final ConvertSegmentTask task = ConvertSegmentTask.create( final ConvertSegmentTask task = ConvertSegmentTask.create(
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(),
null, null,
false false,
true
); );
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
@ -189,7 +190,8 @@ public class TaskSerdeTest
"myGroupId", "myGroupId",
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(),
indexSpec, indexSpec,
false false,
true
); );
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
@ -370,8 +372,9 @@ public class TaskSerdeTest
0, 0,
12345L 12345L
), ),
indexSpec indexSpec,
, false false,
true
); );
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class); final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class);
@ -394,8 +397,9 @@ public class TaskSerdeTest
); );
final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create( final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create(
segment, segment,
new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed") new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed"),
, false false,
true
); );
final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal); final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal);
final Task task = jsonMapper.readValue(json, Task.class); final Task task = jsonMapper.readValue(json, Task.class);

View File

@ -198,14 +198,13 @@ public class IndexIO
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException
{ {
return convertSegment(toConvert, converted, indexSpec, false); return convertSegment(toConvert, converted, indexSpec, false, true);
} }
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent) public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent, boolean validate)
throws IOException throws IOException
{ {
final int version = SegmentUtils.getVersionFromDir(toConvert); final int version = SegmentUtils.getVersionFromDir(toConvert);
switch (version) { switch (version) {
case 1: case 1:
case 2: case 2:
@ -231,7 +230,9 @@ public class IndexIO
default: default:
if (forceIfCurrent) { if (forceIfCurrent) {
IndexMaker.convert(toConvert, converted, indexSpec); IndexMaker.convert(toConvert, converted, indexSpec);
DefaultIndexIOHandler.validateTwoSegments(toConvert, converted); if(validate){
DefaultIndexIOHandler.validateTwoSegments(toConvert, converted);
}
return true; return true;
} else { } else {
log.info("Version[%s], skipping.", version); log.info("Version[%s], skipping.", version);