fix ClassCastException in FiniteAppenderatorDriver (#2896)

This commit is contained in:
David Lim 2016-04-28 19:39:24 -06:00 committed by Fangjin Yang
parent 3f71a4a302
commit 5f0a9ccc57
2 changed files with 35 additions and 3 deletions

View File

@ -198,7 +198,7 @@ public class FiniteAppenderatorDriver implements Closeable
try {
final int numRows = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier));
if (numRows >= maxRowsPerSegment) {
moveSegmentOut(ImmutableList.of(identifier));
moveSegmentOut(sequenceName, ImmutableList.of(identifier));
}
}
catch (SegmentNotWritableException e) {
@ -376,13 +376,18 @@ public class FiniteAppenderatorDriver implements Closeable
/**
* Move a set of identifiers out from "active", making way for newer segments.
*/
private void moveSegmentOut(final List<SegmentIdentifier> identifiers)
private void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
{
synchronized (activeSegments) {
final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
if (activeSegmentsForSequence == null) {
throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
}
for (final SegmentIdentifier identifier : identifiers) {
log.info("Moving segment[%s] out of active list.", identifier);
final long key = identifier.getInterval().getStartMillis();
if (activeSegments.remove(key) != identifier) {
if (activeSegmentsForSequence.remove(key) != identifier) {
throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
}
}

View File

@ -140,6 +140,33 @@ public class FiniteAppenderatorDriverTest
Assert.assertEquals(3, segmentsAndMetadata.getCommitMetadata());
}
@Test
public void testMaxRowsPerSegment() throws Exception
{
final int numSegments = 3;
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
Assert.assertNull(driver.startJob());
for (int i = 0; i < numSegments * MAX_ROWS_PER_SEGMENT; i++) {
committerSupplier.setMetadata(i + 1);
InputRow row = new MapBasedInputRow(
new DateTime("2000T01"),
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of(
"dim2",
String.format("bar-%d", i),
"met1",
2.0
)
);
Assert.assertNotNull(driver.add(row, "dummy", committerSupplier));
}
final SegmentsAndMetadata segmentsAndMetadata = driver.finish(makeOkPublisher(), committerSupplier.get());
Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata());
}
private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> segments)
{
return ImmutableSet.copyOf(