NIFI-9538: Add C2 heartbeat capability to minifi-c2-service

- Added content hash code to avoid repeatedly updating with the same flow
- Gracefully handle agent classes and provide update URL to /config
- Fixed JDK 8 build issue with ConfigService

This closes #5755.

Signed-off-by: Kevin Doran <kdoran@apache.org>
This commit is contained in:
Matthew Burgess 2022-02-09 13:49:23 -05:00 committed by Kevin Doran
parent 5a2992c243
commit b5e61109f6
15 changed files with 577 additions and 21 deletions

View File

@ -18,6 +18,7 @@
package org.apache.nifi.minifi.c2.api;
import java.io.InputStream;
import java.net.URL;
/**
* Represents a MiNiFi configuration of a given version, format matches the format of the ConfigurationProvider
@ -44,4 +45,11 @@ public interface Configuration {
* @return an input stream to read the configuration with
*/
InputStream getInputStream() throws ConfigurationProviderException;
/**
* Gets the URL of the resource
*
* @return the URL of the resource
*/
URL getURL() throws ConfigurationProviderException;
}

View File

@ -26,7 +26,7 @@ limitations under the License.
<packaging>pom</packaging>
<description>This is the assembly of Apache MiNiFi's - Command And Control Server</description>
<properties>
<minifi.c2.server.port>10080</minifi.c2.server.port>
<minifi.c2.server.port>10090</minifi.c2.server.port>
<minifi.c2.server.secure>false</minifi.c2.server.secure>
<minifi.c2.server.keystore>./conf/keystore.jks</minifi.c2.server.keystore>

View File

@ -37,3 +37,31 @@ Paths:
# Default authorization lets anonymous pull any config. Remove below to change that.
- Authorization: ROLE_ANONYMOUS
Action: allow
/c2/config/heartbeat:
Default Action: deny
Actions:
- Authorization: CLASS_RASPI_3
Query Parameters:
class: raspi3
Action: allow
- Authorization: ROLE_SUPERUSER
Action: allow
# Default authorization lets anonymous pull any config. Remove below to change that.
- Authorization: ROLE_ANONYMOUS
Action: allow
/c2/config/acknowledge:
Default Action: deny
Actions:
- Authorization: CLASS_RASPI_3
Query Parameters:
class: raspi3
Action: allow
- Authorization: ROLE_SUPERUSER
Action: allow
# Default authorization lets anonymous pull any config. Remove below to change that.
- Authorization: ROLE_ANONYMOUS
Action: allow

View File

@ -25,6 +25,8 @@ import org.apache.nifi.minifi.c2.api.util.DelegatingOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@ -82,6 +84,15 @@ public class FileSystemWritableConfiguration implements WriteableConfiguration {
}
}
@Override
public URL getURL() throws ConfigurationProviderException {
try {
return path.toUri().toURL();
} catch (MalformedURLException murle) {
throw new ConfigurationProviderException("Could not determine URL of " + path, murle);
}
}
@Override
public String getName() {
return path.getFileName().toString();

View File

@ -23,13 +23,14 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
public class S3WritableConfiguration implements WriteableConfiguration {
private AmazonS3 s3;
private final AmazonS3 s3;
private final S3Object s3Object;
private final String version;
@ -81,6 +82,11 @@ public class S3WritableConfiguration implements WriteableConfiguration {
return s3Object.getObjectContent();
}
@Override
public URL getURL() throws ConfigurationProviderException {
return s3.getUrl(s3Object.getBucketName(), s3Object.getKey());
}
@Override
public String getName() {
return s3Object.getKey();

View File

@ -15,7 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>minifi-c2-provider</artifactId>
@ -53,6 +54,10 @@ limitations under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
</exclusion>
</exclusions>
</dependency>

View File

@ -29,9 +29,29 @@ limitations under the License.
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-c2-api</artifactId>
<version>${project.version}</version>
<version>1.17.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>c2-protocol-api</artifactId>
<version>1.17.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>

View File

@ -17,6 +17,7 @@
package org.apache.nifi.minifi.c2.configuration;
import org.apache.nifi.minifi.c2.service.C2JsonProviderFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
@ -29,6 +30,8 @@ public class C2ResourceConfig extends ResourceConfig {
public C2ResourceConfig(@Context ServletContext servletContext) {
final ApplicationContext appCtx = WebApplicationContextUtils.getWebApplicationContext(servletContext);
// register Jackson Object Mapper Resolver
register(C2JsonProviderFeature.class);
register(appCtx.getBean("configService"));
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.service;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.Provider;
@Provider
@Produces(MediaType.APPLICATION_JSON)
public class C2JsonProvider extends JacksonJaxbJsonProvider {
private static final ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
public C2JsonProvider() {
super();
setMapper(objectMapper);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.service;
import javax.ws.rs.core.Feature;
import javax.ws.rs.core.FeatureContext;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;
public class C2JsonProviderFeature implements Feature {
@Override
public boolean configure(FeatureContext context) {
context.register(C2JsonProvider.class, MessageBodyReader.class, MessageBodyWriter.class);
return true;
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
public class C2ProtocolContext {
private static final Logger logger = LoggerFactory.getLogger(C2ProtocolContext.class);
private static final C2ProtocolContext EMPTY = builder().build();
private final URI baseUri;
private final Long contentLength;
private final String sha256;
C2ProtocolContext(final Builder builder) {
this.baseUri = builder.baseUri;
this.contentLength = builder.contentLength;
this.sha256 = builder.sha256;
}
public URI getBaseUri() {
return baseUri;
}
public Long getContentLength() {
return contentLength;
}
public String getSha256() {
return sha256;
}
public static Builder builder() {
return new Builder();
}
public static C2ProtocolContext empty() {
return EMPTY;
}
public static class Builder {
private URI baseUri;
private Long contentLength;
private String sha256;
private Builder() {
}
public Builder baseUri(final URI baseUri) {
this.baseUri = baseUri;
return this;
}
public Builder contentLength(final Long contentLength) {
this.contentLength = contentLength;
return this;
}
public Builder contentLength(final String contentLength) {
try {
this.contentLength = Long.valueOf(contentLength);
} catch (final NumberFormatException e) {
logger.debug("Could not parse content length string: " + contentLength, e);
}
return this;
}
public Builder sha256(final String sha256) {
this.sha256 = sha256;
return this;
}
public C2ProtocolContext build() {
return new C2ProtocolContext(this);
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.service;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
public interface C2ProtocolService {
C2HeartbeatResponse processHeartbeat(C2Heartbeat heartbeat, C2ProtocolContext context);
void processOperationAck(C2OperationAck operationAck, C2ProtocolContext context);
}

View File

@ -18,14 +18,19 @@
package org.apache.nifi.minifi.c2.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.swagger.annotations.ApiModel;
import org.apache.nifi.minifi.c2.api.Configuration;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.minifi.c2.api.ConfigurationProvider;
import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
import org.apache.nifi.minifi.c2.api.InvalidParameterException;
@ -35,10 +40,13 @@ import org.apache.nifi.minifi.c2.api.util.Pair;
import org.apache.nifi.minifi.c2.util.HttpRequestUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.core.context.SecurityContextHolder;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
@ -50,10 +58,11 @@ import javax.ws.rs.core.UriInfo;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@ -61,36 +70,53 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
@Configuration
@Path("/config")
@ApiModel(
value = "/config",
description = "Provides configuration for MiNiFi instances"
description = "Provides configuration and heartbeat/acknowledge capabilities for MiNiFi instances"
)
public class ConfigService {
public static final String MESSAGE_400 = "MiNiFi C2 server was unable to complete the request because it was invalid. The request should not be retried without modification.";
private static final Logger logger = LoggerFactory.getLogger(ConfigService.class);
private final Authorizer authorizer;
private final ObjectMapper objectMapper;
private final Supplier<ConfigurationProviderInfo> configurationProviderInfo;
private final LoadingCache<ConfigurationProviderKey, ConfigurationProviderValue> configurationCache;
private final C2ProtocolService c2ProtocolService;
@Context
protected HttpServletRequest httpServletRequest;
@Context
protected UriInfo uriInfo;
public ConfigService(List<ConfigurationProvider> configurationProviders, Authorizer authorizer) {
this(configurationProviders, authorizer, 1000, 300_000);
}
public ConfigService(List<ConfigurationProvider> configurationProviders, Authorizer authorizer, long maximumCacheSize, long cacheTtlMillis) {
this.authorizer = authorizer;
this.objectMapper = new ObjectMapper();
this.authorizer = authorizer;
if (configurationProviders == null || configurationProviders.size() == 0) {
throw new IllegalArgumentException("Expected at least one configuration provider");
}
this.configurationProviderInfo = Suppliers.memoizeWithExpiration(() -> initContentTypeInfo(configurationProviders), cacheTtlMillis, TimeUnit.MILLISECONDS);
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
if (maximumCacheSize >= 0) {
cacheBuilder = cacheBuilder.maximumSize(maximumCacheSize);
cacheBuilder.maximumSize(maximumCacheSize);
}
if (cacheTtlMillis >= 0) {
cacheBuilder = cacheBuilder.refreshAfterWrite(cacheTtlMillis, TimeUnit.MILLISECONDS);
cacheBuilder.refreshAfterWrite(cacheTtlMillis, TimeUnit.MILLISECONDS);
}
this.configurationCache = cacheBuilder
.build(new CacheLoader<ConfigurationProviderKey, ConfigurationProviderValue>() {
@ -99,6 +125,7 @@ public class ConfigService {
return initConfigurationProviderValue(key);
}
});
this.c2ProtocolService = new SimpleC2ProtocolService();
}
public ConfigurationProviderValue initConfigurationProviderValue(ConfigurationProviderKey key) {
@ -147,6 +174,127 @@ public class ConfigService {
return new ConfigurationProviderInfo(mediaTypeList, contentTypes, null);
}
@POST
@Path("/heartbeat")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(
value = "An endpoint for a MiNiFi Agent to send a heartbeat to the C2 server",
response = C2HeartbeatResponse.class
)
@ApiResponses({
@ApiResponse(code = 400, message = MESSAGE_400)})
public Response heartbeat(
@Context HttpServletRequest request, @Context HttpHeaders httpHeaders, @Context UriInfo uriInfo,
@ApiParam(required = true) final C2Heartbeat heartbeat) {
try {
authorizer.authorize(SecurityContextHolder.getContext().getAuthentication(), uriInfo);
} catch (AuthorizationException e) {
logger.warn(HttpRequestUtil.getClientString(request) + " not authorized to access " + uriInfo, e);
return Response.status(403).build();
}
List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
boolean defaultAccept = false;
if (acceptValues.size() == 0) {
acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
defaultAccept = true;
}
if (logger.isDebugEnabled()) {
StringBuilder builder = new StringBuilder("Handling request from ")
.append(HttpRequestUtil.getClientString(request))
.append(" with Accept");
if (defaultAccept) {
builder.append(" default value");
}
builder.append(": ")
.append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(", ")));
logger.debug(builder.toString());
}
try {
final String flowId;
Response response;
final String agentClass = heartbeat.getAgentClass();
if (agentClass == null || agentClass.equals("")) {
logger.warn("No agent class provided, returning OK (200)");
response = Response.ok().build();
return response;
} else {
Map<String, List<String>> parameters = Collections.singletonMap("class", Collections.singletonList(agentClass));
ConfigurationProviderValue configurationProviderValue = configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
org.apache.nifi.minifi.c2.api.Configuration configuration;
try {
configuration = configurationProviderValue.getConfiguration();
} catch (ConfigurationProviderException cpe) {
logger.warn("No flow available for agent class " + agentClass + ", returning No Content (204)");
response = Response.noContent().build();
return response;
}
try (InputStream inputStream = configuration.getInputStream();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
byte[] buffer = new byte[1024];
int read;
while ((read = inputStream.read(buffer)) >= 0) {
outputStream.write(buffer, 0, read);
sha256.update(buffer, 0, read);
}
flowId = bytesToHex(sha256.digest());
} catch (ConfigurationProviderException | IOException | NoSuchAlgorithmException e) {
logger.error("Error reading or checksumming configuration file", e);
throw new WebApplicationException(500);
}
final C2ProtocolContext heartbeatContext = C2ProtocolContext.builder()
.baseUri(uriInfo.getBaseUriBuilder().path("/config").queryParam("class", agentClass).build())
.contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
.sha256(flowId)
.build();
try {
final C2HeartbeatResponse heartbeatResponse = c2ProtocolService.processHeartbeat(heartbeat, heartbeatContext);
response = Response.ok(heartbeatResponse).build();
} catch (Exception e) {
logger.error("Heartbeat processing failed", e);
response = Response.status(BAD_REQUEST).entity(e.getMessage()).build();
}
}
return response;
} catch (ExecutionException | UncheckedExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WebApplicationException) {
throw (WebApplicationException) cause;
}
logger.error(HttpRequestUtil.getClientString(request) + " made request with " + HttpRequestUtil.getQueryString(request) + " that caused error.", cause);
return Response.status(500).entity("Internal error").build();
}
}
@POST
@Path("/acknowledge")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(
value = "An endpoint for a MiNiFi Agent to send an operation acknowledgement to the C2 server"
)
@ApiResponses({
@ApiResponse(code = 400, message = MESSAGE_400)})
public Response acknowledge(
@ApiParam(required = true) final C2OperationAck operationAck) {
final C2ProtocolContext ackContext = C2ProtocolContext.builder()
.baseUri(getBaseUri())
.contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
.build();
c2ProtocolService.processOperationAck(operationAck, ackContext);
return Response.ok().build();
}
@GET
@Path("/contentTypes")
@Produces(MediaType.APPLICATION_JSON)
@ -182,14 +330,11 @@ public class ConfigService {
logger.warn(HttpRequestUtil.getClientString(request) + " not authorized to access " + uriInfo, e);
return Response.status(403).build();
}
Map<String, List<String>> parameters = new HashMap<>();
for (Map.Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
parameters.put(entry.getKey(), entry.getValue());
}
Map<String, List<String>> parameters = new HashMap<>(uriInfo.getQueryParameters());
List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
boolean defaultAccept = false;
if (acceptValues.size() == 0) {
acceptValues = Arrays.asList(MediaType.WILDCARD_TYPE);
acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
defaultAccept = true;
}
if (logger.isDebugEnabled()) {
@ -199,16 +344,16 @@ public class ConfigService {
.append(parameters)
.append(" and Accept");
if (defaultAccept) {
builder = builder.append(" default value");
builder.append(" default value");
}
builder = builder.append(": ")
builder.append(": ")
.append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(", ")));
logger.debug(builder.toString());
}
try {
ConfigurationProviderValue configurationProviderValue = configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
Configuration configuration = configurationProviderValue.getConfiguration();
org.apache.nifi.minifi.c2.api.Configuration configuration = configurationProviderValue.getConfiguration();
Response.ResponseBuilder ok = Response.ok();
ok = ok.header("X-Content-Version", configuration.getVersion());
ok = ok.type(configurationProviderValue.getMediaType());
@ -240,7 +385,7 @@ public class ConfigService {
} catch (ConfigurationProviderException e) {
logger.warn("Unable to get configuration.", e);
return Response.status(500).build();
} catch (ExecutionException|UncheckedExecutionException e) {
} catch (ExecutionException | UncheckedExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WebApplicationException) {
throw (WebApplicationException) cause;
@ -279,4 +424,9 @@ public class ConfigService {
"\"Accept: " + acceptValues.stream().map(Object::toString).collect(Collectors.joining(", ")) + "\" supported media types are " +
mediaTypeList.stream().map(Pair::getFirst).map(Object::toString).collect(Collectors.joining(", "))).build());
}
private URI getBaseUri() {
// Forwarded Headers are expected to have been applied as part of servlet filter chain
return uriInfo.getBaseUri();
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.service;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationState;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@Service
public class SimpleC2ProtocolService implements C2ProtocolService {
private static final Logger logger = LoggerFactory.getLogger(SimpleC2ProtocolService.class);
private static final Set<String> issuedOperationIds = new HashSet<>();
private final Map<String, String> currentFlowIds;
public SimpleC2ProtocolService() {
currentFlowIds = new HashMap<>(1000);
}
@Override
public void processOperationAck(final C2OperationAck operationAck, final C2ProtocolContext context) {
// This service assumes there is a single Operation UPDATE to pass over the updated flow
logger.debug("Received operation acknowledgement: {}; {}", operationAck, context);
// Remove the operator ID from the list of issued operations and log the state
final String operationId = operationAck.getOperationId();
try {
OperationState opState = OperationState.DONE;
String details = null;
/* Partial applications are rare and only happen when an operation consists of updating multiple config
* items and some succeed ( we don't yet have the concept of rollback in agents ).
* Fully Applied yields an operation success.
* Operation Not Understood and Not Applied give little details but also will result in Operation Failure.
* We should explore if providing textual details. */
final C2OperationState c2OperationState = operationAck.getOperationState();
if (null != c2OperationState) {
details = c2OperationState.getDetails();
if (c2OperationState.getState() != C2OperationState.OperationState.FULLY_APPLIED) {
opState = OperationState.FAILED;
}
}
if (!issuedOperationIds.remove(operationId)) {
logger.warn("Operation with ID " + operationId + " has either already been acknowledged or is unknown to this server");
} else if (null != c2OperationState) {
final C2OperationState.OperationState operationState = c2OperationState.getState();
logger.debug("Operation with ID " + operationId + " acknowledged with a state of " + operationState.name() + "(" + opState.name() + "), details = "
+ (details == null ? "" : details));
}
// Optionally, an acknowledgement can include some of the info normally passed in a heartbeat.
// If this info is present, process it as a heartbeat, so we update our latest known state of the agent.
if (operationAck.getAgentInfo() != null
|| operationAck.getDeviceInfo() != null
|| operationAck.getFlowInfo() != null) {
final C2Heartbeat heartbeatInfo = toHeartbeat(operationAck);
logger.trace("Operation acknowledgement contains additional info. Processing as heartbeat: {}", heartbeatInfo);
processHeartbeat(heartbeatInfo, context);
}
} catch (final Exception e) {
logger.warn("Encountered exception while processing operation ack", e);
}
}
@Override
public C2HeartbeatResponse processHeartbeat(final C2Heartbeat heartbeat, final C2ProtocolContext context) {
C2HeartbeatResponse c2HeartbeatResponse = new C2HeartbeatResponse();
String currentFlowId = currentFlowIds.get(heartbeat.getAgentId());
if (currentFlowId == null || !currentFlowId.equals(context.getSha256())) {
// Create a single UPDATE operation to fetch the flow from the specified URL
C2Operation c2Operation = new C2Operation();
final String operationID = UUID.randomUUID().toString();
issuedOperationIds.add(operationID);
c2Operation.setIdentifier(operationID);
c2Operation.setOperation(OperationType.UPDATE);
c2Operation.setOperand(OperandType.CONFIGURATION);
c2Operation.setArgs(Collections.singletonMap("location", context.getBaseUri().toString()));
List<C2Operation> requestedOperations = Collections.singletonList(c2Operation);
c2HeartbeatResponse.setRequestedOperations(requestedOperations);
currentFlowIds.put(heartbeat.getAgentId(), context.getSha256());
}
return c2HeartbeatResponse;
}
private static C2Heartbeat toHeartbeat(final C2OperationAck ack) {
final C2Heartbeat heartbeat = new C2Heartbeat();
heartbeat.setDeviceInfo(ack.getDeviceInfo());
heartbeat.setAgentInfo(ack.getAgentInfo());
heartbeat.setFlowInfo(ack.getFlowInfo());
return heartbeat;
}
}

View File

@ -41,7 +41,7 @@ limitations under the License.
</modules>
<properties>
<jersey.version>2.29</jersey.version>
<system.rules.version>1.16.1</system.rules.version>
<system.rules.version>1.19.0</system.rules.version>
<aws.sdk.version>1.11.172</aws.sdk.version>
<yammer.metrics.version>2.2.0</yammer.metrics.version>
</properties>