NIFI-13123 MiNiFi async C2 operation processing

Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>

This closes #8738.
This commit is contained in:
Ferenc Kis 2024-05-02 16:03:24 +02:00 committed by Csaba Bejan
parent b87e9c8d9d
commit 52e257e16c
No known key found for this signature in database
GPG Key ID: C59951609F8BDDEB
20 changed files with 1183 additions and 634 deletions

View File

@ -57,6 +57,7 @@ public class C2ClientConfig {
private final long keepAliveDuration;
private final String c2RequestCompression;
private final String c2AssetDirectory;
private final long bootstrapAcknowledgeTimeout;
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
@ -86,6 +87,7 @@ public class C2ClientConfig {
this.keepAliveDuration = builder.keepAliveDuration;
this.c2RequestCompression = builder.c2RequestCompression;
this.c2AssetDirectory = builder.c2AssetDirectory;
this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
}
public String getC2Url() {
@ -196,6 +198,10 @@ public class C2ClientConfig {
return keepAliveDuration;
}
public long getBootstrapAcknowledgeTimeout() {
return bootstrapAcknowledgeTimeout;
}
/**
* Builder for client configuration.
*/
@ -231,6 +237,7 @@ public class C2ClientConfig {
private long keepAliveDuration;
private String c2RequestCompression;
private String c2AssetDirectory;
private long bootstrapAcknowledgeTimeout;
public Builder c2Url(String c2Url) {
this.c2Url = c2Url;
@ -377,6 +384,11 @@ public class C2ClientConfig {
return this;
}
public Builder bootstrapAcknowledgeTimeout(long bootstrapAcknowledgeTimeout) {
this.bootstrapAcknowledgeTimeout = bootstrapAcknowledgeTimeout;
return this;
}
public C2ClientConfig build() {
return new C2ClientConfig(this);
}

View File

@ -1,149 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class C2ClientService {
private static final Logger logger = LoggerFactory.getLogger(C2ClientService.class);
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
private final C2OperationHandlerProvider c2OperationHandlerProvider;
private final RequestedOperationDAO requestedOperationDAO;
private final Consumer<C2Operation> c2OperationRegister;
private volatile boolean heartbeatLocked = false;
public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider c2OperationHandlerProvider,
RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.c2OperationHandlerProvider = c2OperationHandlerProvider;
this.requestedOperationDAO = requestedOperationDAO;
this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
if (heartbeatLocked) {
logger.debug("Heartbeats are locked, skipping sending for now");
} else {
try {
C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
} catch (Exception e) {
logger.error("Failed to send/process heartbeat:", e);
}
}
}
public void sendAcknowledge(C2OperationAck operationAck) {
try {
client.acknowledgeOperation(operationAck);
} catch (Exception e) {
logger.error("Failed to send acknowledge:", e);
}
}
public void enableHeartbeat() {
heartbeatLocked = false;
}
private void disableHeartbeat() {
heartbeatLocked = true;
}
public void handleRequestedOperations(List<C2Operation> requestedOperations) {
LinkedList<C2Operation> c2Operations = new LinkedList<>(requestedOperations);
C2Operation requestedOperation;
while ((requestedOperation = c2Operations.poll()) != null) {
Optional<C2OperationHandler> c2OperationHandler = c2OperationHandlerProvider.getHandlerForOperation(requestedOperation);
if (!c2OperationHandler.isPresent()) {
continue;
}
C2OperationHandler operationHandler = c2OperationHandler.get();
C2OperationAck c2OperationAck = operationHandler.handle(requestedOperation);
if (requiresRestart(operationHandler, c2OperationAck)) {
if (initiateRestart(c2Operations, requestedOperation)) {
return;
}
C2OperationState c2OperationState = new C2OperationState();
c2OperationState.setState(OperationState.NOT_APPLIED);
c2OperationAck.setOperationState(c2OperationState);
}
sendAcknowledge(c2OperationAck);
}
enableHeartbeat();
requestedOperationDAO.cleanup();
}
private void processResponse(C2HeartbeatResponse response) {
List<C2Operation> requestedOperations = response.getRequestedOperations();
if (requestedOperations != null && !requestedOperations.isEmpty()) {
logger.info("Received {} operations from the C2 server", requestedOperations.size());
handleRequestedOperations(requestedOperations);
} else {
logger.trace("No operations received from the C2 server in the server. Nothing to do.");
}
}
private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
}
private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
return Optional.ofNullable(c2OperationAck)
.map(C2OperationAck::getOperationState)
.map(C2OperationState::getState)
.filter(FULLY_APPLIED::equals)
.isPresent();
}
private boolean initiateRestart(LinkedList<C2Operation> requestedOperations, C2Operation requestedOperation) {
try {
disableHeartbeat();
requestedOperationDAO.save(new OperationQueue(requestedOperation, requestedOperations));
c2OperationRegister.accept(requestedOperation);
return true;
} catch (Exception e) {
logger.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e);
requestedOperationDAO.cleanup();
}
return false;
}
}

View File

@ -17,6 +17,13 @@
package org.apache.nifi.c2.client.service;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingInt;
import static java.util.Map.entry;
import static java.util.Objects.nonNull;
import static java.util.stream.Collectors.toSet;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.File;
@ -25,13 +32,12 @@ import java.lang.management.OperatingSystemMXBean;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.PersistentUuidGenerator;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@ -137,8 +143,7 @@ public class C2HeartbeatFactory {
}
private DeviceInfo generateDeviceInfo() {
// Populate DeviceInfo
final DeviceInfo deviceInfo = new DeviceInfo();
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.setNetworkInfo(generateNetworkInfo());
deviceInfo.setIdentifier(getDeviceIdentifier(deviceInfo.getNetworkInfo()));
deviceInfo.setSystemInfo(generateSystemInfo());
@ -146,67 +151,63 @@ public class C2HeartbeatFactory {
}
private NetworkInfo generateNetworkInfo() {
NetworkInfo networkInfo = new NetworkInfo();
try {
// Determine all interfaces
final Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
Set<NetworkInterface> eligibleInterfaces = list(getNetworkInterfaces())
.stream()
.filter(this::isEligibleInterface)
.collect(toSet());
final Set<NetworkInterface> operationIfaces = new HashSet<>();
// Determine eligible interfaces
while (networkInterfaces.hasMoreElements()) {
final NetworkInterface networkInterface = networkInterfaces.nextElement();
if (!networkInterface.isLoopback() && networkInterface.isUp()) {
operationIfaces.add(networkInterface);
}
if (logger.isTraceEnabled()) {
logger.trace("Found {} eligible interfaces with names {}", eligibleInterfaces.size(),
eligibleInterfaces.stream()
.map(NetworkInterface::getName)
.collect(toSet())
);
}
logger.trace("Have {} interfaces with names {}", operationIfaces.size(),
operationIfaces.stream()
.map(NetworkInterface::getName)
.collect(Collectors.toSet())
);
if (!operationIfaces.isEmpty()) {
if (operationIfaces.size() > 1) {
logger.debug("Instance has multiple interfaces. Generated information may be non-deterministic.");
}
boolean networkInfoUnset = true;
for (NetworkInterface networkInterface : operationIfaces) {
Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = inetAddresses.nextElement();
// IPv4 address is preferred over IPv6 as it provides more readable information for the user
if (inetAddress instanceof Inet4Address) {
updateNetworkInfo(networkInfo, networkInterface, inetAddress);
return networkInfo;
}
if (networkInfoUnset) {
updateNetworkInfo(networkInfo, networkInterface, inetAddress);
networkInfoUnset = false;
}
}
}
}
Comparator<Map.Entry<NetworkInterface, InetAddress>> orderByIp4AddressesFirst = comparingInt(item -> item.getValue() instanceof Inet4Address ? 0 : 1);
Comparator<Map.Entry<NetworkInterface, InetAddress>> orderByNetworkInterfaceName = comparing(entry -> entry.getKey().getName());
return eligibleInterfaces.stream()
.flatMap(networkInterface -> list(networkInterface.getInetAddresses())
.stream()
.map(inetAddress -> entry(networkInterface, inetAddress)))
.sorted(orderByIp4AddressesFirst.thenComparing(orderByNetworkInterfaceName))
.findFirst()
.map(entry -> createNetworkInfo(entry.getKey(), entry.getValue()))
.orElseGet(NetworkInfo::new);
} catch (Exception e) {
logger.error("Network Interface processing failed", e);
return new NetworkInfo();
}
return networkInfo;
}
private void updateNetworkInfo(NetworkInfo networkInfo, NetworkInterface networkInterface, InetAddress inetAddress) {
private boolean isEligibleInterface(NetworkInterface networkInterface) {
try {
return !networkInterface.isLoopback()
&& !networkInterface.isVirtual()
&& networkInterface.isUp()
&& nonNull(networkInterface.getHardwareAddress());
} catch (SocketException e) {
logger.warn("Error processing network interface", e);
return false;
}
}
private NetworkInfo createNetworkInfo(NetworkInterface networkInterface, InetAddress inetAddress) {
NetworkInfo networkInfo = new NetworkInfo();
networkInfo.setDeviceId(networkInterface.getName());
networkInfo.setHostname(inetAddress.getHostName());
networkInfo.setIpAddress(inetAddress.getHostAddress());
return networkInfo;
}
private String getDeviceIdentifier(NetworkInfo networkInfo) {
if (deviceId == null) {
if (networkInfo.getDeviceId() != null) {
try {
final NetworkInterface netInterface = NetworkInterface.getByName(networkInfo.getDeviceId());
NetworkInterface netInterface = NetworkInterface.getByName(networkInfo.getDeviceId());
byte[] hardwareAddress = netInterface.getHardwareAddress();
final StringBuilder macBuilder = new StringBuilder();
StringBuilder macBuilder = new StringBuilder();
if (hardwareAddress != null) {
for (byte address : hardwareAddress) {
macBuilder.append(String.format("%02X", address));
@ -221,7 +222,6 @@ public class C2HeartbeatFactory {
deviceId = getConfiguredDeviceId();
}
}
return deviceId;
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service;
import static java.util.Optional.ofNullable;
import static java.util.function.Predicate.not;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class C2HeartbeatManager implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(C2HeartbeatManager.class);
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
private final ReentrantLock heartbeatLock;
private final RuntimeInfoWrapper runtimeInfoWrapper;
private final C2OperationManager c2OperationManager;
public C2HeartbeatManager(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, ReentrantLock heartbeatLock, RuntimeInfoWrapper runtimeInfoWrapper,
C2OperationManager c2OperationManager) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.heartbeatLock = heartbeatLock;
this.runtimeInfoWrapper = runtimeInfoWrapper;
this.c2OperationManager = c2OperationManager;
}
@Override
public void run() {
if (!heartbeatLock.tryLock()) {
LOGGER.debug("Heartbeat lock is hold by another thread, skipping heartbeat sending");
return;
}
try {
LOGGER.debug("Heartbeat lock is acquired, sending heartbeat");
C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
} catch (Exception e) {
LOGGER.error("Failed to send/process heartbeat", e);
} finally {
heartbeatLock.unlock();
LOGGER.debug("Heartbeat unlocked lock and heartbeat is sent successfully");
}
}
private void processResponse(C2HeartbeatResponse response) {
ofNullable(response.getRequestedOperations())
.filter(not(List::isEmpty))
.ifPresentOrElse(operations -> {
LOGGER.info("Received {} operations from the C2 server", operations.size());
operations.forEach(c2OperationManager::add);
},
() -> LOGGER.debug("No operations received from the C2 server")
);
}
}

View File

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
import static java.util.function.Predicate.not;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class C2OperationManager implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(C2OperationManager.class);
private final C2Client client;
private final C2OperationHandlerProvider c2OperationHandlerProvider;
private final ReentrantLock heartbeatLock;
private final OperationQueueDAO operationQueueDAO;
private final C2OperationRestartHandler c2OperationRestartHandler;
private final BlockingQueue<C2Operation> c2Operations;
public C2OperationManager(C2Client client, C2OperationHandlerProvider c2OperationHandlerProvider, ReentrantLock heartbeatLock,
OperationQueueDAO operationQueueDAO, C2OperationRestartHandler c2OperationRestartHandler) {
this.client = client;
this.c2OperationHandlerProvider = c2OperationHandlerProvider;
this.heartbeatLock = heartbeatLock;
this.operationQueueDAO = operationQueueDAO;
this.c2OperationRestartHandler = c2OperationRestartHandler;
this.c2Operations = new LinkedBlockingQueue<>();
}
public void add(C2Operation c2Operation) {
try {
c2Operations.put(c2Operation);
} catch (InterruptedException e) {
LOGGER.warn("Thread was interrupted", e);
}
}
@Override
public void run() {
processRestartState();
while (true) {
C2Operation operation;
try {
operation = c2Operations.take();
} catch (InterruptedException e) {
LOGGER.warn("Thread was interrupted", e);
return;
}
LOGGER.debug("Processing operation {}", operation);
C2OperationHandler operationHandler = c2OperationHandlerProvider.getHandlerForOperation(operation).orElse(null);
if (operationHandler == null) {
LOGGER.debug("No handler is present for C2 Operation {}, available handlers {}", operation, c2OperationHandlerProvider.getHandlers());
continue;
}
C2OperationAck c2OperationAck = operationHandler.handle(operation);
if (!requiresRestart(operationHandler, c2OperationAck)) {
LOGGER.debug("No restart is required. Sending ACK to C2 server {}", c2OperationAck);
sendAcknowledge(c2OperationAck);
continue;
}
heartbeatLock.lock();
LOGGER.debug("Restart is required. Heartbeats are stopped until restart is completed");
Optional<C2OperationState> restartState = initRestart(operation);
if (!restartState.isPresent()) {
LOGGER.debug("Restart in progress, stopping C2OperationManager");
break;
}
try {
C2OperationState failedState = restartState.get();
LOGGER.debug("Restart handler returned with a failed state {}", failedState);
c2OperationAck.setOperationState(failedState);
sendAcknowledge(c2OperationAck);
} finally {
operationQueueDAO.cleanup();
LOGGER.debug("Heartbeats are enabled again");
heartbeatLock.unlock();
}
}
}
private void processRestartState() {
Optional<OperationQueue> operationQueue = operationQueueDAO.load();
operationQueue.map(OperationQueue::getRemainingOperations)
.filter(not(List::isEmpty))
.ifPresent(this::processRemainingOperations);
operationQueue.map(OperationQueue::getCurrentOperation)
.ifPresentOrElse(this::processCurrentOperation,
() -> LOGGER.debug("No operation to acknowledge to C2 server"));
operationQueue.ifPresent(__ -> operationQueueDAO.cleanup());
}
private void processRemainingOperations(List<C2Operation> remainingOperations) {
LOGGER.debug("Found remaining operations operations after restart. Heartbeats are stopped until processing is completed");
heartbeatLock.lock();
try {
List<C2Operation> mergedOperations = new LinkedList<>();
mergedOperations.addAll(remainingOperations);
mergedOperations.addAll(c2Operations);
c2Operations.clear();
mergedOperations.forEach(c2Operations::add);
} catch (Exception e) {
LOGGER.warn("Unable to recover operations from operation queue", e);
} finally {
heartbeatLock.unlock();
LOGGER.debug("Heartbeat lock released");
}
}
private void processCurrentOperation(C2Operation operation) {
LOGGER.debug("Found operation {} to acknowledge to C2 server", operation);
C2OperationState c2OperationState = c2OperationRestartHandler.waitForResponse()
.map(this::c2OperationState)
.orElse(c2OperationState(NOT_APPLIED));
C2OperationAck c2OperationAck = new C2OperationAck();
c2OperationAck.setOperationId(operation.getIdentifier());
c2OperationAck.setOperationState(c2OperationState);
sendAcknowledge(c2OperationAck);
}
private Optional<C2OperationState> initRestart(C2Operation operation) {
try {
LOGGER.debug("Restart initiated");
OperationQueue operationQueue = OperationQueue.create(operation, c2Operations);
operationQueueDAO.save(operationQueue);
return c2OperationRestartHandler.handleRestart(operation).map(this::c2OperationState);
} catch (Exception e) {
LOGGER.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e);
return of(c2OperationState(NOT_APPLIED));
}
}
private C2OperationState c2OperationState(OperationState operationState) {
C2OperationState c2OperationState = new C2OperationState();
c2OperationState.setState(operationState);
return c2OperationState;
}
private void sendAcknowledge(C2OperationAck operationAck) {
try {
client.acknowledgeOperation(operationAck);
} catch (Exception e) {
LOGGER.error("Failed to send acknowledge", e);
}
}
private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
}
private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
return ofNullable(c2OperationAck)
.map(C2OperationAck::getOperationState)
.map(C2OperationState::getState)
.filter(FULLY_APPLIED::equals)
.isPresent();
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationState;
public interface C2OperationRestartHandler {
Optional<C2OperationState.OperationState> handleRestart(C2Operation c2Operation);
Optional<C2OperationState.OperationState> waitForResponse();
}

View File

@ -17,10 +17,12 @@
package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.ofNullable;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import org.apache.nifi.c2.protocol.api.C2Operation;
public class OperationQueue implements Serializable {
@ -29,12 +31,23 @@ public class OperationQueue implements Serializable {
private C2Operation currentOperation;
private List<C2Operation> remainingOperations;
public static OperationQueue create(C2Operation currentOperation, Queue<C2Operation> remainingOperations) {
return new OperationQueue(
currentOperation,
ofNullable(remainingOperations)
.map(queue -> queue.stream().toList())
.orElseGet(List::of)
);
}
public OperationQueue() {
}
public OperationQueue(C2Operation currentOperation, List<C2Operation> remainingOperations) {
this.currentOperation = currentOperation;
this.remainingOperations = remainingOperations == null ? Collections.emptyList() : remainingOperations;
this.remainingOperations = remainingOperations;
}
public C2Operation getCurrentOperation() {

View File

@ -22,7 +22,7 @@ import java.util.Optional;
/**
* The purpose of this interface is to be able to persist operations between restarts.
*/
public interface RequestedOperationDAO {
public interface OperationQueueDAO {
/**
* Persist the given requested operation list

View File

@ -1,271 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class C2ClientServiceTest {
@Mock
private C2Client client;
@Mock
private C2HeartbeatFactory c2HeartbeatFactory;
@Mock
private C2OperationHandlerProvider operationService;
@Mock
private RuntimeInfoWrapper runtimeInfoWrapper;
@Mock
private RequestedOperationDAO requestedOperationDAO;
@Mock
private Consumer<C2Operation> c2OperationRegister;
@InjectMocks
private C2ClientService c2ClientService;
@Test
void testSendHeartbeatAndAckWhenOperationPresent() {
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
final List<C2Operation> c2Operations = generateOperation(1);
hbResponse.setRequestedOperations(c2Operations);
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
when(c2OperationHandler.handle(c2Operations.get(0))).thenReturn(new C2OperationAck());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
verify(c2OperationHandler).handle(any());
verify(client).acknowledgeOperation(any());
}
@Test
void testSendHeartbeatAndAckForMultipleOperationPresent() {
int operationNum = 5;
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
hbResponse.setRequestedOperations(generateOperation(operationNum));
C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
when(c2OperationHandler.handle(any())).thenReturn(new C2OperationAck());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
verify(c2OperationHandler, times(operationNum)).handle(any());
verify(client, times(operationNum)).acknowledgeOperation(any());
}
@Test
void testSendHeartbeatHandlesNoHeartbeatResponse() {
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.empty());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
verify(client, times(0)).acknowledgeOperation(any());
}
@Test
void testSendHeartbeatNotHandledWhenThereAreNoOperationsSent() {
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
verify(client, times(0)).acknowledgeOperation(any());
}
@Test
void testSendHeartbeatNotAckWhenOperationAckMissing() {
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
hbResponse.setRequestedOperations(generateOperation(1));
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
verify(client, times(0)).acknowledgeOperation(any());
}
@Test
void shouldHeartbeatSendingNotPropagateExceptions() {
when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new RuntimeException());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
}
@Test
void shouldAckSendingNotPropagateExceptions() {
C2OperationAck operationAck = mock(C2OperationAck.class);
doThrow(new RuntimeException()).when(client).acknowledgeOperation(operationAck);
c2ClientService.sendAcknowledge(operationAck);
}
@Test
void shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState() {
C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
C2OperationAck operationAck = new C2OperationAck();
C2OperationState c2OperationState = new C2OperationState();
c2OperationState.setState(OperationState.NOT_APPLIED);
operationAck.setOperationState(c2OperationState);
when(c2OperationHandler.requiresRestart()).thenReturn(true);
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(operationAck);
c2ClientService.handleRequestedOperations(generateOperation(1));
verify(operationService).getHandlerForOperation(any(C2Operation.class));
verify(c2OperationHandler).handle(any(C2Operation.class));
verify(requestedOperationDAO).cleanup();
verify(client).acknowledgeOperation(operationAck);
verifyNoMoreInteractions(operationService, client, requestedOperationDAO);
verifyNoInteractions(c2HeartbeatFactory, c2OperationRegister);
}
@Test
void shouldSaveOperationQueueIfRestartIsNeededAndThereAreMultipleRequestedOperations() {
C2Operation c2Operation1 = new C2Operation();
c2Operation1.setIdentifier("1");
C2Operation c2Operation2 = new C2Operation();
c2Operation2.setIdentifier("2");
C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
when(c2OperationHandler.requiresRestart()).thenReturn(true);
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
C2OperationAck c2OperationAck = new C2OperationAck();
C2OperationState c2OperationState = new C2OperationState();
c2OperationState.setState(OperationState.FULLY_APPLIED);
c2OperationAck.setOperationState(c2OperationState);
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
c2ClientService.handleRequestedOperations(Arrays.asList(c2Operation1, c2Operation2));
verify(requestedOperationDAO).save(new OperationQueue(c2Operation1, Collections.singletonList(c2Operation2)));
verify(c2OperationRegister).accept(c2Operation1);
verifyNoInteractions(client, c2HeartbeatFactory);
verifyNoMoreInteractions(requestedOperationDAO, c2OperationRegister, operationService);
}
@Test
void shouldReEnableHeartbeatsIfExceptionHappensDuringRegisteringOperationAndThereIsNoMoreOperationInQueue() {
C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
C2Operation operation = new C2Operation();
when(c2OperationHandler.requiresRestart()).thenReturn(true);
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
C2OperationAck c2OperationAck = new C2OperationAck();
C2OperationState c2OperationState = new C2OperationState();
c2OperationState.setState(OperationState.FULLY_APPLIED);
c2OperationAck.setOperationState(c2OperationState);
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
doThrow(new RuntimeException()).when(c2OperationRegister).accept(any(C2Operation.class));
c2ClientService.handleRequestedOperations(Collections.singletonList(operation));
when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenReturn(new C2Heartbeat());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(runtimeInfoWrapper);
verify(client).publishHeartbeat(any(C2Heartbeat.class));
}
@Test
void shouldContinueWithRemainingOperationsIfExceptionHappensDuringRegisteringOperationAndThereAreMoreOperationsInQueue() {
C2OperationHandler c2OperationHandlerForRestart = mock(C2OperationHandler.class);
C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
C2Operation operation1 = new C2Operation();
operation1.setIdentifier("1");
C2Operation operation2 = new C2Operation();
operation2.setIdentifier("2");
C2OperationAck c2OperationAck = new C2OperationAck();
C2OperationState c2OperationState = new C2OperationState();
c2OperationState.setState(OperationState.FULLY_APPLIED);
c2OperationAck.setOperationState(c2OperationState);
when(c2OperationHandler.requiresRestart()).thenReturn(false);
when(c2OperationHandlerForRestart.requiresRestart()).thenReturn(true);
when(operationService.getHandlerForOperation(operation1)).thenReturn(Optional.of(c2OperationHandlerForRestart));
when(operationService.getHandlerForOperation(operation2)).thenReturn(Optional.of(c2OperationHandler));
when(c2OperationHandlerForRestart.handle(operation1)).thenReturn(c2OperationAck);
when(c2OperationHandler.handle(operation2)).thenReturn(c2OperationAck);
doThrow(new RuntimeException()).when(c2OperationRegister).accept(operation1);
c2ClientService.handleRequestedOperations(Arrays.asList(operation1, operation2));
verify(client, times(2)).acknowledgeOperation(c2OperationAck);
}
private List<C2Operation> generateOperation(int num) {
return IntStream.range(0, num)
.mapToObj(x -> new C2Operation())
.collect(Collectors.toList());
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class C2HeartbeatManagerTest {
@Mock
private C2Client mockC2Client;
@Mock
private C2HeartbeatFactory mockC2HeartbeatFactory;
@Mock
private ReentrantLock mockHeartbeatLock;
@Mock
private RuntimeInfoWrapper mockRuntimeInfoWrapper;
@Mock
private C2OperationManager mockC2OperationManager;
@InjectMocks
private C2HeartbeatManager testHeartbeatManager;
@Test
void shouldSkipSendingHeartbeatIfHeartbeatLockIsAcquired() {
when(mockHeartbeatLock.tryLock()).thenReturn(false);
testHeartbeatManager.run();
verify(mockC2HeartbeatFactory, never()).create(any());
verify(mockC2Client, never()).publishHeartbeat(any());
verify(mockC2OperationManager, never()).add(any());
verify(mockHeartbeatLock, never()).unlock();
}
@Test
void shouldSendHeartbeatAndProcessEmptyResponse() {
when(mockHeartbeatLock.tryLock()).thenReturn(true);
C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(empty());
testHeartbeatManager.run();
verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper);
verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
verify(mockC2OperationManager, never()).add(any());
verify(mockHeartbeatLock, times(1)).unlock();
}
@Test
void shouldSendHeartbeatAndProcessResponseWithNoOperation() {
when(mockHeartbeatLock.tryLock()).thenReturn(true);
C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
C2HeartbeatResponse mockC2HeartbeatResponse = mock(C2HeartbeatResponse.class);
when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of());
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(Optional.of(mockC2HeartbeatResponse));
testHeartbeatManager.run();
verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper);
verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
verify(mockC2OperationManager, never()).add(any());
verify(mockHeartbeatLock, times(1)).unlock();
}
@Test
void shouldSendHeartbeatAndProcessResponseWithMultipleOperation() {
when(mockHeartbeatLock.tryLock()).thenReturn(true);
C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
C2HeartbeatResponse mockC2HeartbeatResponse = mock(C2HeartbeatResponse.class);
C2Operation mockOperation1 = mock(C2Operation.class);
C2Operation mockOperation2 = mock(C2Operation.class);
when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of(mockOperation1, mockOperation2));
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(ofNullable(mockC2HeartbeatResponse));
testHeartbeatManager.run();
verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper);
verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
verify(mockC2OperationManager, times(1)).add(mockOperation1);
verify(mockC2OperationManager, times(1)).add(mockOperation2);
verify(mockHeartbeatLock, times(1)).unlock();
}
@Test
void shouldReleaseHeartbeatLockWhenExceptionOccurs() {
when(mockHeartbeatLock.tryLock()).thenReturn(true);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenThrow(new RuntimeException());
testHeartbeatManager.run();
verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper);
verify(mockC2Client, never()).publishHeartbeat(any());
verify(mockC2OperationManager, never()).add(any());
verify(mockHeartbeatLock, times(1)).unlock();
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class C2OperationManagerTest {
private static final long MAX_WAIT_TIME_MS = 1000;
@Mock
private C2Client mockC2Client;
@Mock
private C2OperationHandlerProvider mockC2OperationHandlerProvider;
@Mock
private ReentrantLock mockHeartbeatLock;
@Mock
private OperationQueueDAO mockOperationQueueDAO;
@Mock
private C2OperationRestartHandler mockC2OperationRestartHandler;
@InjectMocks
private C2OperationManager testC2OperationManager;
@Captor
ArgumentCaptor<C2OperationAck> c2OperationAckArgumentCaptor;
private ExecutorService executorService;
@BeforeEach
void setup() {
executorService = newVirtualThreadPerTaskExecutor();
}
@AfterEach
void teardown() {
executorService.shutdownNow();
}
@Test
void shouldWaitForIncomingOperationThenTimeout() {
Future<?> future = executorService.submit(testC2OperationManager);
assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
verify(mockC2OperationHandlerProvider, never()).getHandlerForOperation(any());
}
@Test
void shouldContinueWithoutProcessingWhenNoHandlerIsDefined() {
C2Operation testOperation = mock(C2Operation.class);
when(mockC2OperationHandlerProvider.getHandlerForOperation(testOperation)).thenReturn(empty());
Future<?> future = executorService.submit(testC2OperationManager);
testC2OperationManager.add(testOperation);
assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
verify(mockC2Client, never()).acknowledgeOperation(any());
verify(mockHeartbeatLock, never()).lock();
}
@Test
void shouldProcessOperationWithoutRestartAndAcknowledge() {
C2Operation mockOperation = mock(C2Operation.class);
C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class);
C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
when(mockOperationHandler.requiresRestart()).thenReturn(false);
Future<?> future = executorService.submit(testC2OperationManager);
testC2OperationManager.add(mockOperation);
assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
verify(mockC2Client, times(1)).acknowledgeOperation(mockC2OperationAck);
verify(mockHeartbeatLock, never()).lock();
}
@Test
void shouldProcessOperationWithSuccessfulRestart() {
C2Operation mockOperation = mock(C2Operation.class);
C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class);
C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
C2OperationState mockC2OperationState = mock(C2OperationState.class);
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
when(mockOperationHandler.requiresRestart()).thenReturn(true);
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(empty());
Future<?> future = executorService.submit(testC2OperationManager);
testC2OperationManager.add(mockOperation);
assertDoesNotThrow(() -> future.get());
verify(mockC2Client, never()).acknowledgeOperation(mockC2OperationAck);
verify(mockHeartbeatLock, times(1)).lock();
verify(mockHeartbeatLock, never()).unlock();
verify(mockOperationQueueDAO, times(1)).save(any());
verify(mockOperationQueueDAO, never()).cleanup();
}
@Test
void shouldProcessOperationWithFailedRestartDueToFailedResponse() {
C2Operation mockOperation = mock(C2Operation.class);
C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class);
C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
C2OperationState mockC2OperationState = mock(C2OperationState.class);
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
when(mockOperationHandler.requiresRestart()).thenReturn(true);
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(ofNullable(NOT_APPLIED));
Future<?> future = executorService.submit(testC2OperationManager);
testC2OperationManager.add(mockOperation);
assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
verify(mockHeartbeatLock, times(1)).lock();
verify(mockHeartbeatLock, times(1)).unlock();
verify(mockC2Client, times(1)).acknowledgeOperation(mockC2OperationAck);
verify(mockOperationQueueDAO, times(1)).save(any());
verify(mockOperationQueueDAO, times(1)).cleanup();
}
@Test
void shouldProcessOperationWithFailedRestartDueToException() {
C2Operation mockOperation = mock(C2Operation.class);
C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class);
C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
C2OperationState mockC2OperationState = mock(C2OperationState.class);
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
when(mockOperationHandler.requiresRestart()).thenReturn(true);
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenThrow(new RuntimeException());
Future<?> future = executorService.submit(testC2OperationManager);
testC2OperationManager.add(mockOperation);
assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
verify(mockHeartbeatLock, times(1)).lock();
verify(mockHeartbeatLock, times(1)).unlock();
verify(mockC2Client, times(1)).acknowledgeOperation(mockC2OperationAck);
verify(mockOperationQueueDAO, times(1)).save(any());
verify(mockOperationQueueDAO, times(1)).cleanup();
}
@Test
void shouldProcessStateWithOneCurrentAndNoRemainingOperations() {
OperationQueue mockOperationQueue = mock(OperationQueue.class);
C2Operation mockCurrentOperation = mock(C2Operation.class);
when(mockOperationQueue.getCurrentOperation()).thenReturn(mockCurrentOperation);
when(mockOperationQueue.getRemainingOperations()).thenReturn(List.of());
when(mockOperationQueueDAO.load()).thenReturn(ofNullable(mockOperationQueue));
when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED));
Future<?> future = executorService.submit(testC2OperationManager);
assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
verify(mockHeartbeatLock, never()).lock();
verify(mockHeartbeatLock, never()).unlock();
verify(mockC2Client, times(1)).acknowledgeOperation(c2OperationAckArgumentCaptor.capture());
assertEquals(FULLY_APPLIED, c2OperationAckArgumentCaptor.getValue().getOperationState().getState());
}
@Test
void shouldProcessStateWithOneCurrentAndOneRemainingOperation() {
OperationQueue mockOperationQueue = mock(OperationQueue.class);
C2Operation mockCurrentOperation = mock(C2Operation.class);
C2Operation mockRemainingOperation = mock(C2Operation.class);
when(mockOperationQueue.getCurrentOperation()).thenReturn(mockCurrentOperation);
when(mockOperationQueue.getRemainingOperations()).thenReturn(List.of(mockRemainingOperation));
when(mockOperationQueueDAO.load()).thenReturn(ofNullable(mockOperationQueue));
when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED));
C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class);
C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockRemainingOperation)).thenReturn(ofNullable(mockOperationHandler));
when(mockOperationHandler.handle(mockRemainingOperation)).thenReturn(mockC2OperationAck);
when(mockOperationHandler.requiresRestart()).thenReturn(false);
Future<?> future = executorService.submit(testC2OperationManager);
assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
verify(mockHeartbeatLock, times(1)).lock();
verify(mockHeartbeatLock, times(1)).unlock();
verify(mockC2Client, times(2)).acknowledgeOperation(any());
}
}

View File

@ -91,6 +91,7 @@ public enum MiNiFiProperties {
C2_SECURITY_KEYSTORE_PASSWORD("c2.security.keystore.password", "", true, false, VALID),
C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, false, VALID),
C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID),
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.bootstrap.CommandResult;
import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BootstrapC2OperationRestartHandler implements C2OperationRestartHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapC2OperationRestartHandler.class);
private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
private static final String TIMEOUT = "timeout";
private static final Map<MiNiFiCommandState, OperationState> OPERATION_STATE_MAP = Map.of(
MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED,
MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION,
MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, NOT_APPLIED,
MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, NOT_APPLIED);
private final BootstrapCommunicator bootstrapCommunicator;
private final BlockingQueue<OperationState> operationStateHolder;
private final long bootstrapAcknowledgeTimeoutMs;
public BootstrapC2OperationRestartHandler(BootstrapCommunicator bootstrapCommunicator, long bootstrapAcknowledgeTimeoutMs) {
this.bootstrapCommunicator = bootstrapCommunicator;
this.operationStateHolder = new ArrayBlockingQueue<>(1);
this.bootstrapAcknowledgeTimeoutMs = bootstrapAcknowledgeTimeoutMs;
bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, this::bootstrapCallback);
}
@Override
public Optional<OperationState> handleRestart(C2Operation c2Operation) {
CommandResult sendCommandResult = sendBootstrapCommand(c2Operation);
if (sendCommandResult == SUCCESS) {
LOGGER.debug("Bootstrap successfully received command. Waiting for response");
return waitForResponse();
} else {
LOGGER.debug("Bootstrap failed to receive command");
return Optional.of(NOT_APPLIED);
}
}
@Override
public Optional<OperationState> waitForResponse() {
try {
OperationState operationState = operationStateHolder.poll(bootstrapAcknowledgeTimeoutMs, MILLISECONDS);
LOGGER.debug("Bootstrap returned response: {}", ofNullable(operationState).map(Objects::toString).orElse(TIMEOUT));
return Optional.of(ofNullable(operationState).orElse(NOT_APPLIED));
} catch (InterruptedException e) {
LOGGER.debug("Bootstrap response waiting interrupted, possible due to Bootstrap is restarting MiNiFi process");
return empty();
}
}
private CommandResult sendBootstrapCommand(C2Operation c2Operation) {
String command = createBootstrapCommand(c2Operation);
try {
return bootstrapCommunicator.sendCommand(command);
} catch (IOException e) {
LOGGER.error("Failed to send operation to bootstrap", e);
return FAILURE;
}
}
private String createBootstrapCommand(C2Operation c2Operation) {
return ofNullable(c2Operation.getOperand())
.map(operand -> c2Operation.getOperation().name() + "_" + operand.name())
.orElse(c2Operation.getOperation().name());
}
private void bootstrapCallback(String[] params, OutputStream outputStream) {
LOGGER.info("Received acknowledge message from bootstrap process");
if (params.length < 1) {
LOGGER.error("Invalid arguments coming from bootstrap");
return;
}
MiNiFiCommandState miNiFiCommandState = MiNiFiCommandState.valueOf(params[0]);
OperationState operationState = OPERATION_STATE_MAP.get(miNiFiCommandState);
try {
operationStateHolder.put(operationState);
} catch (InterruptedException e) {
LOGGER.warn("Bootstrap hook thread was interrupted");
}
}
}

View File

@ -17,12 +17,19 @@
package org.apache.nifi.minifi.c2;
import static java.lang.Boolean.parseBoolean;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_CLASS;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENTIFIER;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_ASSET_DIRECTORY;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_CONFIG_DIRECTORY;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FULL_HEARTBEAT;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_KEEP_ALIVE_DURATION;
@ -45,24 +52,24 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KE
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_LOCATION;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE;
import static org.apache.nifi.util.FormatUtils.getPreciseTimeDuration;
import static org.apache.nifi.util.NiFiProperties.FLOW_CONFIGURATION_FILE;
import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM;
import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.http.C2HttpClient;
import org.apache.nifi.c2.client.service.C2ClientService;
import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
import org.apache.nifi.c2.client.service.C2HeartbeatManager;
import org.apache.nifi.c2.client.service.C2OperationManager;
import org.apache.nifi.c2.client.service.FlowIdHolder;
import org.apache.nifi.c2.client.service.ManifestHashProvider;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@ -70,8 +77,7 @@ import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
@ -81,10 +87,6 @@ import org.apache.nifi.c2.client.service.operation.UpdatePropertiesOperationHand
import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.controller.FlowController;
@ -99,14 +101,13 @@ import org.apache.nifi.minifi.c2.command.PropertiesPersister;
import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
import org.apache.nifi.nar.ExtensionManagerHolder;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -115,35 +116,25 @@ public class C2NifiClientService {
private static final Logger LOGGER = LoggerFactory.getLogger(C2NifiClientService.class);
private static final String ROOT_GROUP_ID = "root";
private static final Long INITIAL_DELAY = 10000L;
private static final Integer TERMINATION_WAIT = 5000;
private static final int MINIFI_RESTART_TIMEOUT_SECONDS = 60;
private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
private static final int IS_ACK_RECEIVED_POLL_INTERVAL = 1000;
private static final int MAX_WAIT_FOR_BOOTSTRAP_ACK_MS = 20000;
private static final Long INITIAL_HEARTBEAT_DELAY_MS = 10000L;
private static final Map<MiNiFiCommandState, OperationState> OPERATION_STATE_MAP = Map.of(
MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED,
MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION,
MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, OperationState.NOT_APPLIED,
MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, OperationState.NOT_APPLIED);
private final ScheduledExecutorService heartbeatManagerExecutorService;
private final ExecutorService operationManagerExecutorService;
private final C2ClientService c2ClientService;
private final FlowController flowController;
private final ScheduledThreadPoolExecutor heartbeatExecutorService;
private final ScheduledThreadPoolExecutor bootstrapAcknowledgeExecutorService;
private final ExtensionManifestParser extensionManifestParser;
private final RuntimeManifestService runtimeManifestService;
private final SupportedOperationsProvider supportedOperationsProvider;
private final RequestedOperationDAO requestedOperationDAO;
private final BootstrapCommunicator bootstrapCommunicator;
private final C2HeartbeatManager c2HeartbeatManager;
private final C2OperationManager c2OperationManager;
private final long heartbeatPeriod;
private volatile boolean ackReceived = false;
public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowController, BootstrapCommunicator bootstrapCommunicator, FlowService flowService) {
this.heartbeatExecutorService = new ScheduledThreadPoolExecutor(1);
this.bootstrapAcknowledgeExecutorService = new ScheduledThreadPoolExecutor(1);
this.heartbeatManagerExecutorService = newScheduledThreadPool(1);
this.operationManagerExecutorService = newSingleThreadExecutor();
this.extensionManifestParser = new JAXBExtensionManifestParser();
C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
@ -160,36 +151,34 @@ public class C2NifiClientService {
C2HttpClient client = C2HttpClient.create(clientConfig, new C2JacksonSerializer());
FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory());
C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider());
this.requestedOperationDAO = new FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir", "bin"), new ObjectMapper());
String bootstrapConfigFileLocation = niFiProperties.getProperty("nifi.minifi.bootstrap.file");
C2OperationHandlerProvider c2OperationHandlerProvider = c2OperationHandlerProvider(niFiProperties, flowController, flowService, flowIdHolder,
client, heartbeatFactory, bootstrapConfigFileLocation, clientConfig.getC2AssetDirectory());
this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation);
this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
this.bootstrapCommunicator = bootstrapCommunicator;
this.bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, (params, output) -> acknowledgeHandler(params));
OperationQueueDAO operationQueueDAO =
new FileBasedOperationQueueDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir", "bin"), new ObjectMapper());
ReentrantLock heartbeatLock = new ReentrantLock();
BootstrapC2OperationRestartHandler c2OperationRestartHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, clientConfig.getBootstrapAcknowledgeTimeout());
this.c2OperationManager = new C2OperationManager(
client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler);
this.c2HeartbeatManager = new C2HeartbeatManager(
client, heartbeatFactory, heartbeatLock, generateRuntimeInfo(), c2OperationManager);
}
private C2ClientConfig generateClientConfig(NiFiProperties properties) {
return new C2ClientConfig.Builder()
.agentClass(properties.getProperty(C2_AGENT_CLASS.getKey(), C2_AGENT_CLASS.getDefaultValue()))
.agentIdentifier(properties.getProperty(C2_AGENT_IDENTIFIER.getKey()))
.fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(), C2_FULL_HEARTBEAT.getDefaultValue())))
.heartbeatPeriod(Long.parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(),
C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
.connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CONNECTION_TIMEOUT.getKey(),
C2_REST_CONNECTION_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
.readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_READ_TIMEOUT.getKey(),
C2_REST_READ_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
.callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CALL_TIMEOUT.getKey(),
C2_REST_CALL_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
.maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
.keepAliveDuration((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(),
C2_KEEP_ALIVE_DURATION.getDefaultValue()), TimeUnit.MILLISECONDS))
.fullHeartbeat(parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(), C2_FULL_HEARTBEAT.getDefaultValue())))
.heartbeatPeriod(parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(), C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
.connectTimeout(durationPropertyInMilliSecs(properties, C2_REST_CONNECTION_TIMEOUT))
.readTimeout(durationPropertyInMilliSecs(properties, C2_REST_READ_TIMEOUT))
.callTimeout(durationPropertyInMilliSecs(properties, C2_REST_CALL_TIMEOUT))
.maxIdleConnections(parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
.keepAliveDuration(durationPropertyInMilliSecs(properties, C2_KEEP_ALIVE_DURATION))
.httpHeaders(properties.getProperty(C2_REST_HTTP_HEADERS.getKey(), C2_REST_HTTP_HEADERS.getDefaultValue()))
.c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(), C2_REQUEST_COMPRESSION.getDefaultValue()))
.c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(), C2_ASSET_DIRECTORY.getDefaultValue()))
@ -207,9 +196,14 @@ public class C2NifiClientService {
.c2RestPathBase(properties.getProperty(C2_REST_PATH_BASE.getKey(), C2_REST_PATH_BASE.getDefaultValue()))
.c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), C2_REST_PATH_HEARTBEAT.getDefaultValue()))
.c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(), C2_REST_PATH_ACKNOWLEDGE.getDefaultValue()))
.bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT))
.build();
}
private long durationPropertyInMilliSecs(NiFiProperties properties, MiNiFiProperties property) {
return (long) getPreciseTimeDuration(properties.getProperty(property.getKey(), property.getDefaultValue()), MILLISECONDS);
}
private C2OperationHandlerProvider c2OperationHandlerProvider(NiFiProperties niFiProperties, FlowController flowController, FlowService flowService,
FlowIdHolder flowIdHolder, C2HttpClient client, C2HeartbeatFactory heartbeatFactory,
String bootstrapConfigFileLocation, String c2AssetDirectory) {
@ -240,107 +234,29 @@ public class C2NifiClientService {
}
public void start() {
handleOngoingOperations();
heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
// need to be synchronized to prevent parallel run coming from acknowledgeHandler/ackTimeoutTask
private synchronized void handleOngoingOperations() {
Optional<OperationQueue> operationQueue = requestedOperationDAO.load();
LOGGER.info("Handling ongoing operations: {}", operationQueue);
if (operationQueue.isPresent()) {
try {
waitForAcknowledgeFromBootstrap();
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
} catch (Exception e) {
LOGGER.error("Failed to process c2 operations queue", e);
c2ClientService.enableHeartbeat();
}
} else {
c2ClientService.enableHeartbeat();
}
}
private void waitForAcknowledgeFromBootstrap() {
LOGGER.info("Waiting for ACK signal from Bootstrap");
int currentWaitTime = 0;
while (!ackReceived) {
try {
Thread.sleep(IS_ACK_RECEIVED_POLL_INTERVAL);
} catch (InterruptedException e) {
LOGGER.warn("Thread interrupted while waiting for Acknowledge");
}
currentWaitTime += IS_ACK_RECEIVED_POLL_INTERVAL;
if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
break;
}
}
}
private void registerOperation(C2Operation c2Operation) {
try {
ackReceived = false;
registerAcknowledgeTimeoutTask(c2Operation);
String command = ofNullable(c2Operation.getOperand())
.map(operand -> c2Operation.getOperation().name() + "_" + operand.name())
.orElse(c2Operation.getOperation().name());
bootstrapCommunicator.sendCommand(command);
} catch (IOException e) {
LOGGER.error("Failed to send operation to bootstrap", e);
throw new UncheckedIOException(e);
}
}
private void registerAcknowledgeTimeoutTask(C2Operation c2Operation) {
bootstrapAcknowledgeExecutorService.schedule(() -> {
if (!ackReceived) {
LOGGER.info("Operation requiring restart is failed, and no restart/acknowledge is happened after {} seconds for {}. Handling remaining operations.",
MINIFI_RESTART_TIMEOUT_SECONDS, c2Operation);
handleOngoingOperations();
}
}, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
private void acknowledgeHandler(String[] params) {
LOGGER.info("Received acknowledge message from bootstrap process");
if (params.length < 1) {
LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
return;
}
Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.load();
ackReceived = true;
if (optionalOperationQueue.isPresent()) {
OperationQueue operationQueue = optionalOperationQueue.get();
C2Operation c2Operation = operationQueue.getCurrentOperation();
C2OperationAck c2OperationAck = new C2OperationAck();
c2OperationAck.setOperationId(c2Operation.getIdentifier());
C2OperationState c2OperationState = new C2OperationState();
MiNiFiCommandState miNiFiCommandState = MiNiFiCommandState.valueOf(params[0]);
OperationState state = OPERATION_STATE_MAP.get(miNiFiCommandState);
c2OperationState.setState(state);
c2OperationAck.setOperationState(c2OperationState);
c2ClientService.sendAcknowledge(c2OperationAck);
if (MiNiFiCommandState.NO_OPERATION == miNiFiCommandState || MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART == miNiFiCommandState) {
LOGGER.debug("No restart happened because of an error / the app was already in the desired state");
handleOngoingOperations();
}
} else {
LOGGER.error("Can not send acknowledge due to empty Operation Queue");
}
operationManagerExecutorService.execute(c2OperationManager);
LOGGER.debug("Scheduling heartbeats with {} ms periodicity", heartbeatPeriod);
heartbeatManagerExecutorService.scheduleAtFixedRate(c2HeartbeatManager, INITIAL_HEARTBEAT_DELAY_MS, heartbeatPeriod, MILLISECONDS);
}
public void stop() {
bootstrapAcknowledgeExecutorService.shutdownNow();
heartbeatExecutorService.shutdown();
heartbeatManagerExecutorService.shutdown();
try {
if (!heartbeatExecutorService.awaitTermination(TERMINATION_WAIT, TimeUnit.MILLISECONDS)) {
heartbeatExecutorService.shutdownNow();
if (!heartbeatManagerExecutorService.awaitTermination(TERMINATION_WAIT, MILLISECONDS)) {
heartbeatManagerExecutorService.shutdownNow();
}
} catch (InterruptedException ignore) {
LOGGER.info("Stopping C2 Client's thread was interrupted but shutting down anyway the C2NifiClientService");
heartbeatExecutorService.shutdownNow();
LOGGER.info("Stopping C2 heartbeat executor service was interrupted, forcing shutdown");
heartbeatManagerExecutorService.shutdownNow();
}
operationManagerExecutorService.shutdown();
try {
if (!operationManagerExecutorService.awaitTermination(TERMINATION_WAIT, MILLISECONDS)) {
operationManagerExecutorService.shutdownNow();
}
} catch (InterruptedException ignore) {
LOGGER.info("Stopping C2 operation executor service was interrupted, forcing shutdown");
operationManagerExecutorService.shutdownNow();
}
}

View File

@ -17,22 +17,25 @@
package org.apache.nifi.minifi.c2;
import static org.slf4j.LoggerFactory.getLogger;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.util.Optional;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileBasedRequestedOperationDAO implements RequestedOperationDAO {
private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class);
public class FileBasedOperationQueueDAO implements OperationQueueDAO {
private static final Logger LOGGER = getLogger(FileBasedOperationQueueDAO.class);
protected static final String REQUESTED_OPERATIONS_FILE_NAME = "requestedOperations.data";
private final ObjectMapper objectMapper;
private final File requestedOperationsFile;
public FileBasedRequestedOperationDAO(String runDir, ObjectMapper objectMapper) {
public FileBasedOperationQueueDAO(String runDir, ObjectMapper objectMapper) {
this.requestedOperationsFile = new File(runDir, REQUESTED_OPERATIONS_FILE_NAME);
this.objectMapper = objectMapper;
}

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperationType.START;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.bootstrap.CommandResult;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class BootstrapC2OperationRestartHandlerTest {
@Test
void shouldReturnNotAppliedWhenBootstrapCommunicatorReturnsFalse() throws IOException {
C2Operation inputOperation = new C2Operation();
inputOperation.setOperation(START);
BootstrapCommunicator bootstrapCommunicator = mock(BootstrapCommunicator.class);
when(bootstrapCommunicator.sendCommand(START.name())).thenReturn(FAILURE);
long bootstrapAcknowledgeTimeoutMs = 0;
BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
Optional<OperationState> result = testHandler.handleRestart(inputOperation);
assertTrue(result.isPresent());
assertEquals(NOT_APPLIED, result.get());
}
@Test
void shouldReturnNotAppliedWhenBootstrapCommunicatorThrowsException() throws IOException {
C2Operation inputOperation = new C2Operation();
inputOperation.setOperation(START);
BootstrapCommunicator bootstrapCommunicator = mock(BootstrapCommunicator.class);
when(bootstrapCommunicator.sendCommand(START.name())).thenThrow(new IOException());
long bootstrapAcknowledgeTimeoutMs = 0;
BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
Optional<OperationState> result = testHandler.handleRestart(inputOperation);
assertTrue(result.isPresent());
assertEquals(NOT_APPLIED, result.get());
}
@Test
void shouldReturnStateAcknowledgedByBootstrapCommunicator() {
C2Operation inputOperation = new C2Operation();
inputOperation.setOperation(START);
long bootstrapAcknowledgeTimeoutMs = 1000;
long waitBeforeAcknowledgeMs = 100;
String[] callbackResult = new String[] {FULLY_APPLIED.name()};
BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) {
executorService.execute(bootstrapCommunicator);
Optional<OperationState> result = testHandler.handleRestart(inputOperation);
assertTrue(result.isPresent());
assertEquals(FULLY_APPLIED, result.get());
}
}
@Test
void shouldReturnNotAppliedWhenBootstrapAcknowledgeTimesOut() {
C2Operation inputOperation = new C2Operation();
inputOperation.setOperation(START);
String[] callbackResult = new String[] {FULLY_APPLIED.name()};
long bootstrapAcknowledgeTimeoutMs = 1000;
long waitBeforeAcknowledgeMs = 2000;
BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) {
executorService.execute(bootstrapCommunicator);
Optional<OperationState> result = testHandler.handleRestart(inputOperation);
assertTrue(result.isPresent());
assertEquals(NOT_APPLIED, result.get());
}
}
@Test
void shouldReturnNotAppliedWhenBootstrapSendInvalidResponse() {
C2Operation inputOperation = new C2Operation();
inputOperation.setOperation(START);
String[] callbackResult = new String[] {};
long bootstrapAcknowledgeTimeoutMs = 1000;
long waitBeforeAcknowledgeMs = 100;
BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) {
executorService.execute(bootstrapCommunicator);
Optional<OperationState> result = testHandler.handleRestart(inputOperation);
assertTrue(result.isPresent());
assertEquals(NOT_APPLIED, result.get());
}
}
static class BootstrapCommunicatorStub implements BootstrapCommunicator, Runnable {
private final CommandResult sendCommandResult;
private final String[] callbackResult;
private final long waitBeforeAcknowledgeMs;
private BiConsumer<String[], OutputStream> handler;
BootstrapCommunicatorStub(CommandResult sendCommandResult, String[] callbackResult, long waitBeforeAcknowledgeMs) {
this.sendCommandResult = sendCommandResult;
this.callbackResult = callbackResult;
this.waitBeforeAcknowledgeMs = waitBeforeAcknowledgeMs;
}
@Override
public void run() {
try {
sleep(waitBeforeAcknowledgeMs);
} catch (InterruptedException ignore) {
}
handler.accept(callbackResult, null);
}
@Override
public CommandResult sendCommand(String command, String... args) {
return sendCommandResult;
}
@Override
public void registerMessageHandler(String command, BiConsumer<String[], OutputStream> handler) {
this.handler = handler;
}
}
}

View File

@ -17,7 +17,7 @@
package org.apache.nifi.minifi.c2;
import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
import static org.apache.nifi.minifi.c2.FileBasedOperationQueueDAO.REQUESTED_OPERATIONS_FILE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.protocol.api.C2Operation;
@ -45,7 +46,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class FileBasedRequestedOperationDAOTest {
class FileBasedOperationQueueDAOTest {
@Mock
private ObjectMapper objectMapper;
@ -53,11 +54,11 @@ class FileBasedRequestedOperationDAOTest {
@TempDir
File tmpDir;
private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
private FileBasedOperationQueueDAO fileBasedRequestedOperationDAO;
@BeforeEach
void setup() {
fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
fileBasedRequestedOperationDAO = new FileBasedOperationQueueDAO(tmpDir.getAbsolutePath(), objectMapper);
}
@Test
@ -109,6 +110,6 @@ class FileBasedRequestedOperationDAOTest {
C2Operation currentOperation = new C2Operation();
currentOperation.setIdentifier("id2");
return new OperationQueue(currentOperation, Collections.singletonList(c2Operation));
return new OperationQueue(currentOperation, List.of(c2Operation));
}
}

View File

@ -14,8 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap;
import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
@ -38,6 +42,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.bootstrap.CommandResult;
import org.apache.nifi.minifi.MiNiFiServer;
import org.apache.nifi.minifi.commons.status.FlowStatusReport;
import org.apache.nifi.minifi.status.StatusRequestException;
@ -90,29 +95,29 @@ public class BootstrapListener implements BootstrapCommunicator {
listenThread.start();
logger.debug("Notifying Bootstrap that local port is {}", localPort);
sendCommand("PORT", new String[]{String.valueOf(localPort), secretKey});
sendCommand("PORT", new String[] {String.valueOf(localPort), secretKey});
}
public void reload() throws IOException {
if (listener != null) {
listener.stop();
}
sendCommand(RELOAD, new String[]{});
sendCommand(RELOAD, new String[] {});
}
public void stop() throws IOException {
if (listener != null) {
listener.stop();
}
sendCommand(SHUTDOWN, new String[]{});
sendCommand(SHUTDOWN, new String[] {});
}
public void sendStartedStatus(boolean status) throws IOException {
logger.debug("Notifying Bootstrap that the status of starting MiNiFi is {}", status);
sendCommand(STARTED, new String[]{String.valueOf(status)});
sendCommand(STARTED, new String[] {String.valueOf(status)});
}
public void sendCommand(String command, String[] args) throws IOException {
public CommandResult sendCommand(String command, String[] args) throws IOException {
try (Socket socket = new Socket()) {
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", bootstrapPort));
@ -134,8 +139,10 @@ public class BootstrapListener implements BootstrapCommunicator {
String response = reader.readLine();
if ("OK".equals(response)) {
logger.info("Successfully initiated communication with Bootstrap");
return SUCCESS;
} else {
logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from MiNiFi");
return FAILURE;
}
}
}

View File

@ -25,11 +25,13 @@ public interface BootstrapCommunicator {
/**
* Sends a command with specific arguments to the bootstrap process
*
* @param command the command to send
* @param args the args to send
* @param args the args to send
* @return {@link CommandResult} of the command sent to Bootstrap
* @throws IOException exception in case of communication issue
*/
void sendCommand(String command, String... args) throws IOException;
CommandResult sendCommand(String command, String... args) throws IOException;
/**
* Register a handler for messages coming from bootstrap process

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
public enum CommandResult {
FAILURE,
SUCCESS
}