Make HttpPostEmitter more robust in the face of serious errors (like OutOfMemoryError) during onSealExclusive() (#5386)

This commit is contained in:
Roman Leventov 2018-02-14 04:00:58 +03:00 committed by Charles Allen
parent aa7aee53ce
commit 65225bd6e7
2 changed files with 72 additions and 17 deletions

View File

@ -90,8 +90,12 @@ class Batch extends AbstractQueuedLongSynchronizer
/**
* Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0.
* It's a boxed Integer rather than int, because we want to minimize the number of allocations done in
* {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}.
* @see HttpPostEmitter#onSealExclusive
* @see HttpPostEmitter#concurrentBatch
*/
final int batchNumber;
final Integer batchNumber;
/**
* The number of events in this batch, needed for event count-based batch emitting.

View File

@ -110,9 +110,12 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
private final AtomicInteger approximateBuffersToReuseCount = new AtomicInteger();
/**
* concurrentBatch.get() == null means the service is closed.
* concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Integer,
* it means that some thread has failed with a serious error during {@link #onSealExclusive} (with the batch number
* corresponding to the Integer object) and {@link #tryRecoverCurrentBatch} needs to be called. Otherwise (i. e.
* normally), an instance of {@link Batch} is stored in this atomic reference.
*/
private final AtomicReference<Batch> concurrentBatch = new AtomicReference<>();
private final AtomicReference<Object> concurrentBatch = new AtomicReference<>();
private final ConcurrentLinkedDeque<Batch> buffersToEmit = new ConcurrentLinkedDeque<>();
/**
@ -247,10 +250,15 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
}
while (true) {
Batch batch = concurrentBatch.get();
if (batch == null) {
Object batchObj = concurrentBatch.get();
if (batchObj instanceof Integer) {
tryRecoverCurrentBatch((Integer) batchObj);
continue;
}
if (batchObj == null) {
throw new RejectedExecutionException("Service is closed.");
}
Batch batch = (Batch) batchObj;
if (batch.tryAddEvent(eventBytes)) {
return batch;
} else {
@ -294,6 +302,25 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
* Called from {@link Batch} only once for each Batch in existence.
*/
void onSealExclusive(Batch batch, long elapsedTimeMillis)
{
try {
doOnSealExclusive(batch, elapsedTimeMillis);
}
catch (Throwable t) {
try {
if (!concurrentBatch.compareAndSet(batch, batch.batchNumber)) {
log.error("Unexpected failure to set currentBatch to the failed Batch.batchNumber");
}
log.error(t, "Serious error during onSealExclusive(), set currentBatch to the failed Batch.batchNumber");
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
throw t;
}
}
private void doOnSealExclusive(Batch batch, long elapsedTimeMillis)
{
batchFillingTimeCounter.add((int) Math.max(elapsedTimeMillis, 0));
if (elapsedTimeMillis > 0) {
@ -305,13 +332,29 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
wakeUpEmittingThread();
if (!isTerminated()) {
int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(batch.batchNumber);
if (!concurrentBatch.compareAndSet(batch, new Batch(this, acquireBuffer(), nextBatchNumber))) {
// If compareAndSet failed, the service is closed concurrently.
byte[] newBuffer = acquireBuffer();
if (!concurrentBatch.compareAndSet(batch, new Batch(this, newBuffer, nextBatchNumber))) {
buffersToReuse.add(newBuffer);
// If compareAndSet failed, the service should be closed concurrently, i. e. we expect isTerminated() = true.
// If we don't see this, there should be some bug in HttpPostEmitter.
Preconditions.checkState(isTerminated());
}
}
}
private void tryRecoverCurrentBatch(Integer failedBatchNumber)
{
log.info("Trying to recover currentBatch");
int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(failedBatchNumber);
byte[] newBuffer = acquireBuffer();
if (concurrentBatch.compareAndSet(failedBatchNumber, new Batch(this, newBuffer, nextBatchNumber))) {
log.info("Successfully recovered currentBatch");
} else {
// It's normal, a concurrent thread could succeed to recover first.
buffersToReuse.add(newBuffer);
}
}
private void addBatchToEmitQueue(Batch batch)
{
limitBuffersToEmitSize();
@ -363,7 +406,10 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
public void flush() throws IOException
{
awaitStarted();
flush(concurrentBatch.get());
Object batchObj = concurrentBatch.get();
if (batchObj instanceof Batch) {
flush((Batch) batchObj);
}
}
private void flush(Batch batch) throws IOException
@ -396,8 +442,10 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
synchronized (startLock) {
if (running) {
running = false;
Batch lastBatch = concurrentBatch.getAndSet(null);
flush(lastBatch);
Object lastBatch = concurrentBatch.getAndSet(null);
if (lastBatch instanceof Batch) {
flush((Batch) lastBatch);
}
emittingThread.shuttingDown = true;
// EmittingThread is interrupted after the last batch is flushed.
emittingThread.interrupt();
@ -478,16 +526,19 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
boolean needsToShutdown = Thread.interrupted() || shuttingDown;
if (needsToShutdown) {
Batch lastBatch = concurrentBatch.getAndSet(null);
if (lastBatch != null) {
lastBatch.seal();
Object lastBatch = concurrentBatch.getAndSet(null);
if (lastBatch instanceof Batch) {
((Batch) lastBatch).seal();
}
} else {
Batch batch = concurrentBatch.get();
if (batch != null) {
batch.sealIfFlushNeeded();
Object batch = concurrentBatch.get();
if (batch instanceof Batch) {
((Batch) batch).sealIfFlushNeeded();
} else {
needsToShutdown = true;
// batch == null means that HttpPostEmitter is terminated. Batch object could also be Integer, if some
// thread just failed with a serious error in onSealExclusive(), in this case we don't want to shutdown
// the emitter thread.
needsToShutdown = batch == null;
}
}
return needsToShutdown;