KafkaIndexTask: Use a separate sequence per Kafka partition in order to make (#2844)

segment creation deterministic.

This means that each segment will contain data from just one Kafka
partition. So, users will probably not want to have a super high number
of Kafka partitions...

Fixes #2703.
This commit is contained in:
Gian Merlino 2016-04-18 22:29:52 -07:00 committed by Fangjin Yang
parent 7b65ca7889
commit 08c784fbf6
9 changed files with 194 additions and 98 deletions

View File

@ -30,7 +30,7 @@ public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
private final String sequenceName;
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
private final Map<String, String> consumerProperties;
@ -38,14 +38,14 @@ public class KafkaIOConfig implements IOConfig
@JsonCreator
public KafkaIOConfig(
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction
)
{
this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
@ -72,9 +72,9 @@ public class KafkaIOConfig implements IOConfig
}
@JsonProperty
public String getSequenceName()
public String getBaseSequenceName()
{
return sequenceName;
return baseSequenceName;
}
@JsonProperty
@ -105,7 +105,7 @@ public class KafkaIOConfig implements IOConfig
public String toString()
{
return "KafkaIOConfig{" +
"sequenceName='" + sequenceName + '\'' +
"baseSequenceName='" + baseSequenceName + '\'' +
", startPartitions=" + startPartitions +
", endPartitions=" + endPartitions +
", consumerProperties=" + consumerProperties +

View File

@ -236,6 +236,12 @@ public class KafkaIndexTask extends AbstractTask
}
}
// Set up sequenceNames.
final Map<Integer, String> sequenceNames = Maps.newHashMap();
for (Integer partitionNum : nextOffsets.keySet()) {
sequenceNames.put(partitionNum, String.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum));
}
// Set up committer.
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
{
@ -295,7 +301,7 @@ public class KafkaIndexTask extends AbstractTask
// Main loop.
// Could eventually support early termination (triggered by a supervisor)
// Could eventually support leader/follower mode (for keeping replicas more in sync)
boolean stillReading = true;
boolean stillReading = !assignment.isEmpty();
while (stillReading) {
if (stopping) {
log.info("Stopping early.");
@ -352,7 +358,11 @@ public class KafkaIndexTask extends AbstractTask
try {
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");
final SegmentIdentifier identifier = driver.add(row, committerSupplier);
final SegmentIdentifier identifier = driver.add(
row,
sequenceNames.get(record.partition()),
committerSupplier
);
if (identifier == null) {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
@ -525,11 +535,7 @@ public class KafkaIndexTask extends AbstractTask
{
return new FiniteAppenderatorDriver(
appenderator,
new ActionBasedSegmentAllocator(
toolbox.getTaskActionClient(),
dataSchema,
ioConfig.getSequenceName()
),
new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getObjectMapper(),

View File

@ -170,7 +170,8 @@ public class KafkaIndexTaskTest
new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "e", "y", 1.0f)),
new ProducerRecord<byte[], byte[]>("topic0", 0, null, "unparseable".getBytes()),
new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2013", "f", "y", 1.0f)),
new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f))
new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f)),
new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2011", "h", "y", 1.0f))
);
static {
@ -377,6 +378,42 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
}
@Test(timeout = 60_000L)
public void testRunOnNothing() throws Exception
{
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : RECORDS) {
kafkaProducer.send(record).get();
}
}
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
"sequence0",
new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
kafkaServer.consumerProperties(),
true
),
null
);
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(0, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
}
@Test(timeout = 60_000L)
public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
{
@ -714,7 +751,7 @@ public class KafkaIndexTaskTest
new KafkaIOConfig(
"sequence0",
new KafkaPartitions("topic0", ImmutableMap.of(0, 2L, 1, 0L)),
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L)),
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L)),
kafkaServer.consumerProperties(),
true
),
@ -734,24 +771,30 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(5, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
SegmentDescriptor desc3 = SD(task, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
SegmentDescriptor desc3 = SD(task, "2011/P1D", 1);
SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L))),
new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc3));
Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc4));
// Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
Assert.assertEquals(
ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
ImmutableSet.of(readSegmentDim1(desc2), readSegmentDim1(desc3))
);
}
@Test(timeout = 60_000L)

View File

@ -58,6 +58,8 @@ public class TestBroker implements Closeable
{
final Properties props = new Properties();
props.setProperty("zookeeper.connect", zookeeperConnect);
props.setProperty("zookeeper.session.timeout.ms", "30000");
props.setProperty("zookeeper.connection.timeout.ms", "30000");
props.setProperty("log.dirs", directory.toString());
props.setProperty("broker.id", String.valueOf(id));
props.setProperty("port", String.valueOf(new Random().nextInt(9999) + 10000));

View File

@ -32,22 +32,20 @@ public class ActionBasedSegmentAllocator implements SegmentAllocator
{
private final TaskActionClient taskActionClient;
private final DataSchema dataSchema;
private final String sequenceName;
public ActionBasedSegmentAllocator(
TaskActionClient taskActionClient,
DataSchema dataSchema,
String sequenceName
DataSchema dataSchema
)
{
this.taskActionClient = taskActionClient;
this.dataSchema = dataSchema;
this.sequenceName = sequenceName;
}
@Override
public SegmentIdentifier allocate(
final DateTime timestamp,
final String sequenceName,
final String previousSegmentId
) throws IOException
{

View File

@ -27,7 +27,9 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -75,10 +77,13 @@ public class FiniteAppenderatorDriver implements Closeable
private final int maxRowsPerSegment;
private final long handoffConditionTimeout;
// Key = Start of segment intervals. Value = Segment we're currently adding data to.
// All access to "activeSegments" and "lastSegmentId" must be synchronized on "activeSegments".
private final NavigableMap<Long, SegmentIdentifier> activeSegments = new TreeMap<>();
private volatile String lastSegmentId = null;
// sequenceName -> start of segment interval -> segment we're currently adding data to
private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<>();
// sequenceName -> most recently allocated segment
private final Map<String, String> lastSegmentIds = Maps.newHashMap();
// Notified when segments are dropped.
private final Object handoffMonitor = new Object();
@ -136,10 +141,17 @@ public class FiniteAppenderatorDriver implements Closeable
if (metadata != null) {
synchronized (activeSegments) {
for (SegmentIdentifier identifier : metadata.getActiveSegments()) {
activeSegments.put(identifier.getInterval().getStartMillis(), identifier);
for (Map.Entry<String, List<SegmentIdentifier>> entry : metadata.getActiveSegments().entrySet()) {
final String sequenceName = entry.getKey();
final TreeMap<Long, SegmentIdentifier> segmentMap = Maps.newTreeMap();
lastSegmentIds.put(sequenceName, metadata.getLastSegmentIds().get(sequenceName));
activeSegments.put(sequenceName, segmentMap);
for (SegmentIdentifier identifier : entry.getValue()) {
segmentMap.put(identifier.getInterval().getStartMillis(), identifier);
}
}
lastSegmentId = metadata.getPreviousSegmentId();
}
return metadata.getCallerMetadata();
@ -163,6 +175,7 @@ public class FiniteAppenderatorDriver implements Closeable
* Add a row. Must not be called concurrently from multiple threads.
*
* @param row the row to add
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with all data that has been added, including this row
*
* @return segment to which this row was added, or null if segment allocator returned null for this row
@ -171,10 +184,15 @@ public class FiniteAppenderatorDriver implements Closeable
*/
public SegmentIdentifier add(
final InputRow row,
final String sequenceName,
final Supplier<Committer> committerSupplier
) throws IOException
{
final SegmentIdentifier identifier = getSegment(row.getTimestamp());
Preconditions.checkNotNull(row, "row");
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(committerSupplier, "committerSupplier");
final SegmentIdentifier identifier = getSegment(row.getTimestamp(), sequenceName);
if (identifier != null) {
try {
@ -191,24 +209,10 @@ public class FiniteAppenderatorDriver implements Closeable
return identifier;
}
public int getActiveSegmentCount()
{
synchronized (activeSegments) {
return activeSegments.size();
}
}
public List<SegmentIdentifier> getActiveSegments()
{
synchronized (activeSegments) {
return ImmutableList.copyOf(activeSegments.values());
}
}
/**
* Persist all data indexed through this driver so far. Blocks until complete.
*
* Should be called after all data has been added through {@link #add(InputRow, Supplier)}.
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}.
*
* @param committer committer representing all data that has been added so far
*
@ -235,7 +239,7 @@ public class FiniteAppenderatorDriver implements Closeable
* Publish all data indexed through this driver so far, and waits for it to be handed off. Blocks until complete.
* Retries forever on transient failures, but may exit early on permanent failures.
*
* Should be called after all data has been added and persisted through {@link #add(InputRow, Supplier)} and
* Should be called after all data has been added and persisted through {@link #add(InputRow, String, Supplier)} and
* {@link #persist(Committer)}.
*
* @param publisher publisher to use for this set of segments
@ -256,7 +260,8 @@ public class FiniteAppenderatorDriver implements Closeable
? System.currentTimeMillis() + handoffConditionTimeout
: 0;
log.info("Awaiting handoff...");
log.info("Awaiting handoff of segments: [%s]", Joiner.on(", ").join(appenderator.getSegments()));
synchronized (handoffMonitor) {
while (!appenderator.getSegments().isEmpty()) {
@ -276,6 +281,8 @@ public class FiniteAppenderatorDriver implements Closeable
}
}
log.info("All segments handed off.");
return new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
@ -294,10 +301,16 @@ public class FiniteAppenderatorDriver implements Closeable
handoffNotifier.close();
}
private SegmentIdentifier getActiveSegment(final DateTime timestamp)
private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName)
{
synchronized (activeSegments) {
final Map.Entry<Long, SegmentIdentifier> candidateEntry = activeSegments.floorEntry(timestamp.getMillis());
final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
if (activeSegmentsForSequence == null) {
return null;
}
final Map.Entry<Long, SegmentIdentifier> candidateEntry = activeSegmentsForSequence.floorEntry(timestamp.getMillis());
if (candidateEntry != null && candidateEntry.getValue().getInterval().contains(timestamp)) {
return candidateEntry.getValue();
} else {
@ -309,39 +322,50 @@ public class FiniteAppenderatorDriver implements Closeable
/**
* Return a segment usable for "timestamp". May return null if no segment can be allocated.
*
* @param timestamp data timestamp
* @param timestamp data timestamp
* @param sequenceName sequenceName for potential segment allocation
*
* @return identifier, or null
*
* @throws IOException if an exception occurs while allocating a segment
*/
private SegmentIdentifier getSegment(final DateTime timestamp) throws IOException
private SegmentIdentifier getSegment(final DateTime timestamp, final String sequenceName) throws IOException
{
synchronized (activeSegments) {
final SegmentIdentifier existing = getActiveSegment(timestamp);
final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName);
if (existing != null) {
return existing;
} else {
// Allocate new segment.
final SegmentIdentifier newSegment = segmentAllocator.allocate(timestamp, lastSegmentId);
final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
final SegmentIdentifier newSegment = segmentAllocator.allocate(
timestamp,
sequenceName,
lastSegmentIds.get(sequenceName)
);
if (newSegment != null) {
final Long key = newSegment.getInterval().getStartMillis();
final SegmentIdentifier conflicting = activeSegments.get(key);
if (conflicting != null) {
throw new ISE(
"WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
newSegment,
conflicting
);
for (SegmentIdentifier identifier : appenderator.getSegments()) {
if (identifier.equals(newSegment)) {
throw new ISE(
"WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
newSegment,
identifier
);
}
}
log.info("New segment[%s].", newSegment);
activeSegments.put(key, newSegment);
lastSegmentId = newSegment.getIdentifierAsString();
log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName);
if (activeSegmentsForSequence == null) {
activeSegments.put(sequenceName, Maps.<Long, SegmentIdentifier>newTreeMap());
}
activeSegments.get(sequenceName).put(key, newSegment);
lastSegmentIds.put(sequenceName, newSegment.getIdentifierAsString());
} else {
// Well, we tried.
log.warn("Cannot allocate segment for timestamp[%s]. ", timestamp);
log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName);
}
return newSegment;
@ -368,14 +392,15 @@ public class FiniteAppenderatorDriver implements Closeable
/**
* Push and publish all segments to the metadata store.
*
* @param publisher segment publisher
* @param committer wrapped committer (from wrapCommitter)
* @param publisher segment publisher
* @param wrappedCommitter wrapped committer (from wrapCommitter)
*
* @return published segments and metadata
* @return published segments and metadata, or null if segments could not be published due to transaction failure
* with commit metadata.
*/
private SegmentsAndMetadata publishAll(
final TransactionalSegmentPublisher publisher,
final Committer committer
final Committer wrappedCommitter
) throws InterruptedException
{
final List<SegmentIdentifier> theSegments = ImmutableList.copyOf(appenderator.getSegments());
@ -384,7 +409,7 @@ public class FiniteAppenderatorDriver implements Closeable
while (true) {
try {
log.info("Pushing segments: [%s]", Joiner.on(", ").join(theSegments));
final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(theSegments, committer).get();
final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(theSegments, wrappedCommitter).get();
// Sanity check
if (!segmentsToIdentifiers(segmentsAndMetadata.getSegments()).equals(Sets.newHashSet(theSegments))) {
@ -401,21 +426,25 @@ public class FiniteAppenderatorDriver implements Closeable
Joiner.on(", ").join(segmentsAndMetadata.getSegments())
);
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
);
if (published) {
log.info("Published segments, awaiting handoff.");
if (segmentsAndMetadata.getSegments().isEmpty()) {
log.info("Nothing to publish, skipping publish step.");
} else {
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
if (usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info("Our segments really do exist, awaiting handoff.");
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
);
if (published) {
log.info("Published segments, awaiting handoff.");
} else {
log.warn("Our segments don't exist, giving up.");
return null;
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
if (usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info("Our segments really do exist, awaiting handoff.");
} else {
log.warn("Our segments don't exist, giving up.");
return null;
}
}
}
@ -491,8 +520,20 @@ public class FiniteAppenderatorDriver implements Closeable
{
synchronized (activeSegments) {
final FiniteAppenderatorDriverMetadata wrappedMetadata = new FiniteAppenderatorDriverMetadata(
ImmutableList.copyOf(activeSegments.values()),
lastSegmentId,
ImmutableMap.copyOf(
Maps.transformValues(
activeSegments,
new Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>>()
{
@Override
public List<SegmentIdentifier> apply(NavigableMap<Long, SegmentIdentifier> input)
{
return ImmutableList.copyOf(input.values());
}
}
)
),
ImmutableMap.copyOf(lastSegmentIds),
committer.getMetadata()
);

View File

@ -23,35 +23,36 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
public class FiniteAppenderatorDriverMetadata
{
private final List<SegmentIdentifier> activeSegments;
private final String previousSegmentId;
private final Map<String, List<SegmentIdentifier>> activeSegments;
private final Map<String, String> lastSegmentIds;
private final Object callerMetadata;
@JsonCreator
public FiniteAppenderatorDriverMetadata(
@JsonProperty("activeSegments") List<SegmentIdentifier> activeSegments,
@JsonProperty("previousSegmentId") String previousSegmentId,
@JsonProperty("activeSegments") Map<String, List<SegmentIdentifier>> activeSegments,
@JsonProperty("lastSegmentIds") Map<String, String> lastSegmentIds,
@JsonProperty("callerMetadata") Object callerMetadata
)
{
this.activeSegments = activeSegments;
this.previousSegmentId = previousSegmentId;
this.lastSegmentIds = lastSegmentIds;
this.callerMetadata = callerMetadata;
}
@JsonProperty
public List<SegmentIdentifier> getActiveSegments()
public Map<String, List<SegmentIdentifier>> getActiveSegments()
{
return activeSegments;
}
@JsonProperty
public String getPreviousSegmentId()
public Map<String, String> getLastSegmentIds()
{
return previousSegmentId;
return lastSegmentIds;
}
@JsonProperty
@ -65,7 +66,7 @@ public class FiniteAppenderatorDriverMetadata
{
return "FiniteAppenderatorDriverMetadata{" +
"activeSegments=" + activeSegments +
", previousSegmentId='" + previousSegmentId + '\'' +
", lastSegmentIds=" + lastSegmentIds +
", callerMetadata=" + callerMetadata +
'}';
}

View File

@ -29,12 +29,14 @@ public interface SegmentAllocator
* Allocates a new segment for a given timestamp.
*
* @param timestamp timestamp of the event which triggered this allocation request
* @param previousSegmentId segment identifier returned on the previous call to allocate
* @param sequenceName sequenceName for this allocation
* @param previousSegmentId segment identifier returned on the previous call to allocate for your sequenceName
*
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
SegmentIdentifier allocate(
DateTime timestamp,
String sequenceName,
String previousSegmentId
) throws IOException;
}

View File

@ -85,6 +85,7 @@ public class FiniteAppenderatorDriverTest
)
);
SegmentAllocator allocator;
AppenderatorTester appenderatorTester;
FiniteAppenderatorDriver driver;
@ -92,9 +93,10 @@ public class FiniteAppenderatorDriverTest
public void setUp()
{
appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularity.HOUR);
driver = new FiniteAppenderatorDriver(
appenderatorTester.getAppenderator(),
new TestSegmentAllocator(DATA_SOURCE, Granularity.HOUR),
allocator,
new TestSegmentHandoffNotifierFactory(),
new TestUsedSegmentChecker(),
OBJECT_MAPPER,
@ -119,7 +121,7 @@ public class FiniteAppenderatorDriverTest
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertNotNull(driver.add(ROWS.get(i), committerSupplier));
Assert.assertNotNull(driver.add(ROWS.get(i), "dummy", committerSupplier));
}
final SegmentsAndMetadata segmentsAndMetadata = driver.finish(
@ -212,6 +214,7 @@ public class FiniteAppenderatorDriverTest
@Override
public SegmentIdentifier allocate(
final DateTime timestamp,
final String sequenceName,
final String previousSegmentId
) throws IOException
{