diff --git a/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java b/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java index 90fbdba346..3f78b52269 100644 --- a/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java +++ b/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.c2.api; import java.io.InputStream; +import java.net.URL; /** * Represents a MiNiFi configuration of a given version, format matches the format of the ConfigurationProvider @@ -44,4 +45,11 @@ public interface Configuration { * @return an input stream to read the configuration with */ InputStream getInputStream() throws ConfigurationProviderException; + + /** + * Gets the URL of the resource + * + * @return the URL of the resource + */ + URL getURL() throws ConfigurationProviderException; } diff --git a/minifi/minifi-c2/minifi-c2-assembly/pom.xml b/minifi/minifi-c2/minifi-c2-assembly/pom.xml index ba2553e395..148a6efe7b 100644 --- a/minifi/minifi-c2/minifi-c2-assembly/pom.xml +++ b/minifi/minifi-c2/minifi-c2-assembly/pom.xml @@ -26,7 +26,7 @@ limitations under the License. pom This is the assembly of Apache MiNiFi's - Command And Control Server - 10080 + 10090 false ./conf/keystore.jks diff --git a/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml b/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml index 5669451940..14386e5c17 100644 --- a/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml +++ b/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml @@ -37,3 +37,31 @@ Paths: # Default authorization lets anonymous pull any config. Remove below to change that. - Authorization: ROLE_ANONYMOUS Action: allow + + /c2/config/heartbeat: + Default Action: deny + Actions: + - Authorization: CLASS_RASPI_3 + Query Parameters: + class: raspi3 + Action: allow + - Authorization: ROLE_SUPERUSER + Action: allow + + # Default authorization lets anonymous pull any config. Remove below to change that. + - Authorization: ROLE_ANONYMOUS + Action: allow + + /c2/config/acknowledge: + Default Action: deny + Actions: + - Authorization: CLASS_RASPI_3 + Query Parameters: + class: raspi3 + Action: allow + - Authorization: ROLE_SUPERUSER + Action: allow + + # Default authorization lets anonymous pull any config. Remove below to change that. + - Authorization: ROLE_ANONYMOUS + Action: allow diff --git a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java index bb48dcc3df..65afe5c1fd 100644 --- a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java +++ b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java @@ -25,6 +25,8 @@ import org.apache.nifi.minifi.c2.api.util.DelegatingOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -82,6 +84,15 @@ public class FileSystemWritableConfiguration implements WriteableConfiguration { } } + @Override + public URL getURL() throws ConfigurationProviderException { + try { + return path.toUri().toURL(); + } catch (MalformedURLException murle) { + throw new ConfigurationProviderException("Could not determine URL of " + path, murle); + } + } + @Override public String getName() { return path.getFileName().toString(); diff --git a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java index 4ab62c9885..7bcf0a668b 100644 --- a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java +++ b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java @@ -23,13 +23,14 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import java.io.InputStream; import java.io.OutputStream; +import java.net.URL; import org.apache.nifi.minifi.c2.api.ConfigurationProviderException; import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration; public class S3WritableConfiguration implements WriteableConfiguration { - private AmazonS3 s3; + private final AmazonS3 s3; private final S3Object s3Object; private final String version; @@ -81,6 +82,11 @@ public class S3WritableConfiguration implements WriteableConfiguration { return s3Object.getObjectContent(); } + @Override + public URL getURL() throws ConfigurationProviderException { + return s3.getUrl(s3Object.getBucketName(), s3Object.getKey()); + } + @Override public String getName() { return s3Object.getKey(); diff --git a/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml b/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml index cec43376aa..bd5517fc71 100644 --- a/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml +++ b/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml @@ -15,7 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> - + 4.0.0 minifi-c2-provider @@ -53,6 +54,10 @@ limitations under the License. org.slf4j slf4j-log4j12 + + org.glassfish.jersey.media + jersey-media-json-jackson + diff --git a/minifi/minifi-c2/minifi-c2-service/pom.xml b/minifi/minifi-c2/minifi-c2-service/pom.xml index 6fcd217313..97de3fb79e 100644 --- a/minifi/minifi-c2/minifi-c2-service/pom.xml +++ b/minifi/minifi-c2/minifi-c2-service/pom.xml @@ -29,9 +29,29 @@ limitations under the License. org.apache.nifi.minifi minifi-c2-api - ${project.version} + 1.17.0-SNAPSHOT provided + + org.apache.nifi + c2-protocol-api + 1.17.0-SNAPSHOT + + + com.fasterxml.jackson.module + jackson-module-jaxb-annotations + provided + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.core + jackson-core + + + org.springframework spring-context diff --git a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java index 5ed07b23be..95dc5f620c 100644 --- a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java +++ b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java @@ -17,6 +17,7 @@ package org.apache.nifi.minifi.c2.configuration; +import org.apache.nifi.minifi.c2.service.C2JsonProviderFeature; import org.glassfish.jersey.server.ResourceConfig; import org.springframework.context.ApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; @@ -29,6 +30,8 @@ public class C2ResourceConfig extends ResourceConfig { public C2ResourceConfig(@Context ServletContext servletContext) { final ApplicationContext appCtx = WebApplicationContextUtils.getWebApplicationContext(servletContext); + // register Jackson Object Mapper Resolver + register(C2JsonProviderFeature.class); register(appCtx.getBean("configService")); } } diff --git a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProvider.java b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProvider.java new file mode 100644 index 0000000000..c12cfc9e74 --- /dev/null +++ b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.service; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider; + +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.ext.Provider; + +@Provider +@Produces(MediaType.APPLICATION_JSON) +public class C2JsonProvider extends JacksonJaxbJsonProvider { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + static { + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public C2JsonProvider() { + super(); + setMapper(objectMapper); + } + + + +} \ No newline at end of file diff --git a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProviderFeature.java b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProviderFeature.java new file mode 100644 index 0000000000..e40e8201ad --- /dev/null +++ b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProviderFeature.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.service; + +import javax.ws.rs.core.Feature; +import javax.ws.rs.core.FeatureContext; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.MessageBodyWriter; + +public class C2JsonProviderFeature implements Feature { + + @Override + public boolean configure(FeatureContext context) { + context.register(C2JsonProvider.class, MessageBodyReader.class, MessageBodyWriter.class); + return true; + } +} diff --git a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolContext.java b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolContext.java new file mode 100644 index 0000000000..e09af8e7b2 --- /dev/null +++ b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolContext.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; + +public class C2ProtocolContext { + private static final Logger logger = LoggerFactory.getLogger(C2ProtocolContext.class); + + private static final C2ProtocolContext EMPTY = builder().build(); + + private final URI baseUri; + private final Long contentLength; + private final String sha256; + + C2ProtocolContext(final Builder builder) { + this.baseUri = builder.baseUri; + this.contentLength = builder.contentLength; + this.sha256 = builder.sha256; + } + + public URI getBaseUri() { + return baseUri; + } + + public Long getContentLength() { + return contentLength; + } + + public String getSha256() { + return sha256; + } + + public static Builder builder() { + return new Builder(); + } + + public static C2ProtocolContext empty() { + return EMPTY; + } + + public static class Builder { + + private URI baseUri; + private Long contentLength; + private String sha256; + + private Builder() { + } + + public Builder baseUri(final URI baseUri) { + this.baseUri = baseUri; + return this; + } + + public Builder contentLength(final Long contentLength) { + this.contentLength = contentLength; + return this; + } + + public Builder contentLength(final String contentLength) { + try { + this.contentLength = Long.valueOf(contentLength); + } catch (final NumberFormatException e) { + logger.debug("Could not parse content length string: " + contentLength, e); + } + return this; + } + + public Builder sha256(final String sha256) { + this.sha256 = sha256; + return this; + } + + public C2ProtocolContext build() { + return new C2ProtocolContext(this); + } + } +} diff --git a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolService.java b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolService.java new file mode 100644 index 0000000000..756c92065a --- /dev/null +++ b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolService.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.service; + +import org.apache.nifi.c2.protocol.api.C2Heartbeat; +import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; +import org.apache.nifi.c2.protocol.api.C2OperationAck; + +public interface C2ProtocolService { + C2HeartbeatResponse processHeartbeat(C2Heartbeat heartbeat, C2ProtocolContext context); + + void processOperationAck(C2OperationAck operationAck, C2ProtocolContext context); +} diff --git a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java index 0732befcd7..9b70fba527 100644 --- a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java +++ b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java @@ -18,14 +18,19 @@ package org.apache.nifi.minifi.c2.service; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.UncheckedExecutionException; import io.swagger.annotations.ApiModel; -import org.apache.nifi.minifi.c2.api.Configuration; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.nifi.c2.protocol.api.C2Heartbeat; +import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; +import org.apache.nifi.c2.protocol.api.C2OperationAck; import org.apache.nifi.minifi.c2.api.ConfigurationProvider; import org.apache.nifi.minifi.c2.api.ConfigurationProviderException; import org.apache.nifi.minifi.c2.api.InvalidParameterException; @@ -35,10 +40,13 @@ import org.apache.nifi.minifi.c2.api.util.Pair; import org.apache.nifi.minifi.c2.util.HttpRequestUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Configuration; import org.springframework.security.core.context.SecurityContextHolder; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.WebApplicationException; @@ -50,10 +58,11 @@ import javax.ws.rs.core.UriInfo; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URI; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; @@ -61,36 +70,53 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; +import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH; +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; + +@Configuration @Path("/config") @ApiModel( value = "/config", - description = "Provides configuration for MiNiFi instances" + description = "Provides configuration and heartbeat/acknowledge capabilities for MiNiFi instances" ) public class ConfigService { + + public static final String MESSAGE_400 = "MiNiFi C2 server was unable to complete the request because it was invalid. The request should not be retried without modification."; + private static final Logger logger = LoggerFactory.getLogger(ConfigService.class); private final Authorizer authorizer; private final ObjectMapper objectMapper; private final Supplier configurationProviderInfo; private final LoadingCache configurationCache; + private final C2ProtocolService c2ProtocolService; + + @Context + protected HttpServletRequest httpServletRequest; + + @Context + protected UriInfo uriInfo; public ConfigService(List configurationProviders, Authorizer authorizer) { this(configurationProviders, authorizer, 1000, 300_000); } + public ConfigService(List configurationProviders, Authorizer authorizer, long maximumCacheSize, long cacheTtlMillis) { - this.authorizer = authorizer; this.objectMapper = new ObjectMapper(); + + this.authorizer = authorizer; if (configurationProviders == null || configurationProviders.size() == 0) { throw new IllegalArgumentException("Expected at least one configuration provider"); } this.configurationProviderInfo = Suppliers.memoizeWithExpiration(() -> initContentTypeInfo(configurationProviders), cacheTtlMillis, TimeUnit.MILLISECONDS); CacheBuilder cacheBuilder = CacheBuilder.newBuilder(); if (maximumCacheSize >= 0) { - cacheBuilder = cacheBuilder.maximumSize(maximumCacheSize); + cacheBuilder.maximumSize(maximumCacheSize); } if (cacheTtlMillis >= 0) { - cacheBuilder = cacheBuilder.refreshAfterWrite(cacheTtlMillis, TimeUnit.MILLISECONDS); + cacheBuilder.refreshAfterWrite(cacheTtlMillis, TimeUnit.MILLISECONDS); } this.configurationCache = cacheBuilder .build(new CacheLoader() { @@ -99,6 +125,7 @@ public class ConfigService { return initConfigurationProviderValue(key); } }); + this.c2ProtocolService = new SimpleC2ProtocolService(); } public ConfigurationProviderValue initConfigurationProviderValue(ConfigurationProviderKey key) { @@ -147,6 +174,127 @@ public class ConfigService { return new ConfigurationProviderInfo(mediaTypeList, contentTypes, null); } + @POST + @Path("/heartbeat") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation( + value = "An endpoint for a MiNiFi Agent to send a heartbeat to the C2 server", + response = C2HeartbeatResponse.class + ) + @ApiResponses({ + @ApiResponse(code = 400, message = MESSAGE_400)}) + public Response heartbeat( + @Context HttpServletRequest request, @Context HttpHeaders httpHeaders, @Context UriInfo uriInfo, + @ApiParam(required = true) final C2Heartbeat heartbeat) { + + try { + authorizer.authorize(SecurityContextHolder.getContext().getAuthentication(), uriInfo); + } catch (AuthorizationException e) { + logger.warn(HttpRequestUtil.getClientString(request) + " not authorized to access " + uriInfo, e); + return Response.status(403).build(); + } + + List acceptValues = httpHeaders.getAcceptableMediaTypes(); + boolean defaultAccept = false; + if (acceptValues.size() == 0) { + acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE); + defaultAccept = true; + } + if (logger.isDebugEnabled()) { + StringBuilder builder = new StringBuilder("Handling request from ") + .append(HttpRequestUtil.getClientString(request)) + .append(" with Accept"); + if (defaultAccept) { + builder.append(" default value"); + } + builder.append(": ") + .append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(", "))); + logger.debug(builder.toString()); + } + + try { + final String flowId; + Response response; + final String agentClass = heartbeat.getAgentClass(); + if (agentClass == null || agentClass.equals("")) { + logger.warn("No agent class provided, returning OK (200)"); + response = Response.ok().build(); + return response; + } else { + Map> parameters = Collections.singletonMap("class", Collections.singletonList(agentClass)); + ConfigurationProviderValue configurationProviderValue = configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters)); + org.apache.nifi.minifi.c2.api.Configuration configuration; + try { + configuration = configurationProviderValue.getConfiguration(); + } catch (ConfigurationProviderException cpe) { + logger.warn("No flow available for agent class " + agentClass + ", returning No Content (204)"); + response = Response.noContent().build(); + return response; + } + try (InputStream inputStream = configuration.getInputStream(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + MessageDigest sha256 = MessageDigest.getInstance("SHA-256"); + byte[] buffer = new byte[1024]; + int read; + while ((read = inputStream.read(buffer)) >= 0) { + outputStream.write(buffer, 0, read); + sha256.update(buffer, 0, read); + } + flowId = bytesToHex(sha256.digest()); + + } catch (ConfigurationProviderException | IOException | NoSuchAlgorithmException e) { + logger.error("Error reading or checksumming configuration file", e); + throw new WebApplicationException(500); + } + final C2ProtocolContext heartbeatContext = C2ProtocolContext.builder() + .baseUri(uriInfo.getBaseUriBuilder().path("/config").queryParam("class", agentClass).build()) + .contentLength(httpServletRequest.getHeader(CONTENT_LENGTH)) + .sha256(flowId) + .build(); + + try { + final C2HeartbeatResponse heartbeatResponse = c2ProtocolService.processHeartbeat(heartbeat, heartbeatContext); + response = Response.ok(heartbeatResponse).build(); + } catch (Exception e) { + logger.error("Heartbeat processing failed", e); + response = Response.status(BAD_REQUEST).entity(e.getMessage()).build(); + } + } + return response; + } catch (ExecutionException | UncheckedExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof WebApplicationException) { + throw (WebApplicationException) cause; + } + logger.error(HttpRequestUtil.getClientString(request) + " made request with " + HttpRequestUtil.getQueryString(request) + " that caused error.", cause); + return Response.status(500).entity("Internal error").build(); + } + } + + @POST + @Path("/acknowledge") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation( + value = "An endpoint for a MiNiFi Agent to send an operation acknowledgement to the C2 server" + ) + @ApiResponses({ + @ApiResponse(code = 400, message = MESSAGE_400)}) + public Response acknowledge( + @ApiParam(required = true) final C2OperationAck operationAck) { + + final C2ProtocolContext ackContext = C2ProtocolContext.builder() + .baseUri(getBaseUri()) + .contentLength(httpServletRequest.getHeader(CONTENT_LENGTH)) + .build(); + + c2ProtocolService.processOperationAck(operationAck, ackContext); + + return Response.ok().build(); + + } + @GET @Path("/contentTypes") @Produces(MediaType.APPLICATION_JSON) @@ -182,14 +330,11 @@ public class ConfigService { logger.warn(HttpRequestUtil.getClientString(request) + " not authorized to access " + uriInfo, e); return Response.status(403).build(); } - Map> parameters = new HashMap<>(); - for (Map.Entry> entry : uriInfo.getQueryParameters().entrySet()) { - parameters.put(entry.getKey(), entry.getValue()); - } + Map> parameters = new HashMap<>(uriInfo.getQueryParameters()); List acceptValues = httpHeaders.getAcceptableMediaTypes(); boolean defaultAccept = false; if (acceptValues.size() == 0) { - acceptValues = Arrays.asList(MediaType.WILDCARD_TYPE); + acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE); defaultAccept = true; } if (logger.isDebugEnabled()) { @@ -199,16 +344,16 @@ public class ConfigService { .append(parameters) .append(" and Accept"); if (defaultAccept) { - builder = builder.append(" default value"); + builder.append(" default value"); } - builder = builder.append(": ") + builder.append(": ") .append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(", "))); logger.debug(builder.toString()); } try { ConfigurationProviderValue configurationProviderValue = configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters)); - Configuration configuration = configurationProviderValue.getConfiguration(); + org.apache.nifi.minifi.c2.api.Configuration configuration = configurationProviderValue.getConfiguration(); Response.ResponseBuilder ok = Response.ok(); ok = ok.header("X-Content-Version", configuration.getVersion()); ok = ok.type(configurationProviderValue.getMediaType()); @@ -240,7 +385,7 @@ public class ConfigService { } catch (ConfigurationProviderException e) { logger.warn("Unable to get configuration.", e); return Response.status(500).build(); - } catch (ExecutionException|UncheckedExecutionException e) { + } catch (ExecutionException | UncheckedExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof WebApplicationException) { throw (WebApplicationException) cause; @@ -279,4 +424,9 @@ public class ConfigService { "\"Accept: " + acceptValues.stream().map(Object::toString).collect(Collectors.joining(", ")) + "\" supported media types are " + mediaTypeList.stream().map(Pair::getFirst).map(Object::toString).collect(Collectors.joining(", "))).build()); } + + private URI getBaseUri() { + // Forwarded Headers are expected to have been applied as part of servlet filter chain + return uriInfo.getBaseUri(); + } } diff --git a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java new file mode 100644 index 0000000000..c26e0449bf --- /dev/null +++ b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.service; + +import org.apache.nifi.c2.protocol.api.C2Heartbeat; +import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.OperandType; +import org.apache.nifi.c2.protocol.api.OperationState; +import org.apache.nifi.c2.protocol.api.OperationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +@Service +public class SimpleC2ProtocolService implements C2ProtocolService { + + private static final Logger logger = LoggerFactory.getLogger(SimpleC2ProtocolService.class); + + private static final Set issuedOperationIds = new HashSet<>(); + + private final Map currentFlowIds; + + public SimpleC2ProtocolService() { + currentFlowIds = new HashMap<>(1000); + } + + @Override + public void processOperationAck(final C2OperationAck operationAck, final C2ProtocolContext context) { + // This service assumes there is a single Operation UPDATE to pass over the updated flow + logger.debug("Received operation acknowledgement: {}; {}", operationAck, context); + // Remove the operator ID from the list of issued operations and log the state + final String operationId = operationAck.getOperationId(); + try { + OperationState opState = OperationState.DONE; + String details = null; + + /* Partial applications are rare and only happen when an operation consists of updating multiple config + * items and some succeed ( we don't yet have the concept of rollback in agents ). + * Fully Applied yields an operation success. + * Operation Not Understood and Not Applied give little details but also will result in Operation Failure. + * We should explore if providing textual details. */ + final C2OperationState c2OperationState = operationAck.getOperationState(); + if (null != c2OperationState) { + details = c2OperationState.getDetails(); + if (c2OperationState.getState() != C2OperationState.OperationState.FULLY_APPLIED) { + opState = OperationState.FAILED; + } + } + + if (!issuedOperationIds.remove(operationId)) { + logger.warn("Operation with ID " + operationId + " has either already been acknowledged or is unknown to this server"); + } else if (null != c2OperationState) { + final C2OperationState.OperationState operationState = c2OperationState.getState(); + logger.debug("Operation with ID " + operationId + " acknowledged with a state of " + operationState.name() + "(" + opState.name() + "), details = " + + (details == null ? "" : details)); + } + + // Optionally, an acknowledgement can include some of the info normally passed in a heartbeat. + // If this info is present, process it as a heartbeat, so we update our latest known state of the agent. + if (operationAck.getAgentInfo() != null + || operationAck.getDeviceInfo() != null + || operationAck.getFlowInfo() != null) { + final C2Heartbeat heartbeatInfo = toHeartbeat(operationAck); + logger.trace("Operation acknowledgement contains additional info. Processing as heartbeat: {}", heartbeatInfo); + processHeartbeat(heartbeatInfo, context); + } + + } catch (final Exception e) { + logger.warn("Encountered exception while processing operation ack", e); + } + } + + @Override + public C2HeartbeatResponse processHeartbeat(final C2Heartbeat heartbeat, final C2ProtocolContext context) { + + C2HeartbeatResponse c2HeartbeatResponse = new C2HeartbeatResponse(); + String currentFlowId = currentFlowIds.get(heartbeat.getAgentId()); + if (currentFlowId == null || !currentFlowId.equals(context.getSha256())) { + // Create a single UPDATE operation to fetch the flow from the specified URL + C2Operation c2Operation = new C2Operation(); + final String operationID = UUID.randomUUID().toString(); + issuedOperationIds.add(operationID); + c2Operation.setIdentifier(operationID); + c2Operation.setOperation(OperationType.UPDATE); + c2Operation.setOperand(OperandType.CONFIGURATION); + c2Operation.setArgs(Collections.singletonMap("location", context.getBaseUri().toString())); + List requestedOperations = Collections.singletonList(c2Operation); + c2HeartbeatResponse.setRequestedOperations(requestedOperations); + currentFlowIds.put(heartbeat.getAgentId(), context.getSha256()); + } + + return c2HeartbeatResponse; + } + + private static C2Heartbeat toHeartbeat(final C2OperationAck ack) { + final C2Heartbeat heartbeat = new C2Heartbeat(); + heartbeat.setDeviceInfo(ack.getDeviceInfo()); + heartbeat.setAgentInfo(ack.getAgentInfo()); + heartbeat.setFlowInfo(ack.getFlowInfo()); + return heartbeat; + } +} diff --git a/minifi/pom.xml b/minifi/pom.xml index 7a55bc8cdd..796d4003a7 100644 --- a/minifi/pom.xml +++ b/minifi/pom.xml @@ -41,7 +41,7 @@ limitations under the License. 2.29 - 1.16.1 + 1.19.0 1.11.172 2.2.0