diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java similarity index 66% rename from indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java rename to indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index 401c512d5d4..b82ff95cf4b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -44,30 +44,50 @@ import java.util.List; import java.util.Map; /** + * This task takes a segment and attempts to reindex it in the latest version with the specified indexSpec. + * + * Only the datasource must be specified. `indexSpec` and `force` are strongly recommended but optional. The rest are + * auto-configured and should only be modified with great care. */ -public class VersionConverterTask extends AbstractFixedIntervalTask +public class ConvertSegmentTask extends AbstractFixedIntervalTask { - private static final String TYPE = "version_converter"; + private static final String TYPE = "convert_segment"; private static final Integer CURR_VERSION_INTEGER = IndexIO.CURRENT_VERSION_ID; - private static final Logger log = new Logger(VersionConverterTask.class); + private static final Logger log = new Logger(ConvertSegmentTask.class); @JsonIgnore private final DataSegment segment; private final IndexSpec indexSpec; + private final boolean force; - public static VersionConverterTask create(String dataSource, Interval interval) + /** + * Create a segment converter task to convert a segment to the most recent version including the specified indexSpec + * @param dataSource The datasource to which this update should be applied + * @param interval The interval in the datasource to which this update should be applied + * @param indexSpec The IndexSpec to use in the updated 
segments + * @param force Force an update, even if the task thinks it doesn't need to update. + * @return A ConvertSegmentTask for the datasource's interval with the indexSpec specified. + */ + public static ConvertSegmentTask create(String dataSource, Interval interval, IndexSpec indexSpec, boolean force) { final String id = makeId(dataSource, interval); - return new VersionConverterTask(id, id, dataSource, interval, null, null); + return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force); } - public static VersionConverterTask create(DataSegment segment) + /** + * Create a task to update the segment specified to the most recent binary version with the specified indexSpec + * @param segment The segment to which this update should be applied + * @param indexSpec The IndexSpec to use in the updated segments + * @param force Force an update, even if the task thinks it doesn't need to update. + * @return A ConvertSegmentTask for the segment with the indexSpec specified. 
+ */ + public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force) { final Interval interval = segment.getInterval(); final String dataSource = segment.getDataSource(); final String id = makeId(dataSource, interval); - return new VersionConverterTask(id, id, dataSource, interval, segment, null); + return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force); } private static String makeId(String dataSource, Interval interval) @@ -78,37 +98,51 @@ public class VersionConverterTask extends AbstractFixedIntervalTask } @JsonCreator - private static VersionConverterTask createFromJson( + private static ConvertSegmentTask createFromJson( @JsonProperty("id") String id, @JsonProperty("groupId") String groupId, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("segment") DataSegment segment, - @JsonProperty("indexSpec") IndexSpec indexSpec + @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("force") Boolean force ) { + final boolean isForce = force == null ? false : force; if (id == null) { if (segment == null) { - return create(dataSource, interval); + return create(dataSource, interval, indexSpec, isForce); } else { - return create(segment); + return create(segment, indexSpec, isForce); } } - return new VersionConverterTask(id, groupId, dataSource, interval, segment, indexSpec); + return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce); } - private VersionConverterTask( + private ConvertSegmentTask( String id, String groupId, String dataSource, Interval interval, DataSegment segment, - IndexSpec indexSpec + IndexSpec indexSpec, + boolean force ) { super(id, groupId, dataSource, interval); this.segment = segment; this.indexSpec = indexSpec == null ? 
new IndexSpec() : indexSpec; + this.force = force; + } + + @JsonProperty + public boolean isForce(){ + return force; + } + + @JsonProperty + public IndexSpec getIndexSpec(){ + return indexSpec; } @Override @@ -143,11 +177,14 @@ public class VersionConverterTask extends AbstractFixedIntervalTask { final Integer segmentVersion = segment.getBinaryVersion(); if (!CURR_VERSION_INTEGER.equals(segmentVersion)) { - return new SubTask(getGroupId(), segment, indexSpec); + return new SubTask(getGroupId(), segment, indexSpec, force); + } else if(force){ + log.info("Segment[%s] already at version[%s], forcing conversion", segment.getIdentifier(), segmentVersion); + return new SubTask(getGroupId(), segment, indexSpec, force); + } else { + log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion); + return null; } - - log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion); - return null; } } ); @@ -161,7 +198,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask } } else { log.info("I'm in a subless mood."); - convertSegment(toolbox, segment, indexSpec); + convertSegment(toolbox, segment, indexSpec, force); } return success(); } @@ -176,7 +213,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask return false; } - VersionConverterTask that = (VersionConverterTask) o; + ConvertSegmentTask that = (ConvertSegmentTask) o; if (segment != null ? 
!segment.equals(that.segment) : that.segment != null) { return false; @@ -190,12 +227,14 @@ public class VersionConverterTask extends AbstractFixedIntervalTask @JsonIgnore private final DataSegment segment; private final IndexSpec indexSpec; + private final boolean force; @JsonCreator public SubTask( @JsonProperty("groupId") String groupId, @JsonProperty("segment") DataSegment segment, - @JsonProperty("indexSpec") IndexSpec indexSpec + @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("force") Boolean force ) { super( @@ -212,6 +251,12 @@ public class VersionConverterTask extends AbstractFixedIntervalTask ); this.segment = segment; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; + this.force = force == null ? false : force; + } + + @JsonProperty + public boolean isForce(){ + return force; } @JsonProperty @@ -230,12 +275,12 @@ public class VersionConverterTask extends AbstractFixedIntervalTask public TaskStatus run(TaskToolbox toolbox) throws Exception { log.info("Subs are good! 
Italian BMT and Meatball are probably my favorite."); - convertSegment(toolbox, segment, indexSpec); + convertSegment(toolbox, segment, indexSpec, force); return success(); } } - private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec) + private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec, boolean force) throws SegmentLoadingException, IOException { log.info("Converting segment[%s]", segment); @@ -248,7 +293,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask final String version = currentSegment.getVersion(); final Integer binaryVersion = currentSegment.getBinaryVersion(); - if (version.startsWith(segment.getVersion()) && CURR_VERSION_INTEGER.equals(binaryVersion)) { + if (!force && (version.startsWith(segment.getVersion()) && CURR_VERSION_INTEGER.equals(binaryVersion))) { log.info("Skipping already updated segment[%s].", segment); return; } @@ -258,7 +303,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask final File location = localSegments.get(segment); final File outLocation = new File(location, "v9_out"); - if (IndexIO.convertSegment(location, outLocation, indexSpec)) { + if (IndexIO.convertSegment(location, outLocation, indexSpec, force)) { final int outVersion = IndexIO.getVersionFromDir(outLocation); // Appending to the version makes a new version that inherits most comparability parameters of the original diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 817befb4ded..c8151e5b679 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -49,8 +49,10 @@ import io.druid.query.QueryRunner; @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class), @JsonSubTypes.Type(name = "index_realtime", 
value = RealtimeIndexTask.class), @JsonSubTypes.Type(name = "noop", value = NoopTask.class), - @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), - @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class) + @JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentTask.class), // Deprecated type name, kept for backwards compatibility + @JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentTask.SubTask.class), // Deprecated type name, kept for backwards compatibility + @JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class), + @JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class) }) public interface Task { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/VersionConverterTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java similarity index 89% rename from indexing-service/src/test/java/io/druid/indexing/common/task/VersionConverterTaskTest.java rename to indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java index 38102ee9182..ab90b9d9e55 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/VersionConverterTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/ConvertSegmentTaskTest.java @@ -22,14 +22,14 @@ import com.google.common.collect.ImmutableMap; import io.druid.jackson.DefaultObjectMapper; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; -import junit.framework.Assert; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.junit.Assert; import org.junit.Test; /** */ -public class VersionConverterTaskTest +public class ConvertSegmentTaskTest { @Test public void testSerializationSimple() throws Exception @@ -39,7 +39,7 @@ public class VersionConverterTaskTest DefaultObjectMapper jsonMapper = new DefaultObjectMapper(); - VersionConverterTask 
task = VersionConverterTask.create(dataSource, interval); + ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false); Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); Assert.assertEquals(task, task2); @@ -56,7 +56,7 @@ public class VersionConverterTaskTest 102937 ); - task = VersionConverterTask.create(segment); + task = ConvertSegmentTask.create(segment, null, false); task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); Assert.assertEquals(task, task2); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 306e9d37e41..fa66769af97 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.segment.IndexSpec; +import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -43,12 +44,13 @@ import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; -import junit.framework.Assert; import org.joda.time.Interval; import org.joda.time.Period; +import org.junit.Assert; import org.junit.Test; import java.io.File; +import java.io.IOException; public class TaskSerdeTest { @@ -159,14 +161,16 @@ public class TaskSerdeTest @Test public void testVersionConverterTaskSerde() throws Exception { - final 
VersionConverterTask task = VersionConverterTask.create( - DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + final ConvertSegmentTask task = ConvertSegmentTask.create( + DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), + null, + false ); final String json = jsonMapper.writeValueAsString(task); Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change - final VersionConverterTask task2 = (VersionConverterTask) jsonMapper.readValue(json, Task.class); + final ConvertSegmentTask task2 = (ConvertSegmentTask) jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval()); @@ -181,16 +185,17 @@ public class TaskSerdeTest @Test public void testVersionConverterSubTaskSerde() throws Exception { - final VersionConverterTask.SubTask task = new VersionConverterTask.SubTask( + final ConvertSegmentTask.SubTask task = new ConvertSegmentTask.SubTask( "myGroupId", DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), - indexSpec + indexSpec, + false ); final String json = jsonMapper.writeValueAsString(task); Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change - final VersionConverterTask.SubTask task2 = (VersionConverterTask.SubTask) jsonMapper.readValue(json, Task.class); + final ConvertSegmentTask.SubTask task2 = (ConvertSegmentTask.SubTask) jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); Assert.assertEquals("myGroupId", task.getGroupId()); @@ -215,7 +220,8 @@ public class TaskSerdeTest new AggregatorFactory[0], new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) ), - new RealtimeIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() + new 
RealtimeIOConfig( + new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() { @Override public Plumber findPlumber( @@ -224,7 +230,8 @@ public class TaskSerdeTest { return null; } - }), + } + ), new RealtimeTuningConfig( 1, new Period("PT10M"), @@ -348,6 +355,73 @@ public class TaskSerdeTest Assert.assertEquals(task.getInterval(), task2.getInterval()); } + @Test + public void testSegmentConvetSerdeReflection() throws IOException + { + final ConvertSegmentTask task = ConvertSegmentTask.create( + new DataSegment( + "dataSource", + Interval.parse("1990-01-01/1999-12-31"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("metric1", "metric2"), + new NoneShardSpec(), + 0, + 12345L + ), + indexSpec + , false + ); + final String json = jsonMapper.writeValueAsString(task); + final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class); + Assert.assertEquals(json, jsonMapper.writeValueAsString(taskFromJson)); + } + + @Test + public void testSegmentConvertSerde() throws IOException + { + final DataSegment segment = new DataSegment( + "dataSource", + Interval.parse("1990-01-01/1999-12-31"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("metric1", "metric2"), + new NoneShardSpec(), + 0, + 12345L + ); + final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create( + segment, + new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed") + , false + ); + final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal); + final Task task = jsonMapper.readValue(json, Task.class); + Assert.assertTrue(task instanceof ConvertSegmentTask); + final ConvertSegmentTask convertSegmentTask = (ConvertSegmentTask) task; + Assert.assertEquals(convertSegmentTaskOriginal.getDataSource(), convertSegmentTask.getDataSource()); + Assert.assertEquals(convertSegmentTaskOriginal.getInterval(), convertSegmentTask.getInterval()); 
+ Assert.assertEquals( + convertSegmentTaskOriginal.getIndexSpec().getBitmapSerdeFactory().getClass().getCanonicalName(), + convertSegmentTask.getIndexSpec() + .getBitmapSerdeFactory() + .getClass() + .getCanonicalName() + ); + Assert.assertEquals( + convertSegmentTaskOriginal.getIndexSpec().getDimensionCompression(), + convertSegmentTask.getIndexSpec().getDimensionCompression() + ); + Assert.assertEquals( + convertSegmentTaskOriginal.getIndexSpec().getMetricCompression(), + convertSegmentTask.getIndexSpec().getMetricCompression() + ); + Assert.assertEquals(false, convertSegmentTask.isForce()); + Assert.assertEquals(segment, convertSegmentTask.getSegment()); + } + @Test public void testMoveTaskSerde() throws Exception { diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 8c3841ddc70..0f0028fa6d9 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -71,6 +71,7 @@ import io.druid.segment.data.IndexedMultivalue; import io.druid.segment.data.IndexedRTree; import io.druid.segment.data.VSizeIndexed; import io.druid.segment.data.VSizeIndexedInts; +import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.serde.BitmapIndexColumnPartSupplier; import io.druid.segment.serde.ComplexColumnPartSerde; import io.druid.segment.serde.ComplexColumnPartSupplier; @@ -81,6 +82,7 @@ import io.druid.segment.serde.FloatGenericColumnSupplier; import io.druid.segment.serde.LongGenericColumnPartSerde; import io.druid.segment.serde.LongGenericColumnSupplier; import io.druid.segment.serde.SpatialIndexColumnPartSupplier; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.ByteArrayOutputStream; @@ -92,6 +94,8 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.AbstractList; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import 
java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -195,6 +199,11 @@ public class IndexIO } public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException + { + return convertSegment(toConvert, converted, indexSpec, false); + } + + public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent) throws IOException { final int version = SegmentUtils.getVersionFromDir(toConvert); @@ -221,8 +230,19 @@ public class IndexIO DefaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec); return true; default: - log.info("Version[%s], skipping.", version); - return false; + if(forceIfCurrent){ + final QueryableIndexIndexableAdapter indexIndexableAdapter = new QueryableIndexIndexableAdapter(loadIndex(toConvert)); + IndexMaker.append( + Collections.singletonList(indexIndexableAdapter), + converted, + indexSpec + ); + DefaultIndexIOHandler.validateTwoSegments(toConvert, converted); + return true; + } else { + log.info("Version[%s], skipping.", version); + return false; + } } } @@ -337,6 +357,35 @@ public class IndexIO return retVal; } + public static void validateTwoSegments(File dir1, File dir2) throws IOException + { + final QueryableIndexIndexableAdapter adapter1 = new QueryableIndexIndexableAdapter(loadIndex(dir1)); + final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(loadIndex(dir2)); + if(adapter1.getNumRows() != adapter2.getNumRows()){ + throw new IOException("Validation failure - Row count mismatch"); + } + final Iterator it1 = adapter1.getRows().iterator(); + final Iterator it2 = adapter2.getRows().iterator(); + long row = 0L; + while(it1.hasNext()){ + if(it1.hasNext() ^ it2.hasNext()){ + throw new IOException("Validation failure - Iterator doesn't have enough"); + } + final Rowboat rb1 = it1.next(); + final Rowboat rb2 = it2.next(); + ++row; + if(rb1.compareTo(rb2) != 0){ + throw new 
IOException(String.format("Validation failure on row %d: [%s] vs [%s]", row, rb1, rb2)); + } + } + if(it2.hasNext()){ + throw new IOException("Validation failure - Iterator still has more"); + } + if(row != adapter1.getNumRows()){ + throw new IOException("Validation failure - Actual Row count mismatch"); + } + } + public static void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec) throws IOException { log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);