mirror of https://github.com/apache/druid.git
Fix bug in StringDimensionHandler and add a CLI tool for validating segments (#3666)
This commit is contained in:
parent
988d97b09c
commit
7c0f462fbc
|
@ -120,24 +120,20 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
|
|||
dimValName
|
||||
);
|
||||
}
|
||||
} else {
|
||||
throw new SegmentValidationException(
|
||||
"Dim [%s] value lengths not equal. Expected %d found %d",
|
||||
dimensionName,
|
||||
lhsLen,
|
||||
rhsLen
|
||||
);
|
||||
}
|
||||
} else {
|
||||
throw new SegmentValidationException(
|
||||
"Dim [%s] value lengths not equal. Expected %d found %d",
|
||||
dimensionName,
|
||||
lhsLen,
|
||||
rhsLen
|
||||
);
|
||||
}
|
||||
|
||||
for (int j = 0; j < Math.max(lhsLen, rhsLen); ++j) {
|
||||
final int dIdex1 = lhsLen <= j ? -1 : lhs[j];
|
||||
final int dIdex2 = rhsLen <= j ? -1 : rhs[j];
|
||||
|
||||
if (dIdex1 == dIdex2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final String dim1ValName = dIdex1 < 0 ? null : lhsEncodings.get(dIdex1);
|
||||
final String dim2ValName = dIdex2 < 0 ? null : rhsEncodings.get(dIdex2);
|
||||
if ((dim1ValName == null) || (dim2ValName == null)) {
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
package io.druid.segment;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.granularity.QueryGranularities;
|
||||
import io.druid.java.util.common.Pair;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.segment.data.CompressedObjectStrategy;
|
||||
import io.druid.segment.data.CompressionFactory;
|
||||
import io.druid.segment.data.ConciseBitmapSerdeFactory;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexAdapter;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import java.util.Collections;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class StringDimensionHandlerTest
|
||||
{
|
||||
private static final Interval TEST_INTERVAL = Interval.parse("2015-01-01/2015-12-31");
|
||||
|
||||
private static final IndexSpec INDEX_SPEC = new IndexSpec(
|
||||
new ConciseBitmapSerdeFactory(),
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressionFactory.LongEncodingStrategy.LONGS
|
||||
);
|
||||
|
||||
|
||||
private final List<String> dimensions = Arrays.asList("penguins", "predators");
|
||||
|
||||
private static Pair<IncrementalIndexAdapter, IncrementalIndexAdapter> getAdapters(
|
||||
List<String> dims,
|
||||
Map<String, Object> event1,
|
||||
Map<String, Object> event2
|
||||
) throws Exception {
|
||||
IncrementalIndex incrementalIndex1 = new OnheapIncrementalIndex(
|
||||
TEST_INTERVAL.getStartMillis(),
|
||||
QueryGranularities.NONE,
|
||||
true,
|
||||
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null),
|
||||
new AggregatorFactory[]{
|
||||
new CountAggregatorFactory(
|
||||
"count"
|
||||
)
|
||||
},
|
||||
1000
|
||||
);
|
||||
|
||||
IncrementalIndex incrementalIndex2 = new OnheapIncrementalIndex(
|
||||
TEST_INTERVAL.getStartMillis(),
|
||||
QueryGranularities.NONE,
|
||||
true,
|
||||
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null),
|
||||
new AggregatorFactory[]{
|
||||
new CountAggregatorFactory(
|
||||
"count"
|
||||
)
|
||||
},
|
||||
1000
|
||||
);
|
||||
|
||||
incrementalIndex1.add(new MapBasedInputRow(TEST_INTERVAL.getStartMillis(), dims, event1));
|
||||
incrementalIndex2.add(new MapBasedInputRow(TEST_INTERVAL.getStartMillis() + 3, dims, event2));
|
||||
|
||||
IncrementalIndexAdapter adapter1 = new IncrementalIndexAdapter(
|
||||
TEST_INTERVAL,
|
||||
incrementalIndex1,
|
||||
INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
|
||||
);
|
||||
IncrementalIndexAdapter adapter2 = new IncrementalIndexAdapter(
|
||||
TEST_INTERVAL,
|
||||
incrementalIndex2,
|
||||
INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
|
||||
);
|
||||
|
||||
return new Pair<>(adapter1, adapter2);
|
||||
}
|
||||
|
||||
private static void validate(IncrementalIndexAdapter adapter1, IncrementalIndexAdapter adapter2) throws Exception {
|
||||
Map<String, DimensionHandler> handlers = adapter1.getDimensionHandlers();
|
||||
Indexed<String> dimNames1 = adapter1.getDimensionNames();
|
||||
Indexed<String> dimNames2 = adapter2.getDimensionNames();
|
||||
Iterator<Rowboat> iterator1 = adapter1.getRows().iterator();
|
||||
Iterator<Rowboat> iterator2 = adapter2.getRows().iterator();
|
||||
|
||||
while (iterator1.hasNext()) {
|
||||
Rowboat r1 = iterator1.next();
|
||||
Rowboat r2 = iterator2.next();
|
||||
Object[] dims1 = r1.getDims();
|
||||
Object[] dims2 = r2.getDims();
|
||||
for (int i = 0; i < dims1.length; i++) {
|
||||
Object val1 = dims1[i];
|
||||
Object val2 = dims2[i];
|
||||
String name1 = dimNames1.get(i);
|
||||
String name2 = dimNames2.get(i);
|
||||
DimensionHandler handler = handlers.get(name1);
|
||||
handler.validateSortedEncodedArrays(
|
||||
val1,
|
||||
val2,
|
||||
adapter1.getDimValueLookup(name1),
|
||||
adapter2.getDimValueLookup(name2)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateSortedEncodedArrays() throws Exception {
|
||||
Map<String, Object> event1 = ImmutableMap.<String, Object>of(
|
||||
"penguins", Arrays.asList("adelie", "emperor"),
|
||||
"predators", Arrays.asList("seal")
|
||||
);
|
||||
Map<String, Object> event2 = ImmutableMap.<String, Object>of(
|
||||
"penguins", Arrays.asList("adelie", "emperor"),
|
||||
"predators", Arrays.asList("seal")
|
||||
);
|
||||
|
||||
Pair<IncrementalIndexAdapter, IncrementalIndexAdapter> adapters = getAdapters(dimensions, event1, event2);
|
||||
IncrementalIndexAdapter adapter1 = adapters.lhs;
|
||||
IncrementalIndexAdapter adapter2 = adapters.rhs;
|
||||
|
||||
validate(adapter1, adapter2);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testValidateSortedDifferentEncodedArrays() throws Exception {
|
||||
Map<String, Object> event1 = ImmutableMap.<String, Object>of(
|
||||
"penguins", Arrays.asList("adelie", "emperor"),
|
||||
"predators", Collections.singletonList("seal")
|
||||
);
|
||||
Map<String, Object> event2 = ImmutableMap.<String, Object>of(
|
||||
"penguins", Arrays.asList("chinstrap", "gentoo"),
|
||||
"predators", Collections.singletonList("seal")
|
||||
);
|
||||
|
||||
Pair<IncrementalIndexAdapter, IncrementalIndexAdapter> adapters = getAdapters(dimensions, event1, event2);
|
||||
IncrementalIndexAdapter adapter1 = adapters.lhs;
|
||||
IncrementalIndexAdapter adapter2 = adapters.rhs;
|
||||
|
||||
exception.expect(SegmentValidationException.class);
|
||||
validate(adapter1, adapter2);
|
||||
}
|
||||
}
|
|
@ -74,7 +74,8 @@ public class Main
|
|||
CreateTables.class,
|
||||
InsertSegment.class,
|
||||
DumpSegment.class,
|
||||
ResetCluster.class
|
||||
ResetCluster.class,
|
||||
ValidateSegments.class
|
||||
);
|
||||
|
||||
builder.withGroup("index")
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.cli;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.name.Names;
|
||||
import io.airlift.airline.Arguments;
|
||||
import io.airlift.airline.Command;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.query.DruidProcessingConfig;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
|
||||
@Command(
|
||||
name = "validate-segments",
|
||||
description = "Compare two segments for validation"
|
||||
)
|
||||
public class ValidateSegments extends GuiceRunnable
|
||||
{
|
||||
private static final Logger log = new Logger(ValidateSegments.class);
|
||||
|
||||
public ValidateSegments() { super(log); }
|
||||
|
||||
@Arguments(
|
||||
description = "Two directories where each directory contains segment files to validate.",
|
||||
required = true)
|
||||
public List<String> directories;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (directories.size() != 2) {
|
||||
throw new IAE("Please provide two segment directories to compare");
|
||||
}
|
||||
final Injector injector = makeInjector();
|
||||
final IndexIO indexIO = injector.getInstance(IndexIO.class);
|
||||
try {
|
||||
String dir1 = directories.get(0);
|
||||
String dir2 = directories.get(1);
|
||||
indexIO.validateTwoSegments(new File(dir1), new File(dir2));
|
||||
log.info("Segments [%s] and [%s] are identical", dir1, dir2);
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<? extends Module> getModules()
|
||||
{
|
||||
return ImmutableList.of(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
|
||||
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
|
||||
binder.bind(DruidProcessingConfig.class).toInstance(
|
||||
new DruidProcessingConfig()
|
||||
{
|
||||
@Override
|
||||
public String getFormatString()
|
||||
{
|
||||
return "processing-%s";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int intermediateComputeSizeBytes()
|
||||
{
|
||||
return 100 * 1024 * 1024;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumThreads()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int columnCacheSizeBytes()
|
||||
{
|
||||
return 25 * 1024 * 1024;
|
||||
}
|
||||
}
|
||||
);
|
||||
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue