Be more respectful of maxRowsInMemory. (#3284)

- Appenderator: Respect maxRowsInMemory across all sinks.
- KafkaIndexTask: Respect maxRowsInMemory across all partitions.
This commit is contained in:
Gian Merlino 2016-07-26 14:02:35 -07:00 committed by David Lim
parent 9b5523add3
commit 8030f1cb67
6 changed files with 141 additions and 33 deletions

View File

@ -753,9 +753,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
{ {
final int maxRowsInMemoryPerPartition = (tuningConfig.getMaxRowsInMemory() /
ioConfig.getStartPartitions().getPartitionOffsetMap().size());
return Appenderators.createRealtime( return Appenderators.createRealtime(
dataSchema, dataSchema,
tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")), tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist"))
.withMaxRowsInMemory(maxRowsInMemoryPerPartition),
metrics, metrics,
toolbox.getSegmentPusher(), toolbox.getSegmentPusher(),
toolbox.getObjectMapper(), toolbox.getObjectMapper(),

View File

@ -144,4 +144,19 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
handoffConditionTimeout handoffConditionTimeout
); );
} }
public KafkaTuningConfig withMaxRowsInMemory(int rows)
{
return new KafkaTuningConfig(
rows,
maxRowsPerSegment,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
indexSpec,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout
);
}
} }

View File

@ -33,7 +33,7 @@ import java.util.List;
* An Appenderator indexes data. It has some in-memory data and some persisted-on-disk data. It can serve queries on * An Appenderator indexes data. It has some in-memory data and some persisted-on-disk data. It can serve queries on
* both of those. It can also push data to deep storage. But, it does not decide which segments data should go into. * both of those. It can also push data to deep storage. But, it does not decide which segments data should go into.
* It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself! * It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself!
* <p/> * <p>
* Any time you call one of the methods that adds, persists, or pushes data, you must provide a Committer, or a * Any time you call one of the methods that adds, persists, or pushes data, you must provide a Committer, or a
* Supplier of one, that represents all data you have given to the Appenderator so far. The Committer will be used when * Supplier of one, that represents all data you have given to the Appenderator so far. The Committer will be used when
* that data has been persisted to disk. * that data has been persisted to disk.
@ -54,13 +54,13 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
/** /**
* Add a row. Must not be called concurrently from multiple threads. * Add a row. Must not be called concurrently from multiple threads.
* <p/> * <p>
* If no pending segment exists for the provided identifier, a new one will be created. * If no pending segment exists for the provided identifier, a new one will be created.
* <p/> * <p>
* This method may trigger a {@link #persistAll(Committer)} using the supplied Committer. If it does this, the * This method may trigger a {@link #persistAll(Committer)} using the supplied Committer. If it does this, the
* Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
* asynchronously. * asynchronously.
* <p/> * <p>
* The add, clear, persistAll, and push methods should all be called from the same thread. * The add, clear, persistAll, and push methods should all be called from the same thread.
* *
* @param identifier the segment into which this row should be added * @param identifier the segment into which this row should be added
@ -95,6 +95,8 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if, * Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if,
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been * for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first. * cleared. This may take some time, since all pending persists must finish first.
*
* The add, clear, persistAll, and push methods should all be called from the same thread.
*/ */
void clear() throws InterruptedException; void clear() throws InterruptedException;
@ -102,7 +104,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* Drop all data associated with a particular pending segment. Unlike {@link #clear()}), any on-disk commit * Drop all data associated with a particular pending segment. Unlike {@link #clear()}), any on-disk commit
* metadata will remain unchanged. If there is no pending segment with this identifier, then this method will * metadata will remain unchanged. If there is no pending segment with this identifier, then this method will
* do nothing. * do nothing.
* <p/> * <p>
* You should not write to the dropped segment after calling "drop". If you need to drop all your data and * You should not write to the dropped segment after calling "drop". If you need to drop all your data and
* re-write it, consider {@link #clear()} instead. * re-write it, consider {@link #clear()} instead.
* *
@ -117,7 +119,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* machine's local disk. The Committer will be made synchronously will the call to persistAll, but will actually * machine's local disk. The Committer will be made synchronously will the call to persistAll, but will actually
* be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
* disk. * disk.
* <p/> * <p>
* The add, clear, persistAll, and push methods should all be called from the same thread. * The add, clear, persistAll, and push methods should all be called from the same thread.
* *
* @param committer a committer associated with all data that has been added so far * @param committer a committer associated with all data that has been added so far
@ -129,9 +131,9 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
/** /**
* Merge and push particular segments to deep storage. This will trigger an implicit {@link #persistAll(Committer)} * Merge and push particular segments to deep storage. This will trigger an implicit {@link #persistAll(Committer)}
* using the provided Committer. * using the provided Committer.
* <p/> * <p>
* After this method is called, you cannot add new data to any segments that were previously under construction. * After this method is called, you cannot add new data to any segments that were previously under construction.
* <p/> * <p>
* The add, clear, persistAll, and push methods should all be called from the same thread. * The add, clear, persistAll, and push methods should all be called from the same thread.
* *
* @param identifiers list of segments to push * @param identifiers list of segments to push

View File

@ -18,6 +18,7 @@
package io.druid.segment.realtime.appenderator; package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -85,6 +86,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
@ -108,10 +110,11 @@ public class AppenderatorImpl implements Appenderator
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>( private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER String.CASE_INSENSITIVE_ORDER
); );
private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
private final QuerySegmentWalker texasRanger; private final QuerySegmentWalker texasRanger;
private volatile ListeningExecutorService persistExecutor = null; private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService mergeExecutor = null; private volatile ListeningExecutorService pushExecutor = null;
private volatile long nextFlush; private volatile long nextFlush;
private volatile FileLock basePersistDirLock = null; private volatile FileLock basePersistDirLock = null;
private volatile FileChannel basePersistDirLockChannel = null; private volatile FileChannel basePersistDirLockChannel = null;
@ -188,26 +191,34 @@ public class AppenderatorImpl implements Appenderator
} }
final Sink sink = getOrCreateSink(identifier); final Sink sink = getOrCreateSink(identifier);
int sinkRetVal; final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
final int sinkRowsInMemoryAfterAdd;
try { try {
sinkRetVal = sink.add(row); sinkRowsInMemoryAfterAdd = sink.add(row);
} }
catch (IndexSizeExceededException e) { catch (IndexSizeExceededException e) {
// Try one more time after swapping, then throw the exception out if it happens again. // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
persistAll(committerSupplier.get()); // can't add the row (it just failed). This should never actually happen, though, because we check
sinkRetVal = sink.add(row); // sink.canAddRow after returning from add.
log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
throw e;
} }
if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) { if (sinkRowsInMemoryAfterAdd < 0) {
persistAll(committerSupplier.get());
}
if (sinkRetVal < 0) {
throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier); throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
} else {
return sink.getNumRows();
} }
rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd);
if (!sink.canAppendRow()
|| System.currentTimeMillis() > nextFlush
|| rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
// persistAll clears rowsCurrentlyInMemory, no need to update it.
persistAll(committerSupplier.get());
}
return sink.getNumRows();
} }
@Override @Override
@ -228,6 +239,12 @@ public class AppenderatorImpl implements Appenderator
} }
} }
@VisibleForTesting
int getRowsInMemory()
{
return rowsCurrentlyInMemory.get();
}
private Sink getOrCreateSink(final SegmentIdentifier identifier) private Sink getOrCreateSink(final SegmentIdentifier identifier)
{ {
Sink retVal = sinks.get(identifier); Sink retVal = sinks.get(identifier);
@ -410,6 +427,9 @@ public class AppenderatorImpl implements Appenderator
runExecStopwatch.stop(); runExecStopwatch.stop();
resetNextFlush(); resetNextFlush();
// NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
rowsCurrentlyInMemory.set(0);
return future; return future;
} }
@ -455,7 +475,7 @@ public class AppenderatorImpl implements Appenderator
return new SegmentsAndMetadata(dataSegments, commitMetadata); return new SegmentsAndMetadata(dataSegments, commitMetadata);
} }
}, },
mergeExecutor pushExecutor
); );
} }
@ -464,9 +484,9 @@ public class AppenderatorImpl implements Appenderator
* This is useful if we're going to do something that would otherwise potentially break currently in-progress * This is useful if we're going to do something that would otherwise potentially break currently in-progress
* pushes. * pushes.
*/ */
private ListenableFuture<?> mergeBarrier() private ListenableFuture<?> pushBarrier()
{ {
return mergeExecutor.submit( return pushExecutor.submit(
new Runnable() new Runnable()
{ {
@Override @Override
@ -480,7 +500,7 @@ public class AppenderatorImpl implements Appenderator
/** /**
* Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only
* be run in the single-threaded mergeExecutor. * be run in the single-threaded pushExecutor.
* *
* @param identifier sink identifier * @param identifier sink identifier
* @param sink sink to push * @param sink sink to push
@ -589,7 +609,7 @@ public class AppenderatorImpl implements Appenderator
try { try {
shutdownExecutors(); shutdownExecutors();
Preconditions.checkState(persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated"); Preconditions.checkState(persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated");
Preconditions.checkState(mergeExecutor.awaitTermination(365, TimeUnit.DAYS), "mergeExecutor not terminated"); Preconditions.checkState(pushExecutor.awaitTermination(365, TimeUnit.DAYS), "pushExecutor not terminated");
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -647,9 +667,9 @@ public class AppenderatorImpl implements Appenderator
) )
); );
} }
if (mergeExecutor == null) { if (pushExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow // use a blocking single threaded executor to throttle the firehose when write to disk is slow
mergeExecutor = MoreExecutors.listeningDecorator( pushExecutor = MoreExecutors.listeningDecorator(
Execs.newBlockingSingleThreaded( Execs.newBlockingSingleThreaded(
"appenderator_merge_%d", 1 "appenderator_merge_%d", 1
) )
@ -660,7 +680,7 @@ public class AppenderatorImpl implements Appenderator
private void shutdownExecutors() private void shutdownExecutors()
{ {
persistExecutor.shutdownNow(); persistExecutor.shutdownNow();
mergeExecutor.shutdownNow(); pushExecutor.shutdownNow();
} }
private void resetNextFlush() private void resetNextFlush()
@ -830,12 +850,18 @@ public class AppenderatorImpl implements Appenderator
final boolean removeOnDiskData final boolean removeOnDiskData
) )
{ {
// Mark this identifier as dropping, so no future merge tasks will pick it up. // Ensure no future writes will be made to this sink.
sink.finishWriting();
// Mark this identifier as dropping, so no future push tasks will pick it up.
droppingSinks.add(identifier); droppingSinks.add(identifier);
// Wait for any outstanding merges to finish, then abandon the segment inside the persist thread. // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
// Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform( return Futures.transform(
mergeBarrier(), pushBarrier(),
new Function<Object, Object>() new Function<Object, Object>()
{ {
@Nullable @Nullable

View File

@ -228,6 +228,18 @@ public class Sink implements Iterable<FireHydrant>
} }
} }
public int getNumRowsInMemory()
{
synchronized (hydrantLock) {
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return 0;
}
return currHydrant.getIndex().size();
}
}
private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{ {
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()

View File

@ -135,6 +135,56 @@ public class AppenderatorTest
} }
} }
@Test
public void testMaxRowsInMemory() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(3)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
{
@Override
public Committer get()
{
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
// Do nothing
}
};
}
};
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "baz", 1), committerSupplier);
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 1), committerSupplier);
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
}
@Test @Test
public void testRestoreFromDisk() throws Exception public void testRestoreFromDisk() throws Exception
{ {