add log msg when event recvr firehose buffer is full (#3209)

This commit is contained in:
Himanshu 2016-07-01 17:35:30 -05:00 committed by Parag Jain
parent 8eeae2e844
commit e1313e4b90
1 changed files with 8 additions and 1 deletions

View File

@ -57,7 +57,6 @@ import javax.ws.rs.core.Response;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection; import java.util.Collection;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@ -147,6 +146,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private volatile InputRow nextRow = null; private volatile InputRow nextRow = null;
private volatile boolean closed = false; private volatile boolean closed = false;
private final AtomicLong bytesReceived = new AtomicLong(0); private final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
public EventReceiverFirehose(MapInputRowParser parser) public EventReceiverFirehose(MapInputRowParser parser)
{ {
@ -298,6 +298,13 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
boolean added = false; boolean added = false;
while (!closed && !added) { while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS); added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
if (!added) {
long currTime = System.currentTimeMillis();
long lastTime = lastBufferAddFailMsgTime.get();
if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
}
}
} }
if (!added) { if (!added) {