mirror of https://github.com/apache/nifi.git
NIFI-10531 Add supported operations to c2 heartbeat:
This closes #6438 Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>
This commit is contained in:
parent
16bcb8f145
commit
504baae227
|
@ -41,8 +41,12 @@ public class C2ClientService {
|
|||
}
|
||||
|
||||
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
|
||||
C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
|
||||
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
|
||||
try {
|
||||
C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
|
||||
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to send/process heartbeat:", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processResponse(C2HeartbeatResponse response) {
|
||||
|
|
|
@ -23,12 +23,9 @@ import java.lang.management.ManagementFactory;
|
|||
import java.lang.management.OperatingSystemMXBean;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -37,6 +34,7 @@ import org.apache.nifi.c2.client.C2ClientConfig;
|
|||
import org.apache.nifi.c2.client.PersistentUuidGenerator;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.protocol.api.AgentInfo;
|
||||
import org.apache.nifi.c2.protocol.api.AgentManifest;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositories;
|
||||
import org.apache.nifi.c2.protocol.api.AgentStatus;
|
||||
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
|
||||
|
@ -44,8 +42,8 @@ import org.apache.nifi.c2.protocol.api.DeviceInfo;
|
|||
import org.apache.nifi.c2.protocol.api.FlowInfo;
|
||||
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
|
||||
import org.apache.nifi.c2.protocol.api.NetworkInfo;
|
||||
import org.apache.nifi.c2.protocol.api.SupportedOperation;
|
||||
import org.apache.nifi.c2.protocol.api.SystemInfo;
|
||||
import org.apache.nifi.c2.protocol.component.api.Bundle;
|
||||
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -59,14 +57,16 @@ public class C2HeartbeatFactory {
|
|||
|
||||
private final C2ClientConfig clientConfig;
|
||||
private final FlowIdHolder flowIdHolder;
|
||||
private final ManifestHashProvider manifestHashProvider;
|
||||
|
||||
private String agentId;
|
||||
private String deviceId;
|
||||
private File confDirectory;
|
||||
|
||||
public C2HeartbeatFactory(C2ClientConfig clientConfig, FlowIdHolder flowIdHolder) {
|
||||
public C2HeartbeatFactory(C2ClientConfig clientConfig, FlowIdHolder flowIdHolder, ManifestHashProvider manifestHashProvider) {
|
||||
this.clientConfig = clientConfig;
|
||||
this.flowIdHolder = flowIdHolder;
|
||||
this.manifestHashProvider = manifestHashProvider;
|
||||
}
|
||||
|
||||
public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) {
|
||||
|
@ -97,7 +97,7 @@ public class C2HeartbeatFactory {
|
|||
agentStatus.setRepositories(repos);
|
||||
|
||||
agentInfo.setStatus(agentStatus);
|
||||
agentInfo.setAgentManifestHash(calculateManifestHash(manifest.getBundles()));
|
||||
agentInfo.setAgentManifestHash(manifestHashProvider.calculateManifestHash(manifest.getBundles(), getSupportedOperations(manifest)));
|
||||
|
||||
if (clientConfig.isFullHeartbeat()) {
|
||||
agentInfo.setAgentManifest(manifest);
|
||||
|
@ -106,6 +106,17 @@ public class C2HeartbeatFactory {
|
|||
return agentInfo;
|
||||
}
|
||||
|
||||
private Set<SupportedOperation> getSupportedOperations(RuntimeManifest manifest) {
|
||||
Set<SupportedOperation> supportedOperations;
|
||||
// supported operations has value only in case of minifi, therefore we return empty collection if
|
||||
if (manifest instanceof AgentManifest) {
|
||||
supportedOperations = ((AgentManifest) manifest).getSupportedOperations();
|
||||
} else {
|
||||
supportedOperations = Collections.emptySet();
|
||||
}
|
||||
return supportedOperations;
|
||||
}
|
||||
|
||||
private String getAgentId() {
|
||||
if (agentId == null) {
|
||||
String rawAgentId = clientConfig.getAgentIdentifier();
|
||||
|
@ -235,26 +246,4 @@ public class C2HeartbeatFactory {
|
|||
return confDirectory;
|
||||
}
|
||||
|
||||
private String calculateManifestHash(List<Bundle> loadedBundles) {
|
||||
byte[] bytes;
|
||||
try {
|
||||
bytes = MessageDigest.getInstance("SHA-512").digest(loadedBundles.stream()
|
||||
.map(bundle -> bundle.getGroup() + bundle.getArtifact() + bundle.getVersion())
|
||||
.sorted()
|
||||
.collect(Collectors.joining(","))
|
||||
.getBytes(StandardCharsets.UTF_8));
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException("Unable to set up manifest hash calculation due to not having support for the chosen digest algorithm", e);
|
||||
}
|
||||
|
||||
return bytesToHex(bytes);
|
||||
}
|
||||
|
||||
private String bytesToHex(byte[] in) {
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
for (byte b : in) {
|
||||
builder.append(String.format("%02x", b));
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.nifi.c2.protocol.api.SupportedOperation;
|
||||
import org.apache.nifi.c2.protocol.component.api.Bundle;
|
||||
|
||||
public class ManifestHashProvider {
|
||||
private String currentBundles = null;
|
||||
private Set<SupportedOperation> currentSupportedOperations = Collections.emptySet();
|
||||
private int currentHashCode;
|
||||
private String currentManifestHash;
|
||||
|
||||
public String calculateManifestHash(List<Bundle> loadedBundles, Set<SupportedOperation> supportedOperations) {
|
||||
String bundleString = loadedBundles.stream()
|
||||
.map(bundle -> bundle.getGroup() + bundle.getArtifact() + bundle.getVersion())
|
||||
.sorted()
|
||||
.collect(Collectors.joining(","));
|
||||
int hashCode = Objects.hash(bundleString, supportedOperations);
|
||||
if (hashCode != currentHashCode
|
||||
|| !(Objects.equals(bundleString, currentBundles) && Objects.equals(supportedOperations, currentSupportedOperations))) {
|
||||
byte[] bytes;
|
||||
try {
|
||||
bytes = MessageDigest.getInstance("SHA-512").digest(getBytes(supportedOperations, bundleString));
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException("Unable to set up manifest hash calculation due to not having support for the chosen digest algorithm", e);
|
||||
}
|
||||
currentHashCode = hashCode;
|
||||
currentManifestHash = bytesToHex(bytes);
|
||||
currentBundles = bundleString;
|
||||
currentSupportedOperations = supportedOperations;
|
||||
}
|
||||
return currentManifestHash;
|
||||
}
|
||||
|
||||
private byte[] getBytes(Set<SupportedOperation> supportedOperations, String bundleString) {
|
||||
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
|
||||
oos.write(bundleString.getBytes(StandardCharsets.UTF_8));
|
||||
oos.writeObject(supportedOperations);
|
||||
oos.flush();
|
||||
return bos.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to transform supportedOperations and bundles to byte array", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String bytesToHex(byte[] in) {
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
for (byte b : in) {
|
||||
builder.append(String.format("%02x", b));
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import java.util.Map;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
import org.apache.nifi.c2.protocol.api.OperandType;
|
||||
|
@ -40,6 +41,13 @@ public interface C2OperationHandler {
|
|||
*/
|
||||
OperandType getOperandType();
|
||||
|
||||
/**
|
||||
* Returns the properties context for the given operand
|
||||
*
|
||||
* @return the property map
|
||||
*/
|
||||
Map<String, Object> getProperties();
|
||||
|
||||
/**
|
||||
* Handler logic for the specific C2Operation
|
||||
*
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -47,6 +48,19 @@ public class C2OperationService {
|
|||
});
|
||||
}
|
||||
|
||||
public Map<OperationType, Map<OperandType, C2OperationHandler>> getHandlers() {
|
||||
Map<OperationType, Map<OperandType, C2OperationHandler>> handlers = new HashMap<>();
|
||||
handlerMap.entrySet()
|
||||
.forEach(operationEntry -> {
|
||||
Map<OperandType, C2OperationHandler> operands = new HashMap<>();
|
||||
operationEntry.getValue()
|
||||
.entrySet()
|
||||
.forEach(o -> operands.put(o.getKey(), o.getValue()));
|
||||
handlers.put(operationEntry.getKey(), Collections.unmodifiableMap(operands));
|
||||
});
|
||||
return Collections.unmodifiableMap(handlers);
|
||||
}
|
||||
|
||||
private Optional<C2OperationHandler> getHandlerForOperation(C2Operation operation) {
|
||||
return Optional.ofNullable(handlerMap.get(operation.getOperation()))
|
||||
.map(operandMap -> operandMap.get(operation.getOperand()));
|
||||
|
|
|
@ -39,6 +39,7 @@ import java.nio.file.Path;
|
|||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -69,14 +70,18 @@ public class DebugOperationHandler implements C2OperationHandler {
|
|||
private final C2Client c2Client;
|
||||
private final List<Path> bundleFilePaths;
|
||||
private final Predicate<String> contentFilter;
|
||||
private final OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
private DebugOperationHandler(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter) {
|
||||
private DebugOperationHandler(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter,
|
||||
OperandPropertiesProvider operandPropertiesProvider) {
|
||||
this.c2Client = c2Client;
|
||||
this.bundleFilePaths = bundleFilePaths;
|
||||
this.contentFilter = contentFilter;
|
||||
this.operandPropertiesProvider = operandPropertiesProvider;
|
||||
}
|
||||
|
||||
public static DebugOperationHandler create(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter) {
|
||||
public static DebugOperationHandler create(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter,
|
||||
OperandPropertiesProvider operandPropertiesProvider) {
|
||||
if (c2Client == null) {
|
||||
throw new IllegalArgumentException("C2Client should not be null");
|
||||
}
|
||||
|
@ -87,7 +92,7 @@ public class DebugOperationHandler implements C2OperationHandler {
|
|||
throw new IllegalArgumentException("Content filter should not be null");
|
||||
}
|
||||
|
||||
return new DebugOperationHandler(c2Client, bundleFilePaths, contentFilter);
|
||||
return new DebugOperationHandler(c2Client, bundleFilePaths, contentFilter, operandPropertiesProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -100,6 +105,11 @@ public class DebugOperationHandler implements C2OperationHandler {
|
|||
return DEBUG;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return operandPropertiesProvider.getProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public C2OperationAck handle(C2Operation operation) {
|
||||
String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
|
||||
|
|
|
@ -20,6 +20,7 @@ import static org.apache.commons.lang3.StringUtils.EMPTY;
|
|||
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
|
||||
import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
|
||||
|
@ -31,19 +32,18 @@ import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
|||
import org.apache.nifi.c2.protocol.api.C2OperationState;
|
||||
import org.apache.nifi.c2.protocol.api.OperandType;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DescribeManifestOperationHandler implements C2OperationHandler {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DescribeManifestOperationHandler.class);
|
||||
|
||||
private final C2HeartbeatFactory heartbeatFactory;
|
||||
private final Supplier<RuntimeInfoWrapper> runtimeInfoSupplier;
|
||||
private final OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
public DescribeManifestOperationHandler(C2HeartbeatFactory heartbeatFactory, Supplier<RuntimeInfoWrapper> runtimeInfoSupplier) {
|
||||
public DescribeManifestOperationHandler(C2HeartbeatFactory heartbeatFactory, Supplier<RuntimeInfoWrapper> runtimeInfoSupplier,
|
||||
OperandPropertiesProvider operandPropertiesProvider) {
|
||||
this.heartbeatFactory = heartbeatFactory;
|
||||
this.runtimeInfoSupplier = runtimeInfoSupplier;
|
||||
this.operandPropertiesProvider = operandPropertiesProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -78,4 +78,9 @@ public class DescribeManifestOperationHandler implements C2OperationHandler {
|
|||
|
||||
return operationAck;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return operandPropertiesProvider.getProperties();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class EmptyOperandPropertiesProvider implements OperandPropertiesProvider {
|
||||
|
||||
private static final Map<String, Object> EMPTY_MAP = Collections.unmodifiableMap(new HashMap<>());
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return EMPTY_MAP;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Common interface to provide properties for different operands.
|
||||
* */
|
||||
public interface OperandPropertiesProvider {
|
||||
|
||||
/**
|
||||
* Get the properties for the given operand.
|
||||
* The Value of the Map must implement the Serializable interface.
|
||||
*
|
||||
* @return the properties of the given operand
|
||||
* */
|
||||
Map<String, Object> getProperties();
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.nifi.c2.protocol.api.OperandType;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.apache.nifi.c2.protocol.api.SupportedOperation;
|
||||
|
||||
public class SupportedOperationsProvider {
|
||||
private final Map<OperationType, Map<OperandType, C2OperationHandler>> operationHandlers;
|
||||
|
||||
public SupportedOperationsProvider(Map<OperationType, Map<OperandType, C2OperationHandler>> handlers) {
|
||||
operationHandlers = handlers;
|
||||
}
|
||||
|
||||
public Set<SupportedOperation> getSupportedOperations() {
|
||||
return operationHandlers.entrySet()
|
||||
.stream()
|
||||
.map(operationEntry -> getSupportedOperation(operationEntry.getKey(), operationEntry.getValue()))
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private SupportedOperation getSupportedOperation(OperationType operationType, Map<OperandType, C2OperationHandler> operands) {
|
||||
SupportedOperation supportedOperation = new SupportedOperation();
|
||||
supportedOperation.setType(operationType);
|
||||
|
||||
Map<OperandType, Map<String, Object>> properties = operands.values()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(C2OperationHandler::getOperandType, C2OperationHandler::getProperties));
|
||||
|
||||
supportedOperation.setProperties(properties);
|
||||
|
||||
return supportedOperation;
|
||||
}
|
||||
|
||||
}
|
|
@ -45,11 +45,14 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
|
|||
private final C2Client client;
|
||||
private final Function<byte[], Boolean> updateFlow;
|
||||
private final FlowIdHolder flowIdHolder;
|
||||
private final OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
public UpdateConfigurationOperationHandler(C2Client client, FlowIdHolder flowIdHolder, Function<byte[], Boolean> updateFlow) {
|
||||
public UpdateConfigurationOperationHandler(C2Client client, FlowIdHolder flowIdHolder, Function<byte[], Boolean> updateFlow,
|
||||
OperandPropertiesProvider operandPropertiesProvider) {
|
||||
this.client = client;
|
||||
this.updateFlow = updateFlow;
|
||||
this.flowIdHolder = flowIdHolder;
|
||||
this.operandPropertiesProvider = operandPropertiesProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,4 +131,9 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return operandPropertiesProvider.getProperties();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.nifi.c2.client.service;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
@ -26,28 +25,34 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.nifi.c2.client.C2ClientConfig;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.protocol.api.AgentManifest;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositories;
|
||||
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
|
||||
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.apache.nifi.c2.protocol.api.SupportedOperation;
|
||||
import org.apache.nifi.c2.protocol.component.api.Bundle;
|
||||
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||
public class C2HeartbeatFactoryTest {
|
||||
|
||||
private static final String AGENT_CLASS = "agentClass";
|
||||
private static final String FLOW_ID = "flowId";
|
||||
private static final String MANIFEST_HASH = "hash";
|
||||
|
||||
@Mock
|
||||
private C2ClientConfig clientConfig;
|
||||
|
@ -58,6 +63,9 @@ public class C2HeartbeatFactoryTest {
|
|||
@Mock
|
||||
private RuntimeInfoWrapper runtimeInfoWrapper;
|
||||
|
||||
@Mock
|
||||
private ManifestHashProvider manifestHashProvider;
|
||||
|
||||
@InjectMocks
|
||||
private C2HeartbeatFactory c2HeartbeatFactory;
|
||||
|
||||
|
@ -129,33 +137,27 @@ public class C2HeartbeatFactoryTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void testManifestHashChangesWhenManifestBundleChanges() {
|
||||
Bundle bundle1 = new Bundle("group1", "artifact1", "version1");
|
||||
Bundle bundle2 = new Bundle("group2", "artifact2", "version2");
|
||||
RuntimeManifest manifest1 = createManifest(bundle1);
|
||||
RuntimeManifest manifest2 = createManifest(bundle2);
|
||||
RuntimeManifest manifest3 = createManifest(bundle1, bundle2);
|
||||
void testAgentManifestHashIsPopulatedInCaseOfRuntimeManifest() {
|
||||
RuntimeManifest manifest = createManifest();
|
||||
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH);
|
||||
|
||||
when(runtimeInfoWrapper.getManifest()).thenReturn(manifest1);
|
||||
C2Heartbeat heartbeat1 = c2HeartbeatFactory.create(runtimeInfoWrapper);
|
||||
String hash1 = heartbeat1.getAgentInfo().getAgentManifestHash();
|
||||
assertNotNull(hash1);
|
||||
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
|
||||
|
||||
// same manifest should result in the same hash
|
||||
assertEquals(hash1, c2HeartbeatFactory.create(runtimeInfoWrapper).getAgentInfo().getAgentManifestHash());
|
||||
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
|
||||
}
|
||||
|
||||
// different manifest should result in hash change
|
||||
when(runtimeInfoWrapper.getManifest()).thenReturn(manifest2);
|
||||
C2Heartbeat heartbeat2 = c2HeartbeatFactory.create(runtimeInfoWrapper);
|
||||
String hash2 = heartbeat2.getAgentInfo().getAgentManifestHash();
|
||||
assertNotEquals(hash2, hash1);
|
||||
@Test
|
||||
void testAgentManifestHashIsPopulatedInCaseOfAgentManifest() {
|
||||
AgentManifest manifest = new AgentManifest(createManifest());
|
||||
SupportedOperation supportedOperation = new SupportedOperation();
|
||||
supportedOperation.setType(OperationType.HEARTBEAT);
|
||||
Set<SupportedOperation> supportedOperations = Collections.singleton(supportedOperation);
|
||||
manifest.setSupportedOperations(supportedOperations);
|
||||
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH);
|
||||
|
||||
// different manifest with multiple bundles should result in hash change compared to all previous
|
||||
when(runtimeInfoWrapper.getManifest()).thenReturn(manifest3);
|
||||
C2Heartbeat heartbeat3 = c2HeartbeatFactory.create(runtimeInfoWrapper);
|
||||
String hash3 = heartbeat3.getAgentInfo().getAgentManifestHash();
|
||||
assertNotEquals(hash3, hash1);
|
||||
assertNotEquals(hash3, hash2);
|
||||
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
|
||||
|
||||
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
|
||||
}
|
||||
|
||||
private RuntimeManifest createManifest() {
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.apache.nifi.c2.protocol.api.SupportedOperation;
|
||||
import org.apache.nifi.c2.protocol.component.api.Bundle;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class ManifestHashProviderTest {
|
||||
private ManifestHashProvider manifestHashProvider = new ManifestHashProvider();
|
||||
|
||||
@Test
|
||||
void testManifestHashChangesWhenManifestBundleChanges() {
|
||||
Bundle bundle1 = new Bundle("group1", "artifact1", "version1");
|
||||
Bundle bundle2 = new Bundle("group2", "artifact2", "version2");
|
||||
|
||||
SupportedOperation supportedOperation1 = new SupportedOperation();
|
||||
supportedOperation1.setType(OperationType.HEARTBEAT);
|
||||
SupportedOperation supportedOperation2 = new SupportedOperation();
|
||||
supportedOperation2.setType(OperationType.ACKNOWLEDGE);
|
||||
|
||||
String hash1 = manifestHashProvider.calculateManifestHash(Collections.singletonList(bundle1), Collections.singleton(supportedOperation1));
|
||||
assertNotNull(hash1);
|
||||
|
||||
// same manifest should result in the same hash
|
||||
assertEquals(hash1, manifestHashProvider.calculateManifestHash(Collections.singletonList(bundle1), Collections.singleton(supportedOperation1)));
|
||||
|
||||
// different manifest should result in hash change if only bundle change
|
||||
String hash2 = manifestHashProvider.calculateManifestHash(Collections.singletonList(bundle2), Collections.singleton(supportedOperation1));
|
||||
assertNotEquals(hash2, hash1);
|
||||
|
||||
// different manifest should result in hash change if only supported operation change
|
||||
String hash3 = manifestHashProvider.calculateManifestHash(Collections.singletonList(bundle1), Collections.singleton(supportedOperation2));
|
||||
assertNotEquals(hash3, hash1);
|
||||
|
||||
// different manifest with multiple bundles should result in hash change compared to all previous
|
||||
String hash4 = manifestHashProvider.calculateManifestHash(Arrays.asList(bundle1, bundle2), Collections.singleton(supportedOperation1));
|
||||
|
||||
assertNotEquals(hash4, hash1);
|
||||
assertNotEquals(hash4, hash2);
|
||||
assertNotEquals(hash4, hash3);
|
||||
}
|
||||
}
|
|
@ -16,11 +16,16 @@
|
|||
*/
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
|
||||
import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
|
@ -45,7 +50,7 @@ public class C2OperationServiceTest {
|
|||
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setOperation(OperationType.UPDATE);
|
||||
operation.setOperand(OperandType.CONFIGURATION);
|
||||
operation.setOperand(CONFIGURATION);
|
||||
Optional<C2OperationAck> ack = service.handleOperation(operation);
|
||||
|
||||
assertFalse(ack.isPresent());
|
||||
|
@ -56,8 +61,8 @@ public class C2OperationServiceTest {
|
|||
C2OperationService service = new C2OperationService(Collections.singletonList(new TestDescribeOperationHandler()));
|
||||
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setOperation(OperationType.DESCRIBE);
|
||||
operation.setOperand(OperandType.MANIFEST);
|
||||
operation.setOperation(DESCRIBE);
|
||||
operation.setOperand(MANIFEST);
|
||||
Optional<C2OperationAck> ack = service.handleOperation(operation);
|
||||
|
||||
assertTrue(ack.isPresent());
|
||||
|
@ -69,23 +74,41 @@ public class C2OperationServiceTest {
|
|||
C2OperationService service = new C2OperationService(Collections.singletonList(new TestInvalidOperationHandler()));
|
||||
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setOperation(OperationType.DESCRIBE);
|
||||
operation.setOperand(OperandType.MANIFEST);
|
||||
operation.setOperation(DESCRIBE);
|
||||
operation.setOperand(MANIFEST);
|
||||
Optional<C2OperationAck> ack = service.handleOperation(operation);
|
||||
|
||||
assertFalse(ack.isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandlersAreReturned() {
|
||||
C2OperationService service = new C2OperationService(Arrays.asList(new TestDescribeOperationHandler(), new TestInvalidOperationHandler()));
|
||||
|
||||
Map<OperationType, Map<OperandType, C2OperationHandler>> handlers = service.getHandlers();
|
||||
|
||||
assertEquals(1, handlers.keySet().size());
|
||||
assertTrue(handlers.keySet().contains(DESCRIBE));
|
||||
Map<OperandType, C2OperationHandler> operands = handlers.values().stream().findFirst().get();
|
||||
assertEquals(2, operands.size());
|
||||
assertTrue(operands.keySet().containsAll(Arrays.asList(MANIFEST, CONFIGURATION)));
|
||||
}
|
||||
|
||||
private static class TestDescribeOperationHandler implements C2OperationHandler {
|
||||
|
||||
@Override
|
||||
public OperationType getOperationType() {
|
||||
return OperationType.DESCRIBE;
|
||||
return DESCRIBE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperandType getOperandType() {
|
||||
return OperandType.MANIFEST;
|
||||
return MANIFEST;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,12 +121,17 @@ public class C2OperationServiceTest {
|
|||
|
||||
@Override
|
||||
public OperationType getOperationType() {
|
||||
return OperationType.DESCRIBE;
|
||||
return DESCRIBE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperandType getOperandType() {
|
||||
return OperandType.CONFIGURATION;
|
||||
return CONFIGURATION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -79,6 +79,9 @@ public class DebugOperationHandlerTest {
|
|||
@Mock
|
||||
private C2Client c2Client;
|
||||
|
||||
@Mock
|
||||
private OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
@TempDir
|
||||
private File tempDir;
|
||||
|
||||
|
@ -96,13 +99,13 @@ public class DebugOperationHandlerTest {
|
|||
@ParameterizedTest(name = "c2Client={0} bundleFileList={1} contentFilter={2}")
|
||||
@MethodSource("invalidConstructorArguments")
|
||||
public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter) {
|
||||
assertThrows(IllegalArgumentException.class, () -> DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter));
|
||||
assertThrows(IllegalArgumentException.class, () -> DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter, operandPropertiesProvider));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOperationAndOperandTypesAreMatching() {
|
||||
// given
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER);
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
|
||||
// when + then
|
||||
assertEquals(TRANSFER, testHandler.getOperationType());
|
||||
|
@ -112,7 +115,7 @@ public class DebugOperationHandlerTest {
|
|||
@Test
|
||||
public void testC2CallbackUrlIsNullInArgs() {
|
||||
// given
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER);
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
C2Operation c2Operation = operation(null);
|
||||
|
||||
// when
|
||||
|
@ -131,7 +134,7 @@ public class DebugOperationHandlerTest {
|
|||
List<Path> createBundleFiles = bundleFileNamesWithContents.entrySet().stream()
|
||||
.map(entry -> placeFileWithContent(entry.getKey(), entry.getValue()))
|
||||
.collect(toList());
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, createBundleFiles, DEFAULT_CONTENT_FILTER);
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, createBundleFiles, DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
|
||||
|
||||
// when
|
||||
|
@ -151,7 +154,8 @@ public class DebugOperationHandlerTest {
|
|||
@Test
|
||||
public void testFileToCollectDoesNotExist() {
|
||||
// given
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")), DEFAULT_CONTENT_FILTER);
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")),
|
||||
DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
|
||||
|
||||
// when
|
||||
|
@ -190,7 +194,7 @@ public class DebugOperationHandlerTest {
|
|||
// given
|
||||
Path bundleFile = placeFileWithContent(fileName, inputContent);
|
||||
Predicate<String> testContentFilter = content -> !content.contains(filterKeyword);
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(bundleFile), testContentFilter);
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(bundleFile), testContentFilter, operandPropertiesProvider);
|
||||
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
|
||||
|
||||
// when
|
||||
|
|
|
@ -43,10 +43,12 @@ public class DescribeManifestOperationHandlerTest {
|
|||
|
||||
@Mock
|
||||
private C2HeartbeatFactory heartbeatFactory;
|
||||
@Mock
|
||||
private OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
@Test
|
||||
void testDescribeManifestOperationHandlerCreateSuccess() {
|
||||
DescribeManifestOperationHandler handler = new DescribeManifestOperationHandler(null, null);
|
||||
DescribeManifestOperationHandler handler = new DescribeManifestOperationHandler(null, null, operandPropertiesProvider);
|
||||
|
||||
assertEquals(OperationType.DESCRIBE, handler.getOperationType());
|
||||
assertEquals(OperandType.MANIFEST, handler.getOperandType());
|
||||
|
@ -67,7 +69,7 @@ public class DescribeManifestOperationHandlerTest {
|
|||
heartbeat.setFlowInfo(flowInfo);
|
||||
|
||||
when(heartbeatFactory.create(runtimeInfoWrapper)).thenReturn(heartbeat);
|
||||
DescribeManifestOperationHandler handler = new DescribeManifestOperationHandler(heartbeatFactory, () -> runtimeInfoWrapper);
|
||||
DescribeManifestOperationHandler handler = new DescribeManifestOperationHandler(heartbeatFactory, () -> runtimeInfoWrapper, operandPropertiesProvider);
|
||||
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setIdentifier(OPERATION_ID);
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.util.Collections;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class EmptyOperandPropertiesProviderTest {
|
||||
|
||||
private final OperandPropertiesProvider operandPropertiesProvider = new EmptyOperandPropertiesProvider();
|
||||
|
||||
@Test
|
||||
void testEmptyMapReturn() {
|
||||
assertEquals(Collections.emptyMap(), operandPropertiesProvider.getProperties());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.nifi.c2.protocol.api.OperandType;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.apache.nifi.c2.protocol.api.SupportedOperation;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class SupportedOperationsProviderTest {
|
||||
|
||||
@Test
|
||||
void testSupportedOperationsAreProvided() {
|
||||
C2OperationHandler describeManifestOperationHandler = mock(C2OperationHandler.class);
|
||||
C2OperationHandler describeConfigurationOperationHandler = mock(C2OperationHandler.class);
|
||||
Map<String, Object> describeManifestProperties = Collections.singletonMap("availableProperties", Arrays.asList("property1", "property2"));
|
||||
Map<String, Object> describeConfigurationProperties = Collections.emptyMap();
|
||||
when(describeManifestOperationHandler.getProperties()).thenReturn(describeManifestProperties);
|
||||
when(describeManifestOperationHandler.getOperandType()).thenReturn(OperandType.MANIFEST);
|
||||
when(describeConfigurationOperationHandler.getProperties()).thenReturn(describeConfigurationProperties);
|
||||
when(describeConfigurationOperationHandler.getOperandType()).thenReturn(OperandType.CONFIGURATION);
|
||||
|
||||
Map<OperationType, Map<OperandType, C2OperationHandler>> operationHandlers = new HashMap<>();
|
||||
operationHandlers.put(OperationType.PAUSE, Collections.emptyMap());
|
||||
Map<OperandType, C2OperationHandler> operandHandlers = new HashMap<>();
|
||||
operandHandlers.put(OperandType.MANIFEST, describeManifestOperationHandler);
|
||||
operandHandlers.put(OperandType.CONFIGURATION, describeConfigurationOperationHandler);
|
||||
operationHandlers.put(OperationType.DESCRIBE, operandHandlers);
|
||||
|
||||
SupportedOperationsProvider supportedOperationsProvider = new SupportedOperationsProvider(operationHandlers);
|
||||
|
||||
SupportedOperation pauseOperation = new SupportedOperation();
|
||||
pauseOperation.setType(OperationType.PAUSE);
|
||||
pauseOperation.setProperties(Collections.emptyMap());
|
||||
SupportedOperation describeOperation = new SupportedOperation();
|
||||
describeOperation.setType(OperationType.DESCRIBE);
|
||||
Map<OperandType, Map<String, Object>> operands = new HashMap<>();
|
||||
operands.put(OperandType.MANIFEST, describeManifestProperties);
|
||||
operands.put(OperandType.CONFIGURATION, describeConfigurationProperties);
|
||||
describeOperation.setProperties(operands);
|
||||
|
||||
Set<SupportedOperation> expected = new HashSet<>(Arrays.asList(pauseOperation, describeOperation));
|
||||
assertEquals(expected, supportedOperationsProvider.getSupportedOperations());
|
||||
}
|
||||
}
|
|
@ -17,8 +17,8 @@
|
|||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.FLOW_ID;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -51,10 +51,12 @@ public class UpdateConfigurationOperationHandlerTest {
|
|||
|
||||
@Mock
|
||||
private FlowIdHolder flowIdHolder;
|
||||
@Mock
|
||||
private OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
@Test
|
||||
void testUpdateConfigurationOperationHandlerCreateSuccess() {
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null);
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null, operandPropertiesProvider);
|
||||
|
||||
assertEquals(OperationType.UPDATE, handler.getOperationType());
|
||||
assertEquals(OperandType.CONFIGURATION, handler.getOperandType());
|
||||
|
@ -62,7 +64,7 @@ public class UpdateConfigurationOperationHandlerTest {
|
|||
|
||||
@Test
|
||||
void testHandleIncorrectArg() {
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null);
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null, operandPropertiesProvider);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setArgs(INCORRECT_LOCATION_MAP);
|
||||
|
||||
|
@ -76,7 +78,7 @@ public class UpdateConfigurationOperationHandlerTest {
|
|||
Function<byte[], Boolean> successUpdate = x -> true;
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate);
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setIdentifier(OPERATION_ID);
|
||||
|
||||
|
@ -95,7 +97,7 @@ public class UpdateConfigurationOperationHandlerTest {
|
|||
void testHandleReturnsNotAppliedWithNoContent() {
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(client.retrieveUpdateContent(any())).thenReturn(Optional.empty());
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null);
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null, operandPropertiesProvider);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setArgs(CORRECT_LOCATION_MAP);
|
||||
|
||||
|
@ -110,7 +112,7 @@ public class UpdateConfigurationOperationHandlerTest {
|
|||
Function<byte[], Boolean> failedToUpdate = x -> false;
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate);
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate, operandPropertiesProvider);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setIdentifier(OPERATION_ID);
|
||||
operation.setArgs(CORRECT_LOCATION_MAP);
|
||||
|
@ -126,7 +128,7 @@ public class UpdateConfigurationOperationHandlerTest {
|
|||
Function<byte[], Boolean> successUpdate = x -> true;
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate);
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setIdentifier(OPERATION_ID);
|
||||
operation.setArgs(CORRECT_LOCATION_MAP);
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.protocol.api;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
|
||||
|
||||
@ApiModel
|
||||
public class AgentManifest extends RuntimeManifest {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@ApiModelProperty("All supported operations by agent")
|
||||
private Set<SupportedOperation> supportedOperations;
|
||||
|
||||
public AgentManifest() {
|
||||
super();
|
||||
}
|
||||
|
||||
public AgentManifest(RuntimeManifest manifest) {
|
||||
super();
|
||||
setAgentType(manifest.getAgentType());
|
||||
setIdentifier(manifest.getIdentifier());
|
||||
setBundles(manifest.getBundles());
|
||||
setBuildInfo(manifest.getBuildInfo());
|
||||
setSchedulingDefaults(manifest.getSchedulingDefaults());
|
||||
setVersion(manifest.getVersion());
|
||||
}
|
||||
|
||||
public Set<SupportedOperation> getSupportedOperations() {
|
||||
return supportedOperations;
|
||||
}
|
||||
|
||||
public void setSupportedOperations(Set<SupportedOperation> supportedOperations) {
|
||||
this.supportedOperations = supportedOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
AgentManifest that = (AgentManifest) o;
|
||||
return Objects.equals(supportedOperations, that.supportedOperations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(super.hashCode(), supportedOperations);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AgentManifest{" +
|
||||
"supportedOperations=" + supportedOperations +
|
||||
"}, " + super.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.protocol.api;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
@ApiModel
|
||||
public class SupportedOperation implements Serializable {
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
@ApiModelProperty("The type of the operation supported by the agent")
|
||||
private OperationType type;
|
||||
|
||||
@ApiModelProperty("Operand specific properties defined by the agent")
|
||||
private Map<OperandType, Map<String, Object>> properties;
|
||||
|
||||
public OperationType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(OperationType type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public Map<OperandType, Map<String, Object>> getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public void setProperties(Map<OperandType, Map<String, Object>> properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SupportedOperation that = (SupportedOperation) o;
|
||||
return type == that.type && Objects.equals(properties, that.properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(type, properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SupportedOperation{" +
|
||||
"type=" + type +
|
||||
", properties=" + properties +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -152,6 +152,21 @@ To load a new dataflow for a MiNiFi instance to run:
|
|||
1. Change the flow definition on the C2 Server
|
||||
2. When a new flow is available on the C2 server, MiNiFi will download it via C2 and restart itself to pick up the changes
|
||||
|
||||
## C2 Heartbeat
|
||||
Heartbeat provides status(agent, flowm device) and operational capabilities to C2 server(s)
|
||||
|
||||
### Agent manifest
|
||||
The agent manifest is the descriptor of the available extensions. The size of the heartbeat
|
||||
might increase depending on the added extensions.
|
||||
|
||||
With the `c2.full.heartbeat` parameter you can control whether to always include the manifest in the heartbeat or not.
|
||||
|
||||
The `agentInfo.agentManifestHash` node can be used to detect in the C2 server whether the manifest changed compared to the previous heartbeat.
|
||||
|
||||
If change is detected, a full heartbeat can be retrieved by sending a DESCRIBE MANIFEST Operation in the `requestedOperations` node of the C2 Heartbeat response.
|
||||
|
||||
For more details about the C2 protocol please visit [Apache NiFi - MiNiFi C2 wiki page](https://cwiki.apache.org/confluence/display/MINIFI/C2).
|
||||
|
||||
## Using Processors Not Packaged with MiNiFi
|
||||
MiNiFi is able to use the following processors out of the box:
|
||||
* UpdateAttribute
|
||||
|
|
|
@ -45,11 +45,16 @@ import org.apache.nifi.c2.client.http.C2HttpClient;
|
|||
import org.apache.nifi.c2.client.service.C2ClientService;
|
||||
import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
|
||||
import org.apache.nifi.c2.client.service.FlowIdHolder;
|
||||
import org.apache.nifi.c2.client.service.ManifestHashProvider;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.client.service.operation.C2OperationService;
|
||||
import org.apache.nifi.c2.client.service.operation.DebugOperationHandler;
|
||||
import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
|
||||
import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
|
||||
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
|
||||
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
|
||||
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
|
||||
import org.apache.nifi.c2.protocol.api.AgentManifest;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositories;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
|
||||
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
|
||||
|
@ -103,6 +108,8 @@ public class C2NifiClientService {
|
|||
private final ExtensionManifestParser extensionManifestParser = new JAXBExtensionManifestParser();
|
||||
|
||||
private final RuntimeManifestService runtimeManifestService;
|
||||
|
||||
private final SupportedOperationsProvider supportedOperationsProvider;
|
||||
private final long heartbeatPeriod;
|
||||
|
||||
public C2NifiClientService(final NiFiProperties niFiProperties, final FlowController flowController) {
|
||||
|
@ -118,16 +125,19 @@ public class C2NifiClientService {
|
|||
this.heartbeatPeriod = clientConfig.getHeartbeatPeriod();
|
||||
this.flowController = flowController;
|
||||
C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer());
|
||||
C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder);
|
||||
C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider());
|
||||
OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider();
|
||||
C2OperationService c2OperationService = new C2OperationService(Arrays.asList(
|
||||
new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent, emptyOperandPropertiesProvider),
|
||||
new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider),
|
||||
DebugOperationHandler.create(client, debugBundleFiles(niFiProperties), EXCLUDE_SENSITIVE_TEXT, emptyOperandPropertiesProvider)
|
||||
));
|
||||
this.c2ClientService = new C2ClientService(
|
||||
client,
|
||||
heartbeatFactory,
|
||||
new C2OperationService(Arrays.asList(
|
||||
new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent),
|
||||
new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo),
|
||||
DebugOperationHandler.create(client, debugBundleFiles(niFiProperties), EXCLUDE_SENSITIVE_TEXT)
|
||||
))
|
||||
c2OperationService
|
||||
);
|
||||
this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationService.getHandlers());
|
||||
}
|
||||
|
||||
private C2ClientConfig generateClientConfig(NiFiProperties properties) {
|
||||
|
@ -159,12 +169,7 @@ public class C2NifiClientService {
|
|||
}
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not start C2 Client Heartbeat Reporting", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
@ -177,7 +182,9 @@ public class C2NifiClientService {
|
|||
}
|
||||
|
||||
private RuntimeInfoWrapper generateRuntimeInfo() {
|
||||
return new RuntimeInfoWrapper(getAgentRepositories(), runtimeManifestService.getManifest(), getQueueStatus());
|
||||
AgentManifest agentManifest = new AgentManifest(runtimeManifestService.getManifest());
|
||||
agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations());
|
||||
return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus());
|
||||
}
|
||||
|
||||
private AgentRepositories getAgentRepositories() {
|
||||
|
|
Loading…
Reference in New Issue