mirror of https://github.com/apache/druid.git
Update VersionConverterTask for IndexSepc and allowing Forced updates
This commit is contained in:
parent
8b68b8f3c2
commit
7479ac9012
|
@ -44,30 +44,50 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This task takes a segment and attempts to reindex it in the latest version with the specified indexSpec.
|
||||
*
|
||||
* Only datasource must be specified. `indexSpec` and `force` are highly suggested but optional. The rest get
|
||||
* auto-configured and should only be modified with great care
|
||||
*/
|
||||
public class VersionConverterTask extends AbstractFixedIntervalTask
|
||||
public class ConvertSegmentTask extends AbstractFixedIntervalTask
|
||||
{
|
||||
private static final String TYPE = "version_converter";
|
||||
private static final String TYPE = "convert_segment";
|
||||
private static final Integer CURR_VERSION_INTEGER = IndexIO.CURRENT_VERSION_ID;
|
||||
|
||||
private static final Logger log = new Logger(VersionConverterTask.class);
|
||||
private static final Logger log = new Logger(ConvertSegmentTask.class);
|
||||
|
||||
@JsonIgnore
|
||||
private final DataSegment segment;
|
||||
private final IndexSpec indexSpec;
|
||||
private final boolean force;
|
||||
|
||||
public static VersionConverterTask create(String dataSource, Interval interval)
|
||||
/**
|
||||
* Create a segment converter task to convert a segment to the most recent version including the specified indexSpec
|
||||
* @param dataSource The datasource to which this update should be applied
|
||||
* @param interval The interval in the datasource which to apply the update to
|
||||
* @param indexSpec The IndexSpec to use in the updated segments
|
||||
* @param force Force an update, even if the task thinks it doesn't need to update.
|
||||
* @return A SegmentConverterTask for the datasource's interval with the indexSpec specified.
|
||||
*/
|
||||
public static ConvertSegmentTask create(String dataSource, Interval interval, IndexSpec indexSpec, boolean force)
|
||||
{
|
||||
final String id = makeId(dataSource, interval);
|
||||
return new VersionConverterTask(id, id, dataSource, interval, null, null);
|
||||
return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force);
|
||||
}
|
||||
|
||||
public static VersionConverterTask create(DataSegment segment)
|
||||
/**
|
||||
* Create a task to update the segment specified to the most recent binary version with the specified indexSpec
|
||||
* @param segment The segment to which this update should be applied
|
||||
* @param indexSpec The IndexSpec to use in the updated segments
|
||||
* @param force Force an update, even if the task thinks it doesn't need to update.
|
||||
* @return A SegmentConverterTask for the segment with the indexSpec specified.
|
||||
*/
|
||||
public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force)
|
||||
{
|
||||
final Interval interval = segment.getInterval();
|
||||
final String dataSource = segment.getDataSource();
|
||||
final String id = makeId(dataSource, interval);
|
||||
return new VersionConverterTask(id, id, dataSource, interval, segment, null);
|
||||
return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force);
|
||||
}
|
||||
|
||||
private static String makeId(String dataSource, Interval interval)
|
||||
|
@ -78,37 +98,51 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
}
|
||||
|
||||
@JsonCreator
|
||||
private static VersionConverterTask createFromJson(
|
||||
private static ConvertSegmentTask createFromJson(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("groupId") String groupId,
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("segment") DataSegment segment,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("force") Boolean force
|
||||
)
|
||||
{
|
||||
final boolean isForce = force == null ? false : force;
|
||||
if (id == null) {
|
||||
if (segment == null) {
|
||||
return create(dataSource, interval);
|
||||
return create(dataSource, interval, indexSpec, isForce);
|
||||
} else {
|
||||
return create(segment);
|
||||
return create(segment, indexSpec, isForce);
|
||||
}
|
||||
}
|
||||
return new VersionConverterTask(id, groupId, dataSource, interval, segment, indexSpec);
|
||||
return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce);
|
||||
}
|
||||
|
||||
private VersionConverterTask(
|
||||
private ConvertSegmentTask(
|
||||
String id,
|
||||
String groupId,
|
||||
String dataSource,
|
||||
Interval interval,
|
||||
DataSegment segment,
|
||||
IndexSpec indexSpec
|
||||
IndexSpec indexSpec,
|
||||
boolean force
|
||||
)
|
||||
{
|
||||
super(id, groupId, dataSource, interval);
|
||||
this.segment = segment;
|
||||
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
|
||||
this.force = force;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isForce(){
|
||||
return force;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public IndexSpec getIndexSpec(){
|
||||
return indexSpec;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -143,11 +177,14 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
{
|
||||
final Integer segmentVersion = segment.getBinaryVersion();
|
||||
if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
|
||||
return new SubTask(getGroupId(), segment, indexSpec);
|
||||
return new SubTask(getGroupId(), segment, indexSpec, force);
|
||||
} else if(force){
|
||||
log.info("Segment[%s] already at version[%s], forcing conversion", segment.getIdentifier(), segmentVersion);
|
||||
return new SubTask(getGroupId(), segment, indexSpec, force);
|
||||
} else {
|
||||
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
|
||||
return null;
|
||||
}
|
||||
|
||||
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -161,7 +198,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
}
|
||||
} else {
|
||||
log.info("I'm in a subless mood.");
|
||||
convertSegment(toolbox, segment, indexSpec);
|
||||
convertSegment(toolbox, segment, indexSpec, force);
|
||||
}
|
||||
return success();
|
||||
}
|
||||
|
@ -176,7 +213,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
return false;
|
||||
}
|
||||
|
||||
VersionConverterTask that = (VersionConverterTask) o;
|
||||
ConvertSegmentTask that = (ConvertSegmentTask) o;
|
||||
|
||||
if (segment != null ? !segment.equals(that.segment) : that.segment != null) {
|
||||
return false;
|
||||
|
@ -190,12 +227,14 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
@JsonIgnore
|
||||
private final DataSegment segment;
|
||||
private final IndexSpec indexSpec;
|
||||
private final boolean force;
|
||||
|
||||
@JsonCreator
|
||||
public SubTask(
|
||||
@JsonProperty("groupId") String groupId,
|
||||
@JsonProperty("segment") DataSegment segment,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("force") Boolean force
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -212,6 +251,12 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
);
|
||||
this.segment = segment;
|
||||
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
|
||||
this.force = force == null ? false : force;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isForce(){
|
||||
return force;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -230,12 +275,12 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
log.info("Subs are good! Italian BMT and Meatball are probably my favorite.");
|
||||
convertSegment(toolbox, segment, indexSpec);
|
||||
convertSegment(toolbox, segment, indexSpec, force);
|
||||
return success();
|
||||
}
|
||||
}
|
||||
|
||||
private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec)
|
||||
private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec, boolean force)
|
||||
throws SegmentLoadingException, IOException
|
||||
{
|
||||
log.info("Converting segment[%s]", segment);
|
||||
|
@ -248,7 +293,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
final String version = currentSegment.getVersion();
|
||||
final Integer binaryVersion = currentSegment.getBinaryVersion();
|
||||
|
||||
if (version.startsWith(segment.getVersion()) && CURR_VERSION_INTEGER.equals(binaryVersion)) {
|
||||
if (!force && (version.startsWith(segment.getVersion()) && CURR_VERSION_INTEGER.equals(binaryVersion))) {
|
||||
log.info("Skipping already updated segment[%s].", segment);
|
||||
return;
|
||||
}
|
||||
|
@ -258,7 +303,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
|
|||
|
||||
final File location = localSegments.get(segment);
|
||||
final File outLocation = new File(location, "v9_out");
|
||||
if (IndexIO.convertSegment(location, outLocation, indexSpec)) {
|
||||
if (IndexIO.convertSegment(location, outLocation, indexSpec, force)) {
|
||||
final int outVersion = IndexIO.getVersionFromDir(outLocation);
|
||||
|
||||
// Appending to the version makes a new version that inherits most comparability parameters of the original
|
|
@ -49,8 +49,10 @@ import io.druid.query.QueryRunner;
|
|||
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
|
||||
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
|
||||
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
|
||||
@JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class),
|
||||
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class)
|
||||
@JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentTask.class), // Backwards compat - Deprecated
|
||||
@JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentTask.SubTask.class), // backwards compat - Deprecated
|
||||
@JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
|
||||
@JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class)
|
||||
})
|
||||
public interface Task
|
||||
{
|
||||
|
|
|
@ -22,14 +22,14 @@ import com.google.common.collect.ImmutableMap;
|
|||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import junit.framework.Assert;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class VersionConverterTaskTest
|
||||
public class ConvertSegmentTaskTest
|
||||
{
|
||||
@Test
|
||||
public void testSerializationSimple() throws Exception
|
||||
|
@ -39,7 +39,7 @@ public class VersionConverterTaskTest
|
|||
|
||||
DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
|
||||
VersionConverterTask task = VersionConverterTask.create(dataSource, interval);
|
||||
ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false);
|
||||
|
||||
Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
|
||||
Assert.assertEquals(task, task2);
|
||||
|
@ -56,7 +56,7 @@ public class VersionConverterTaskTest
|
|||
102937
|
||||
);
|
||||
|
||||
task = VersionConverterTask.create(segment);
|
||||
task = ConvertSegmentTask.create(segment, null, false);
|
||||
|
||||
task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
|
||||
Assert.assertEquals(task, task2);
|
|
@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory;
|
|||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.segment.IndexSpec;
|
||||
import io.druid.segment.data.RoaringBitmapSerdeFactory;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeIOConfig;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
|
@ -43,12 +44,13 @@ import io.druid.segment.realtime.plumber.Plumber;
|
|||
import io.druid.segment.realtime.plumber.PlumberSchool;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import junit.framework.Assert;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Period;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
public class TaskSerdeTest
|
||||
{
|
||||
|
@ -159,14 +161,16 @@ public class TaskSerdeTest
|
|||
@Test
|
||||
public void testVersionConverterTaskSerde() throws Exception
|
||||
{
|
||||
final VersionConverterTask task = VersionConverterTask.create(
|
||||
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
|
||||
final ConvertSegmentTask task = ConvertSegmentTask.create(
|
||||
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(),
|
||||
null,
|
||||
false
|
||||
);
|
||||
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
final VersionConverterTask task2 = (VersionConverterTask) jsonMapper.readValue(json, Task.class);
|
||||
final ConvertSegmentTask task2 = (ConvertSegmentTask) jsonMapper.readValue(json, Task.class);
|
||||
|
||||
Assert.assertEquals("foo", task.getDataSource());
|
||||
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
|
||||
|
@ -181,16 +185,17 @@ public class TaskSerdeTest
|
|||
@Test
|
||||
public void testVersionConverterSubTaskSerde() throws Exception
|
||||
{
|
||||
final VersionConverterTask.SubTask task = new VersionConverterTask.SubTask(
|
||||
final ConvertSegmentTask.SubTask task = new ConvertSegmentTask.SubTask(
|
||||
"myGroupId",
|
||||
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(),
|
||||
indexSpec
|
||||
indexSpec,
|
||||
false
|
||||
);
|
||||
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
final VersionConverterTask.SubTask task2 = (VersionConverterTask.SubTask) jsonMapper.readValue(json, Task.class);
|
||||
final ConvertSegmentTask.SubTask task2 = (ConvertSegmentTask.SubTask) jsonMapper.readValue(json, Task.class);
|
||||
|
||||
Assert.assertEquals("foo", task.getDataSource());
|
||||
Assert.assertEquals("myGroupId", task.getGroupId());
|
||||
|
@ -215,7 +220,8 @@ public class TaskSerdeTest
|
|||
new AggregatorFactory[0],
|
||||
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
|
||||
),
|
||||
new RealtimeIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
|
||||
new RealtimeIOConfig(
|
||||
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
|
||||
{
|
||||
@Override
|
||||
public Plumber findPlumber(
|
||||
|
@ -224,7 +230,8 @@ public class TaskSerdeTest
|
|||
{
|
||||
return null;
|
||||
}
|
||||
}),
|
||||
}
|
||||
),
|
||||
new RealtimeTuningConfig(
|
||||
1,
|
||||
new Period("PT10M"),
|
||||
|
@ -348,6 +355,73 @@ public class TaskSerdeTest
|
|||
Assert.assertEquals(task.getInterval(), task2.getInterval());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSegmentConvetSerdeReflection() throws IOException
|
||||
{
|
||||
final ConvertSegmentTask task = ConvertSegmentTask.create(
|
||||
new DataSegment(
|
||||
"dataSource",
|
||||
Interval.parse("1990-01-01/1999-12-31"),
|
||||
"version",
|
||||
ImmutableMap.<String, Object>of(),
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableList.of("metric1", "metric2"),
|
||||
new NoneShardSpec(),
|
||||
0,
|
||||
12345L
|
||||
),
|
||||
indexSpec
|
||||
, false
|
||||
);
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class);
|
||||
Assert.assertEquals(json, jsonMapper.writeValueAsString(taskFromJson));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSegmentConvertSerde() throws IOException
|
||||
{
|
||||
final DataSegment segment = new DataSegment(
|
||||
"dataSource",
|
||||
Interval.parse("1990-01-01/1999-12-31"),
|
||||
"version",
|
||||
ImmutableMap.<String, Object>of(),
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableList.of("metric1", "metric2"),
|
||||
new NoneShardSpec(),
|
||||
0,
|
||||
12345L
|
||||
);
|
||||
final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create(
|
||||
segment,
|
||||
new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed")
|
||||
, false
|
||||
);
|
||||
final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal);
|
||||
final Task task = jsonMapper.readValue(json, Task.class);
|
||||
Assert.assertTrue(task instanceof ConvertSegmentTask);
|
||||
final ConvertSegmentTask convertSegmentTask = (ConvertSegmentTask) task;
|
||||
Assert.assertEquals(convertSegmentTaskOriginal.getDataSource(), convertSegmentTask.getDataSource());
|
||||
Assert.assertEquals(convertSegmentTaskOriginal.getInterval(), convertSegmentTask.getInterval());
|
||||
Assert.assertEquals(
|
||||
convertSegmentTaskOriginal.getIndexSpec().getBitmapSerdeFactory().getClass().getCanonicalName(),
|
||||
convertSegmentTask.getIndexSpec()
|
||||
.getBitmapSerdeFactory()
|
||||
.getClass()
|
||||
.getCanonicalName()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
convertSegmentTaskOriginal.getIndexSpec().getDimensionCompression(),
|
||||
convertSegmentTask.getIndexSpec().getDimensionCompression()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
convertSegmentTaskOriginal.getIndexSpec().getMetricCompression(),
|
||||
convertSegmentTask.getIndexSpec().getMetricCompression()
|
||||
);
|
||||
Assert.assertEquals(false, convertSegmentTask.isForce());
|
||||
Assert.assertEquals(segment, convertSegmentTask.getSegment());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveTaskSerde() throws Exception
|
||||
{
|
||||
|
|
|
@ -71,6 +71,7 @@ import io.druid.segment.data.IndexedMultivalue;
|
|||
import io.druid.segment.data.IndexedRTree;
|
||||
import io.druid.segment.data.VSizeIndexed;
|
||||
import io.druid.segment.data.VSizeIndexedInts;
|
||||
import io.druid.segment.incremental.IncrementalIndexAdapter;
|
||||
import io.druid.segment.serde.BitmapIndexColumnPartSupplier;
|
||||
import io.druid.segment.serde.ComplexColumnPartSerde;
|
||||
import io.druid.segment.serde.ComplexColumnPartSupplier;
|
||||
|
@ -81,6 +82,7 @@ import io.druid.segment.serde.FloatGenericColumnSupplier;
|
|||
import io.druid.segment.serde.LongGenericColumnPartSerde;
|
||||
import io.druid.segment.serde.LongGenericColumnSupplier;
|
||||
import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -92,6 +94,8 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.ByteOrder;
|
||||
import java.util.AbstractList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -195,6 +199,11 @@ public class IndexIO
|
|||
}
|
||||
|
||||
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException
|
||||
{
|
||||
return convertSegment(toConvert, converted, indexSpec, false);
|
||||
}
|
||||
|
||||
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent) throws IOException
|
||||
{
|
||||
final int version = SegmentUtils.getVersionFromDir(toConvert);
|
||||
|
||||
|
@ -221,8 +230,19 @@ public class IndexIO
|
|||
DefaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec);
|
||||
return true;
|
||||
default:
|
||||
log.info("Version[%s], skipping.", version);
|
||||
return false;
|
||||
if(forceIfCurrent){
|
||||
final QueryableIndexIndexableAdapter indexIndexableAdapter = new QueryableIndexIndexableAdapter(loadIndex(toConvert));
|
||||
IndexMaker.append(
|
||||
Collections.<IndexableAdapter>singletonList(indexIndexableAdapter),
|
||||
converted,
|
||||
indexSpec
|
||||
);
|
||||
DefaultIndexIOHandler.validateTwoSegments(toConvert, converted);
|
||||
return true;
|
||||
} else {
|
||||
log.info("Version[%s], skipping.", version);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -337,6 +357,35 @@ public class IndexIO
|
|||
return retVal;
|
||||
}
|
||||
|
||||
public static void validateTwoSegments(File dir1, File dir2) throws IOException
|
||||
{
|
||||
final QueryableIndexIndexableAdapter adapter1 = new QueryableIndexIndexableAdapter(loadIndex(dir1));
|
||||
final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(loadIndex(dir2));
|
||||
if(adapter1.getNumRows() != adapter2.getNumRows()){
|
||||
throw new IOException("Validation failure - Row count mismatch");
|
||||
}
|
||||
final Iterator<Rowboat> it1 = adapter1.getRows().iterator();
|
||||
final Iterator<Rowboat> it2 = adapter2.getRows().iterator();
|
||||
long row = 0L;
|
||||
while(it1.hasNext()){
|
||||
if(it1.hasNext() ^ it2.hasNext()){
|
||||
throw new IOException("Validation failure - Iterator doesn't have enough");
|
||||
}
|
||||
final Rowboat rb1 = it1.next();
|
||||
final Rowboat rb2 = it2.next();
|
||||
++row;
|
||||
if(rb1.compareTo(rb2) != 0){
|
||||
throw new IOException(String.format("Validation failure on row %d: [%s] vs [%s]", row, rb1, rb2));
|
||||
}
|
||||
}
|
||||
if(it2.hasNext()){
|
||||
throw new IOException("Validation failure - Iterator still has more");
|
||||
}
|
||||
if(row != adapter1.getNumRows()){
|
||||
throw new IOException("Validation failure - Actual Row count mismatch");
|
||||
}
|
||||
}
|
||||
|
||||
public static void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec) throws IOException
|
||||
{
|
||||
log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);
|
||||
|
|
Loading…
Reference in New Issue