Fix maxBytesInMemory for heap overhead of all sinks and hydrants check (#10891)

* fix maxBytesInMemory

* fix maxBytesInMemory check

* fix maxBytesInMemory check

* fix test
This commit is contained in:
Maytas Monsereenusorn 2021-02-18 21:48:57 -08:00 committed by GitHub
parent cbbef80c7f
commit f5bfccc720
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 39 deletions

View File

@ -332,6 +332,47 @@ public class AppenderatorImpl implements Appenderator
if (allowIncrementalPersists) { if (allowIncrementalPersists) {
// persistAll clears rowsCurrentlyInMemory, no need to update it. // persistAll clears rowsCurrentlyInMemory, no need to update it.
log.info("Flushing in-memory data to disk because %s.", String.join(",", persistReasons)); log.info("Flushing in-memory data to disk because %s.", String.join(",", persistReasons));
long bytesPersisted = 0L;
for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
final Sink sinkEntry = entry.getValue();
if (sinkEntry != null) {
bytesPersisted += sinkEntry.getBytesInMemory();
if (sinkEntry.swappable()) {
// After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
// These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
}
}
}
if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() - bytesPersisted > maxBytesTuningConfig) {
// We are still over maxBytesTuningConfig even after persisting.
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
final String alertMessage = StringUtils.format(
"Task has exceeded safe estimated heap usage limits, failing "
+ "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
sinks.size(),
sinks.values().stream().mapToInt(Iterables::size).sum(),
getTotalRowCount()
);
final String errorMessage = StringUtils.format(
"%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
+ "great to have enough space to process additional input rows. This check, along with metering the overhead "
+ "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting "
+ "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter "
+ "a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an "
+ "increase in heap footprint, but will allow for more intermediary segment persists to occur before "
+ "reaching this condition.",
alertMessage
);
log.makeAlert(alertMessage)
.addData("dataSource", schema.getDataSource())
.emit();
throw new RuntimeException(errorMessage);
}
Futures.addCallback( Futures.addCallback(
persistAll(committerSupplier == null ? null : committerSupplier.get()), persistAll(committerSupplier == null ? null : committerSupplier.get()),
new FutureCallback<Object>() new FutureCallback<Object>()
@ -513,7 +554,6 @@ public class AppenderatorImpl implements Appenderator
public ListenableFuture<Object> persistAll(@Nullable final Committer committer) public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{ {
throwPersistErrorIfExists(); throwPersistErrorIfExists();
long bytesInMemoryBeforePersist = bytesCurrentlyInMemory.get();
final Map<String, Integer> currentHydrants = new HashMap<>(); final Map<String, Integer> currentHydrants = new HashMap<>();
final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>(); final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
int numPersistedRows = 0; int numPersistedRows = 0;
@ -539,16 +579,9 @@ public class AppenderatorImpl implements Appenderator
} }
if (sink.swappable()) { if (sink.swappable()) {
// After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
// These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
indexesToPersist.add(Pair.of(sink.swap(), identifier)); indexesToPersist.add(Pair.of(sink.swap(), identifier));
} }
} }
log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
final Object commitMetadata = committer == null ? null : committer.getMetadata(); final Object commitMetadata = committer == null ? null : committer.getMetadata();
@ -638,33 +671,6 @@ public class AppenderatorImpl implements Appenderator
log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows, bytesPersisted); log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows, bytesPersisted);
// bytesCurrentlyInMemory can change while persisting due to concurrent ingestion.
// Hence, we use bytesInMemoryBeforePersist to determine the change of this persist
if (!skipBytesInMemoryOverheadCheck && bytesInMemoryBeforePersist - bytesPersisted > maxBytesTuningConfig) {
// We are still over maxBytesTuningConfig even after persisting.
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
final String alertMessage = StringUtils.format(
"Task has exceeded safe estimated heap usage limits, failing "
+ "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
sinks.size(),
sinks.values().stream().mapToInt(Iterables::size).sum(),
getTotalRowCount()
);
final String errorMessage = StringUtils.format(
"%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
+ "great to have enough space to process additional input rows. This check, along with metering the overhead "
+ "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting "
+ "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter "
+ "a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an "
+ "increase in heap footprint, but will allow for more intermediary segment persists to occur before "
+ "reaching this condition.",
alertMessage
);
log.makeAlert(alertMessage)
.addData("dataSource", schema.getDataSource())
.emit();
throw new RuntimeException(errorMessage);
}
return future; return future;
} }

View File

@ -257,7 +257,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
@Test @Test
public void testMaxBytesInMemory() throws Exception public void testMaxBytesInMemory() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(100, 10000, true)) { try (final AppenderatorTester tester = new AppenderatorTester(100, 15000, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0); final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> { final Supplier<Committer> committerSupplier = () -> {
@ -297,7 +297,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
); );
// We do multiple more adds to the same sink to cause persist. // We do multiple more adds to the same sink to cause persist.
for (int i = 0; i < 26; i++) { for (int i = 0; i < 53; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
} }
sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK; sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
@ -333,7 +333,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
); );
// We do multiple more adds to the same sink to cause persist. // We do multiple more adds to the same sink to cause persist.
for (int i = 0; i < 5; i++) { for (int i = 0; i < 31; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
} }
// currHydrant size is 0 since we just persist all indexes to disk. // currHydrant size is 0 since we just persist all indexes to disk.
@ -363,7 +363,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
@Test(expected = RuntimeException.class) @Test(expected = RuntimeException.class)
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(100, 10, true)) { try (final AppenderatorTester tester = new AppenderatorTester(100, 5180, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0); final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> { final Supplier<Committer> committerSupplier = () -> {