NIFI-6501 Refactor CaptureChangeMySQL with bounded queue of 1000

This closes #6791

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matthew Burgess 2022-12-16 13:46:15 -05:00 committed by exceptionfactory
parent 58e3024db0
commit 8986f4b33a
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 13 additions and 11 deletions

View File

@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class BinlogEventListener implements BinaryLogClient.EventListener {
protected final AtomicBoolean stopNow = new AtomicBoolean(false);
private final AtomicBoolean stopNow = new AtomicBoolean(false);
private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
private final BlockingQueue<RawBinlogEvent> queue;
@ -49,17 +49,17 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
@Override
public void onEvent(Event event) {
while (!stopNow.get()) {
RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename());
try {
RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename());
try {
while (!stopNow.get()) {
if (queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, TimeUnit.MILLISECONDS)) {
return;
} else {
throw new RuntimeException("Unable to add event to the queue");
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while adding event to the queue");
}
throw new RuntimeException("Stopped while waiting to enqueue event");
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while adding event to the queue");
}
}
}
}

View File

@ -114,6 +114,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -434,7 +435,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private BinlogLifecycleListener lifecycleListener;
private GtidSet gtidSet;
private final LinkedBlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>();
// Set queue capacity to avoid excessive memory consumption
private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000);
private volatile String currentBinlogFile = null;
private volatile long currentBinlogPosition = 4;
private volatile String currentGtidSet = null;
@ -1192,7 +1194,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
* @param q A queue used to communicate events between the listener and the NiFi processor thread.
* @return A BinlogEventListener instance, which will be notified of events associated with the specified client
*/
BinlogEventListener createBinlogEventListener(BinaryLogClient client, LinkedBlockingQueue<RawBinlogEvent> q) {
BinlogEventListener createBinlogEventListener(BinaryLogClient client, BlockingQueue<RawBinlogEvent> q) {
return new BinlogEventListener(client, q);
}