NIFI-9548: When disabling RPG transmission, wait for the ports to complete in a background thread instead of blocking the web thread. Also moved the RPG initialization logic into flow controller instead of flow service and added a delay in order to reduce likelihood of ConnectException happening when pointing to nodes in the same cluster

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5641.
This commit is contained in:
Mark Payne 2022-01-06 16:23:26 -05:00 committed by Joe Gresock
parent a3e1f32cae
commit 3511ce3d13
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
9 changed files with 66 additions and 24 deletions

View File

@ -69,6 +69,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -108,6 +109,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final AtomicReference<String> comments = new AtomicReference<>();
private final AtomicReference<ProcessGroup> processGroup;
private final AtomicBoolean transmitting = new AtomicBoolean(false);
private final AtomicBoolean configuredToTransmit = new AtomicBoolean(false);
private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final SSLContext sslContext;
@ -183,7 +185,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
initialized = true;
backgroundThreadExecutor.submit(() -> {
backgroundThreadExecutor.schedule(() -> {
try {
refreshFlowContents();
} catch (final Exception e) {
@ -194,7 +196,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
logger.warn("Unable to communicate with remote instance {}", this, e);
}
}
});
}, 3, TimeUnit.SECONDS);
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 60L, TimeUnit.SECONDS);
@ -1042,6 +1044,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return transmitting.get();
}
@Override
public boolean isConfiguredToTransmit() {
return configuredToTransmit.get();
}
@Override
public void startTransmitting() {
writeLock.lock();
@ -1063,6 +1070,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
transmitting.set(true);
configuredToTransmit.set(true);
} finally {
writeLock.unlock();
}
@ -1081,13 +1089,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
scheduler.startPort(port);
transmitting.set(true);
configuredToTransmit.set(true);
} finally {
writeLock.unlock();
}
}
@Override
public void stopTransmitting() {
public Future<?> stopTransmitting() {
writeLock.lock();
try {
verifyCanStopTransmitting();
@ -1100,12 +1109,24 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
scheduler.stopPort(port);
}
// Wait for the ports to stop
configuredToTransmit.set(false);
return scheduler.submitFrameworkTask(this::waitForPortShutdown);
} finally {
writeLock.unlock();
}
}
private void waitForPortShutdown() {
// Wait for the ports to stop
try {
for (final RemoteGroupPort port : getInputPorts()) {
while (port.isRunning()) {
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
@ -1115,13 +1136,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
transmitting.set(false);
} finally {
writeLock.unlock();
transmitting.set(false);
}
}
@ -1142,6 +1163,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@ -1163,6 +1185,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
configuredToTransmit.set(stillTransmitting);
transmitting.set(stillTransmitting);
} finally {
writeLock.unlock();

View File

@ -550,7 +550,7 @@ public abstract class AbstractEventAccess implements EventAccess {
}
status.setId(remoteGroup.getIdentifier());
status.setTransmissionStatus(remoteGroup.isTransmitting() ? TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting);
status.setTransmissionStatus(remoteGroup.isConfiguredToTransmit() ? TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting);
status.setActiveThreadCount(activeThreadCount);
status.setReceivedContentSize(receivedContentSize);
status.setReceivedCount(receivedCount);

View File

@ -30,6 +30,7 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
@ -107,11 +108,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
String getCommunicationsTimeout();
/**
* @return Indicates whether or not the RemoteProcessGroup is currently scheduled to
* transmit data
* @return Indicates whether or not the RemoteProcessGroup is currently configured to transmit data OR if there are any threads that are currently
* active due to previously being scheduled to transmit that have not completed yet.
*/
boolean isTransmitting();
/**
* @return <code>true</code> if the RPG is configured to transmit, <code>false</code> otherwise
*/
boolean isConfiguredToTransmit();
/**
* Initiates communications between this instance and the remote instance.
*/
@ -121,7 +127,7 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
* Immediately terminates communications between this instance and the
* remote instance.
*/
void stopTransmitting();
Future<?> stopTransmitting();
/**
* Initiates communications between this instance and the remote instance

View File

@ -1114,6 +1114,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
startRemoteGroupPortsAfterInitialization.clear();
}
flowManager.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
for (final Connection connection : flowManager.findAllConnections()) {
connection.getFlowFileQueue().startLoadBalancing();
}

View File

@ -708,8 +708,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// request to stop all remote process groups
flowManager.getRootGroup().findAllRemoteProcessGroups()
.stream().filter(rpg -> rpg.isTransmitting())
.forEach(RemoteProcessGroup::stopTransmitting);
.stream().filter(RemoteProcessGroup::isTransmitting)
.forEach(rpg -> {
try {
rpg.stopTransmitting().get(rpg.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
} catch (final Exception e) {
logger.warn("Encountered failure while waiting for {} to shutdown", rpg, e);
}
});
// offload all queues on node
final Set<Connection> connections = flowManager.findAllConnections();

View File

@ -60,7 +60,6 @@ import org.apache.nifi.groups.ComponentIdGenerator;
import org.apache.nifi.groups.ComponentScheduler;
import org.apache.nifi.groups.GroupSynchronizationOptions;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
@ -349,8 +348,6 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
// Inherit templates, now that all necessary Process Groups have been created
inheritTemplates(controller, versionedFlow);
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
}
inheritSnippets(controller, proposedFlow);

View File

@ -39,6 +39,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
@ -273,14 +274,21 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
session.commitAsync();
} catch (final Throwable t) {
final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString());
logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
if (logger.isDebugEnabled()) {
logger.error("", t);
// If Exception is a TransmissionDisabledException, it's because the user explicitly terminated the connection in the middle.
// No need to log errors for this, just debug log and move on. Otherwise, log the error.
if (t instanceof TransmissionDisabledException) {
logger.debug(message, t);
} else {
logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
if (logger.isDebugEnabled()) {
logger.error("", t);
}
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
}
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
transaction.error();
throw new ProcessException(t);
}
}

View File

@ -1870,7 +1870,7 @@ public final class DtoFactory {
dto.setName(group.getName());
dto.setPosition(createPositionDto(group.getPosition()));
dto.setComments(group.getComments());
dto.setTransmitting(group.isTransmitting());
dto.setTransmitting(group.isConfiguredToTransmit());
dto.setCommunicationsTimeout(group.getCommunicationsTimeout());
dto.setYieldDuration(group.getYieldDuration());
dto.setParentGroupId(group.getProcessGroup().getIdentifier());

View File

@ -443,9 +443,9 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
final Boolean isTransmitting = remoteProcessGroupDTO.isTransmitting();
if (isNotNull(isTransmitting)) {
// start or stop as necessary
if (!remoteProcessGroup.isTransmitting() && isTransmitting) {
if (!remoteProcessGroup.isConfiguredToTransmit() && isTransmitting) {
remoteProcessGroup.startTransmitting();
} else if (remoteProcessGroup.isTransmitting() && !isTransmitting) {
} else if (remoteProcessGroup.isConfiguredToTransmit() && !isTransmitting) {
remoteProcessGroup.stopTransmitting();
}
}