Respect buildV9Directly in PlumberSchools, so it works on standalone realtime nodes.

Also parameterize some tests to run with/without buildV9Directly:

- IndexGeneratorJobTest
- RealtimeIndexTaskTest
- RealtimePlumberSchoolTest
Gian Merlino 2016-01-19 12:15:06 -08:00
parent a2e327ed08
commit 1dcf22edb7
10 changed files with 116 additions and 48 deletions
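The core of the change, condensed: instead of callers pre-selecting a merger, each PlumberSchool now receives both IndexMerger and IndexMergerV9 via injection and chooses between them when it builds a plumber, based on the tuning config's buildV9Directly flag. Below is a minimal, self-contained Java sketch of that pattern; the class names are illustrative stand-ins, not Druid's actual API, and running it with both flag values mirrors what the newly parameterized tests exercise.

interface Merger
{
  String persistAndMerge();
}

class LegacyIndexMerger implements Merger
{
  public String persistAndMerge()
  {
    return "used legacy IndexMerger";
  }
}

class DirectV9IndexMerger implements Merger
{
  public String persistAndMerge()
  {
    return "used IndexMergerV9";
  }
}

class TuningConfigSketch
{
  private final boolean buildV9Directly;

  TuningConfigSketch(boolean buildV9Directly)
  {
    this.buildV9Directly = buildV9Directly;
  }

  boolean getBuildV9Directly()
  {
    return buildV9Directly;
  }
}

class PlumberSchoolSketch
{
  // Both implementations are held, just as the real PlumberSchools now hold
  // @JacksonInject IndexMerger and @JacksonInject IndexMergerV9.
  private final Merger indexMerger;
  private final Merger indexMergerV9;

  PlumberSchoolSketch(Merger indexMerger, Merger indexMergerV9)
  {
    this.indexMerger = indexMerger;
    this.indexMergerV9 = indexMergerV9;
  }

  // Mirrors findPlumber(): the merger is chosen per tuning config, not at injection time.
  Merger findMerger(TuningConfigSketch config)
  {
    return config.getBuildV9Directly() ? indexMergerV9 : indexMerger;
  }

  public static void main(String[] args)
  {
    PlumberSchoolSketch school =
        new PlumberSchoolSketch(new LegacyIndexMerger(), new DirectV9IndexMerger());

    // Exercising both flag values is what the newly parameterized tests do.
    System.out.println(school.findMerger(new TuningConfigSketch(true)).persistAndMerge());   // used IndexMergerV9
    System.out.println(school.findMerger(new TuningConfigSketch(false)).persistAndMerge());  // used legacy IndexMerger
  }
}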


@@ -78,10 +78,10 @@ public class IndexGeneratorJobTest
{
@Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " +
"inputFormatName={4}")
"inputFormatName={4}, buildV9Directly={5}")
public static Collection<Object[]> constructFeed()
{
return Arrays.asList(
final List<Object[]> baseConstructors = Arrays.asList(
new Object[][]{
{
false,
@@ -273,22 +273,39 @@ public class IndexGeneratorJobTest
}
}
);
// Run each baseConstructor with/without buildV9Directly.
final List<Object[]> constructors = Lists.newArrayList();
for (Object[] baseConstructor : baseConstructors) {
final Object[] c1 = new Object[baseConstructor.length + 1];
final Object[] c2 = new Object[baseConstructor.length + 1];
System.arraycopy(baseConstructor, 0, c1, 0, baseConstructor.length);
System.arraycopy(baseConstructor, 0, c2, 0, baseConstructor.length);
c1[c1.length - 1] = true;
c2[c2.length - 1] = false;
constructors.add(c1);
constructors.add(c2);
}
return constructors;
}
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private final boolean useCombiner;
private final String partitionType;
private final Interval interval;
private final Object[][][] shardInfoForEachSegment;
private final List<String> data;
private final String inputFormatName;
private final InputRowParser inputRowParser;
private final boolean buildV9Directly;
private ObjectMapper mapper;
private HadoopDruidIndexerConfig config;
private File dataFile;
private File tmpDir;
private Interval interval;
private String partitionType;
private Object[][][] shardInfoForEachSegment;
private List<String> data;
private boolean useCombiner;
private String inputFormatName;
private InputRowParser inputRowParser;
public IndexGeneratorJobTest(
boolean useCombiner,
@@ -297,7 +314,8 @@ public class IndexGeneratorJobTest
Object[][][] shardInfoForEachSegment,
List<String> data,
String inputFormatName,
InputRowParser inputRowParser
InputRowParser inputRowParser,
boolean buildV9Directly
) throws IOException
{
this.useCombiner = useCombiner;
@@ -307,6 +325,7 @@ public class IndexGeneratorJobTest
this.data = data;
this.inputFormatName = inputFormatName;
this.inputRowParser = inputRowParser;
this.buildV9Directly = buildV9Directly;
}
private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException
@@ -396,7 +415,7 @@ public class IndexGeneratorJobTest
false,
useCombiner,
null,
null
buildV9Directly
)
)
);


@@ -37,6 +37,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IndexSizeExceededException;
@@ -68,6 +69,7 @@ public class YeOldePlumberSchool implements PlumberSchool
private final DataSegmentPusher dataSegmentPusher;
private final File tmpSegmentDir;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private static final Logger log = new Logger(YeOldePlumberSchool.class);
@@ -79,6 +81,7 @@ public class YeOldePlumberSchool implements PlumberSchool
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
@JacksonInject("tmpSegmentDir") File tmpSegmentDir,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO
)
{
@@ -87,6 +90,7 @@ public class YeOldePlumberSchool implements PlumberSchool
this.dataSegmentPusher = dataSegmentPusher;
this.tmpSegmentDir = tmpSegmentDir;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
}
@@ -106,6 +110,9 @@ public class YeOldePlumberSchool implements PlumberSchool
// Set of spilled segments. Will be merged at the end.
final Set<File> spilled = Sets.newHashSet();
// IndexMerger implementation.
final IndexMerger theIndexMerger = config.getBuildV9Directly() ? indexMergerV9 : indexMerger;
return new Plumber()
{
@Override
@@ -174,7 +181,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
indexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
theIndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
}
// Map merged segment so we can extract dimensions
@@ -219,7 +226,7 @@ public class YeOldePlumberSchool implements PlumberSchool
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
try {
indexMerger.persist(
theIndexMerger.persist(
indexToPersist.getIndex(),
dirToPersist,
null,


@@ -124,7 +124,12 @@ public class IndexTask extends AbstractFixedIntervalTask
);
}
static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int rowFlushBoundary, IndexSpec indexSpec)
static RealtimeTuningConfig convertTuningConfig(
ShardSpec shardSpec,
int rowFlushBoundary,
IndexSpec indexSpec,
boolean buildV9Directly
)
{
return new RealtimeTuningConfig(
rowFlushBoundary,
@@ -136,7 +141,7 @@ public class IndexTask extends AbstractFixedIntervalTask
null,
shardSpec,
indexSpec,
null
buildV9Directly
);
}
@@ -355,19 +360,22 @@ public class IndexTask extends AbstractFixedIntervalTask
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser());
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
final IndexMerger indexMerger = ingestionSchema.getTuningConfig().getBuildV9Directly()
? toolbox.getIndexMergerV9()
: toolbox.getIndexMerger();
final Plumber plumber = new YeOldePlumberSchool(
interval,
version,
wrappedDataSegmentPusher,
tmpDir,
indexMerger,
toolbox.getIndexMerger(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO()
).findPlumber(
schema,
convertTuningConfig(shardSpec, myRowFlushBoundary, ingestionSchema.getTuningConfig().getIndexSpec()),
convertTuningConfig(
shardSpec,
myRowFlushBoundary,
ingestionSchema.getTuningConfig().getIndexSpec(),
ingestionSchema.tuningConfig.getBuildV9Directly()
),
metrics
);


@@ -47,7 +47,6 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -287,9 +286,6 @@ public class RealtimeIndexTask extends AbstractTask
);
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
IndexMerger indexMerger = spec.getTuningConfig().getBuildV9Directly()
? toolbox.getIndexMergerV9()
: toolbox.getIndexMerger();
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and
// NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the
@@ -302,7 +298,8 @@ public class RealtimeIndexTask extends AbstractTask
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
indexMerger,
toolbox.getIndexMerger(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO(),
toolbox.getCache(),
toolbox.getCacheConfig(),


@@ -34,7 +34,6 @@ import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
@@ -342,7 +341,8 @@ public class IndexTaskTest
RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(
spec,
config.getRowFlushBoundary(),
config.getIndexSpec()
config.getIndexSpec(),
config.getBuildV9Directly()
);
Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary());
Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec);


@@ -109,15 +109,20 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@RunWith(Parameterized.class)
public class RealtimeIndexTaskTest
{
private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
@@ -143,10 +148,25 @@ public class RealtimeIndexTaskTest
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
private final boolean buildV9Directly;
private DateTime now;
private ListeningExecutorService taskExec;
private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
@Parameterized.Parameters(name = "buildV9Directly = {0}")
public static Collection<?> constructorFeeder() throws IOException
{
return ImmutableList.of(
new Object[]{true},
new Object[]{false}
);
}
public RealtimeIndexTaskTest(boolean buildV9Directly)
{
this.buildV9Directly = buildV9Directly;
}
@Before
public void setUp()
@@ -572,7 +592,7 @@ public class RealtimeIndexTaskTest
null,
null,
null,
null
buildV9Directly
);
return new RealtimeIndexTask(
taskId,
@@ -650,7 +670,7 @@ public class RealtimeIndexTaskTest
)
);
handOffCallbacks = Maps.newConcurrentMap();
handoffNotifierFactory = new SegmentHandoffNotifierFactory()
final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory()
{
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)


@@ -31,6 +31,7 @@ import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
@@ -54,6 +55,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
@@ -67,6 +69,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
@@ -82,6 +85,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
null,
queryExecutorService,
indexMerger,
indexMergerV9,
indexIO,
cache,
cacheConfig,
@@ -94,6 +98,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
@@ -118,7 +123,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
conglomerate,
segmentAnnouncer,
queryExecutorService,
indexMerger,
config.getBuildV9Directly() ? indexMergerV9 : indexMerger,
indexIO,
cache,
cacheConfig,


@@ -30,6 +30,7 @@ import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@@ -51,6 +52,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
@@ -66,6 +68,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
@JacksonInject @Processing ExecutorService executorService,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
@@ -80,6 +83,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryExecutorService = executorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
@@ -107,7 +111,7 @@ public class RealtimePlumberSchool implements PlumberSchool
dataSegmentPusher,
segmentPublisher,
handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
indexMerger,
config.getBuildV9Directly() ? indexMergerV9 : indexMerger,
indexIO,
cache,
cacheConfig,


@@ -107,6 +107,7 @@ public class FireDepartmentTest
null,
null,
TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexMergerV9(),
TestHelper.getTestIndexIO(),
MapCache.create(0),
NO_CACHE_CONFIG,


@@ -68,7 +68,6 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -82,6 +81,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class RealtimePlumberSchoolTest
{
private final RejectionPolicyFactory rejectionPolicy;
private final boolean buildV9Directly;
private RealtimePlumber plumber;
private RealtimePlumberSchool realtimePlumberSchool;
private DataSegmentAnnouncer announcer;
@@ -95,24 +95,28 @@ public class RealtimePlumberSchoolTest
private DataSchema schema2;
private FireDepartmentMetrics metrics;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy)
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy, boolean buildV9Directly)
{
this.rejectionPolicy = rejectionPolicy;
this.buildV9Directly = buildV9Directly;
}
@Parameterized.Parameters
@Parameterized.Parameters(name = "rejectionPolicy = {0}, buildV9Directly = {1}")
public static Collection<?> constructorFeeder() throws IOException
{
return Arrays.asList(
new Object[][]{
{
new NoopRejectionPolicyFactory()
},
{
new MessageTimeRejectionPolicyFactory()
}
}
);
final RejectionPolicyFactory[] rejectionPolicies = new RejectionPolicyFactory[]{
new NoopRejectionPolicyFactory(),
new MessageTimeRejectionPolicyFactory()
};
final boolean[] buildV9Directlies = new boolean[]{true, false};
final List<Object[]> constructors = Lists.newArrayList();
for (RejectionPolicyFactory rejectionPolicy : rejectionPolicies) {
for (boolean buildV9Directly : buildV9Directlies) {
constructors.add(new Object[]{rejectionPolicy, buildV9Directly});
}
}
return constructors;
}
@Before
@@ -163,7 +167,9 @@ public class RealtimePlumberSchoolTest
dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class);
handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class);
EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())).andReturn(handoffNotifier).anyTimes();
EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString()))
.andReturn(handoffNotifier)
.anyTimes();
EasyMock.expect(
handoffNotifier.registerSegmentHandoffCallback(
EasyMock.<SegmentDescriptor>anyObject(),
@@ -186,7 +192,7 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
null
buildV9Directly
);
realtimePlumberSchool = new RealtimePlumberSchool(
@@ -198,6 +204,7 @@ public class RealtimePlumberSchoolTest
handoffNotifierFactory,
MoreExecutors.sameThreadExecutor(),
TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexMergerV9(),
TestHelper.getTestIndexIO(),
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
@@ -211,7 +218,7 @@ public class RealtimePlumberSchoolTest
@After
public void tearDown() throws Exception
{
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher,handoffNotifierFactory, handoffNotifier, emitter);
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
FileUtils.deleteDirectory(
new File(
tuningConfig.getBasePersistDirectory(),