diff --git a/c2/c2-client-bundle/README.md b/c2/c2-client-bundle/README.md
new file mode 100644
index 0000000000..90915076d5
--- /dev/null
+++ b/c2/c2-client-bundle/README.md
@@ -0,0 +1,61 @@
+
+## Apache NiFi MiNiFi Command and Control (C2) Client
+The c2-client-bundle provides implementation for the client aspect of the [C2 Protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design). The essence of the implementation is the heartbeat construction and the communication with the [C2 server](../../../../minifi-c2/README.md) via the C2HttpClient.
+
+Currently, relying on the [C2 Protocol API](../c2-protocol) is limited to sending heartbeats and processing/acknowledging UPDATE configuration operation in the response (if any). When exposed the new configuration will be downloaded and passed back to the system using the C2 protocol.
+
+When C2 is enabled, C2ClientService will be scheduled to send heartbeats periodically, so the C2 Server can notify the client about any operations that is defined by the protocol and needs to be executed on the client side.
+
+Using the client means that configuration changes and other operations can be triggered and controlled centrally via the C2 server making the management of clients more simple and configuring them more flexible. The client supports bidirectional TLS authentication.
+
+### Configuration
+To use the client, the parameters coming from `C2ClientConfig` need to be properly set (this configuration class is also used for instantiating `C2HeartbeatFactory` and `C2HttpClient`)
+
+```
+ # The C2 Server endpoint where the heartbeat is sent
+ private final String c2Url;
+
+ # The C2 Server endpoint where the acknowledge is sent
+ private final String c2AckUrl;
+
+ # The class the agent belongs to (flow definition is tied to agent class on the server side)
+ private final String agentClass;
+
+ # Unique identifier for the agent if not provided it will be generated
+ private final String agentIdentifier;
+
+ # Directory where persistent configuration (e.g.: generated agent and device id will be persisted)
+ private final String confDirectory;
+
+ # Property of RuntimeManifest defined in c2-protocol. A unique identifier for the manifest
+ private final String runtimeManifestIdentifier;
+
+ # Property of RuntimeManifest defined in c2-protocol. The type of the runtime binary. Usually set when the runtime is built
+ private final String runtimeType;
+
+ # The frequency of sending the heartbeats. This property is used by the c2-client-bundle user who should schedule the client
+ private final Long heartbeatPeriod;
+
+ # Security properties for communication with the C2 Server
+ private final String keystoreFilename;
+ private final String keystorePass;
+ private final String keyPass;
+ private final KeystoreType keystoreType;
+ private final String truststoreFilename;
+ private final String truststorePass;
+ private final KeystoreType truststoreType;
+ private final HostnameVerifier hostnameVerifier;
+```
\ No newline at end of file
diff --git a/c2/c2-client-bundle/c2-client-api/pom.xml b/c2/c2-client-bundle/c2-client-api/pom.xml
new file mode 100644
index 0000000000..4a9dca9626
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-api/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+ 4.0.0
+
+
+ c2-client-bundle
+ org.apache.nifi
+ 1.17.0-SNAPSHOT
+
+
+ c2-client-api
+ jar
+
+
+
+ org.apache.nifi
+ c2-protocol-api
+ 1.17.0-SNAPSHOT
+
+
+
diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java
new file mode 100644
index 0000000000..e9f61e1e94
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.api;
+
+import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+
+/**
+ * Defines interface methods used to implement a C2 Client. The controller can be application-specific but is used for such tasks as updating the flow.
+ */
+public interface C2Client {
+
+ /**
+ * Responsible for sending the C2Heartbeat to the C2 Server
+ *
+ * @param heartbeat the heartbeat to be sent
+ * @return optional response from the C2 Server if the response arrived it will be populated
+ */
+ Optional publishHeartbeat(C2Heartbeat heartbeat);
+
+ /**
+ * Retrive the content of the new flow from the C2 Server
+ *
+ * @param flowUpdateUrl url where the content should be downloaded from
+ * @return the actual downloaded content. Will be empty if no content can be downloaded
+ */
+ Optional retrieveUpdateContent(String flowUpdateUrl);
+
+ /**
+ * After operation completed the acknowledge to be sent to the C2 Server
+ *
+ * @param operationAck the acknowledge details to be sent
+ */
+ void acknowledgeOperation(C2OperationAck operationAck);
+}
diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Serializer.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Serializer.java
new file mode 100644
index 0000000000..6f98a09fd5
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Serializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.api;
+
+import java.util.Optional;
+
+/**
+ * Helper class to support central configuration and functionality for serialisation / deserialisation
+ */
+public interface C2Serializer {
+
+ /**
+ * Helper to serialise object
+ *
+ * @param content object to be serialised
+ * @param the type of the object
+ * @return the serialised string representation of the parameter object if it was successful empty otherwise
+ */
+ Optional serialize(T content);
+
+ /**
+ * Helper to deserialise an object
+ *
+ * @param content the string representation of the object to be deserialsed
+ * @param valueType the class of the target object
+ * @param the type of the target object
+ * @return the deserialised object if successful empty otherwise
+ */
+ Optional deserialize(String content, Class valueType);
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/ConfigurationFileHolder.java
similarity index 81%
rename from minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
rename to c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/ConfigurationFileHolder.java
index d5113e3597..ab43ff79b8 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
+++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/ConfigurationFileHolder.java
@@ -15,12 +15,20 @@
* limitations under the License.
*/
-package org.apache.nifi.minifi.bootstrap;
+package org.apache.nifi.c2.client.api;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
+/**
+ * Should be implemented by the class which bootstraps the agent.
+ */
public interface ConfigurationFileHolder {
+ /**
+ * Retrieve the reference to the config file
+ *
+ * @return config file reference
+ */
AtomicReference getConfigFileReference();
}
diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/Differentiator.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/Differentiator.java
new file mode 100644
index 0000000000..7f529becbc
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/Differentiator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.api;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Helper to support differentiating between config files to recognise changes
+ *
+ * @param the type of the config files
+ */
+public interface Differentiator {
+
+ /**
+ * Initialise the differentiator with the initial configuration
+ *
+ * @param properties the properties to be used
+ * @param configurationFileHolder holder for the config file
+ */
+ void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder);
+
+ /**
+ * Determine whether the config file changed
+ *
+ * @param input the conetnt of the new config file
+ * @return true if changed and false if not
+ * @throws IOException when there is a config file reading related error
+ */
+ boolean isNew(T input) throws IOException;
+}
diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/IdGenerator.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/IdGenerator.java
new file mode 100644
index 0000000000..ca63df4bac
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/IdGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.api;
+
+/**
+ * Id generator to be used in case user is not providing an optional id
+ */
+public interface IdGenerator {
+
+ /**
+ * Generate a random id
+ *
+ * @return the generated id
+ */
+ String generate();
+}
diff --git a/c2/c2-client-bundle/c2-client-base/pom.xml b/c2/c2-client-bundle/c2-client-base/pom.xml
new file mode 100644
index 0000000000..1e9464ecd5
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-base/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+ 4.0.0
+
+
+ c2-client-bundle
+ org.apache.nifi
+ 1.17.0-SNAPSHOT
+
+
+ c2-client-base
+ jar
+
+
+
+ org.apache.nifi
+ c2-client-api
+ 1.17.0-SNAPSHOT
+ provided
+
+
+ org.apache.nifi
+ c2-protocol-api
+ 1.17.0-SNAPSHOT
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+
diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
new file mode 100644
index 0000000000..5d7481dd9d
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client;
+
+/**
+ * Configuration for a C2 Client.
+ */
+public class C2ClientConfig {
+
+ private final String c2Url;
+ private final String c2AckUrl;
+ private final String agentClass;
+ private final String agentIdentifier;
+ private final String confDirectory;
+ private final String runtimeManifestIdentifier;
+ private final String runtimeType;
+ private final long heartbeatPeriod;
+ private final String keystoreFilename;
+ private final String keystorePass;
+ private final String keyPass;
+ private final String keystoreType;
+ private final String truststoreFilename;
+ private final String truststorePass;
+ private final String truststoreType;
+ private final long callTimeout;
+ private final long readTimeout;
+ private final long connectTimeout;
+
+
+ private C2ClientConfig(final Builder builder) {
+ this.c2Url = builder.c2Url;
+ this.c2AckUrl = builder.c2AckUrl;
+ this.agentClass = builder.agentClass;
+ this.agentIdentifier = builder.agentIdentifier;
+ this.confDirectory = builder.confDirectory;
+ this.runtimeManifestIdentifier = builder.runtimeManifestIdentifier;
+ this.runtimeType = builder.runtimeType;
+ this.heartbeatPeriod = builder.heartbeatPeriod;
+ this.callTimeout = builder.callTimeout;
+ this.keystoreFilename = builder.keystoreFilename;
+ this.keystorePass = builder.keystorePass;
+ this.keyPass = builder.keyPass;
+ this.keystoreType = builder.keystoreType;
+ this.truststoreFilename = builder.truststoreFilename;
+ this.truststorePass = builder.truststorePass;
+ this.truststoreType = builder.truststoreType;
+ this.readTimeout = builder.readTimeout;
+ this.connectTimeout = builder.connectTimeout;
+ }
+
+ public String getC2Url() {
+ return c2Url;
+ }
+
+ public String getC2AckUrl() {
+ return c2AckUrl;
+ }
+
+ public String getAgentClass() {
+ return agentClass;
+ }
+
+ public String getAgentIdentifier() {
+ return agentIdentifier;
+ }
+
+ public String getConfDirectory() {
+ return confDirectory;
+ }
+
+ public String getRuntimeManifestIdentifier() {
+ return runtimeManifestIdentifier;
+ }
+
+ public String getRuntimeType() {
+ return runtimeType;
+ }
+
+ public long getHeartbeatPeriod() {
+ return heartbeatPeriod;
+ }
+
+ public long getCallTimeout() {
+ return callTimeout;
+ }
+
+ public String getKeystoreFilename() {
+ return keystoreFilename;
+ }
+
+ public String getKeystorePass() {
+ return keystorePass;
+ }
+
+ public String getKeyPass() {
+ return keyPass;
+ }
+
+ public String getKeystoreType() {
+ return keystoreType;
+ }
+
+ public String getTruststoreFilename() {
+ return truststoreFilename;
+ }
+
+ public String getTruststorePass() {
+ return truststorePass;
+ }
+
+ public String getTruststoreType() {
+ return truststoreType;
+ }
+
+ public long getReadTimeout() {
+ return readTimeout;
+ }
+
+ public long getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ /**
+ * Builder for client configuration.
+ */
+ public static class Builder {
+
+ private String c2Url;
+ private String c2AckUrl;
+ private String agentClass;
+ private String agentIdentifier;
+ private String confDirectory;
+ private String runtimeManifestIdentifier;
+ private String runtimeType;
+ private long heartbeatPeriod;
+ private long callTimeout;
+ private String keystoreFilename;
+ private String keystorePass;
+ private String keyPass;
+ private String keystoreType;
+ private String truststoreFilename;
+ private String truststorePass;
+ private String truststoreType;
+ private long readTimeout;
+ private long connectTimeout;
+
+ public Builder c2Url(final String c2Url) {
+ this.c2Url = c2Url;
+ return this;
+ }
+
+ public Builder c2AckUrl(final String c2AckUrl) {
+ this.c2AckUrl = c2AckUrl;
+ return this;
+ }
+
+ public Builder agentClass(final String agentClass) {
+ this.agentClass = agentClass;
+ return this;
+ }
+
+ public Builder agentIdentifier(final String agentIdentifier) {
+ this.agentIdentifier = agentIdentifier;
+ return this;
+ }
+
+ public Builder confDirectory(final String confDirectory) {
+ this.confDirectory = confDirectory;
+ return this;
+ }
+
+ public Builder runtimeManifestIdentifier(final String runtimeManifestIdentifier) {
+ this.runtimeManifestIdentifier = runtimeManifestIdentifier;
+ return this;
+ }
+
+ public Builder runtimeType(final String runtimeType) {
+ this.runtimeType = runtimeType;
+ return this;
+ }
+
+ public Builder heartbeatPeriod(final long heartbeatPeriod) {
+ this.heartbeatPeriod = heartbeatPeriod;
+ return this;
+ }
+
+ public Builder callTimeout(final long callTimeout) {
+ this.callTimeout = callTimeout;
+ return this;
+ }
+
+ public Builder keystoreFilename(final String keystoreFilename) {
+ this.keystoreFilename = keystoreFilename;
+ return this;
+ }
+
+ public Builder keystorePassword(final String keystorePass) {
+ this.keystorePass = keystorePass;
+ return this;
+ }
+
+ public Builder keyPassword(final String keyPass) {
+ this.keyPass = keyPass;
+ return this;
+ }
+
+ public Builder keystoreType(final String keystoreType) {
+ this.keystoreType = keystoreType;
+ return this;
+ }
+
+ public Builder truststoreFilename(final String truststoreFilename) {
+ this.truststoreFilename = truststoreFilename;
+ return this;
+ }
+
+ public Builder truststorePassword(final String truststorePass) {
+ this.truststorePass = truststorePass;
+ return this;
+ }
+
+ public Builder truststoreType(final String truststoreType) {
+ this.truststoreType = truststoreType;
+ return this;
+ }
+
+ public Builder readTimeout(final long readTimeout) {
+ this.readTimeout = readTimeout;
+ return this;
+ }
+
+ public Builder connectTimeout(final long connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ public C2ClientConfig build() {
+ return new C2ClientConfig(this);
+ }
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/PersistentUuidGenerator.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/PersistentUuidGenerator.java
new file mode 100644
index 0000000000..9c645180e1
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/PersistentUuidGenerator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client;
+
+import org.apache.nifi.c2.client.api.IdGenerator;
+import org.apache.nifi.c2.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+public class PersistentUuidGenerator implements IdGenerator {
+
+ private static final Logger logger = LoggerFactory.getLogger(PersistentUuidGenerator.class);
+
+ private final File persistenceLocation;
+
+ public PersistentUuidGenerator(final File persistenceLocation) {
+ this.persistenceLocation = persistenceLocation;
+ }
+
+ @Override
+ public String generate() {
+ if (this.persistenceLocation.exists()) {
+ return readFile();
+ } else {
+ return makeFile();
+ }
+ }
+
+ private String readFile() {
+ try {
+ final List fileLines = Files.readAllLines(persistenceLocation.toPath());
+ if (fileLines.size() != 1) {
+ throw new IllegalStateException(String.format("The file %s for the persisted identifier has the incorrect format.", persistenceLocation));
+ }
+ final String uuid = fileLines.get(0);
+ return uuid;
+ } catch (IOException e) {
+ throw new IllegalStateException(String.format("Could not read file %s for persisted identifier.", persistenceLocation), e);
+
+ }
+ }
+
+ private String makeFile() {
+ try {
+ final File parentDirectory = persistenceLocation.getParentFile();
+ FileUtils.ensureDirectoryExistAndCanAccess(parentDirectory);
+ final String uuid = UUID.randomUUID().toString();
+ Files.write(persistenceLocation.toPath(), Arrays.asList(uuid));
+ logger.debug("Created identifier {} at {}", uuid, persistenceLocation);
+ return uuid;
+ } catch (IOException e) {
+ throw new IllegalStateException(String.format("Could not create file %s as persistence file.", persistenceLocation), e);
+ }
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2JacksonSerializer.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2JacksonSerializer.java
new file mode 100644
index 0000000000..1d8317d787
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2JacksonSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.serializer;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Optional;
+import org.apache.nifi.c2.client.api.C2Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2JacksonSerializer implements C2Serializer {
+
+ private static final Logger logger = LoggerFactory.getLogger(C2JacksonSerializer.class);
+
+ private final ObjectMapper objectMapper;
+
+ public C2JacksonSerializer() {
+ objectMapper = new ObjectMapper();
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ objectMapper.enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING);
+ }
+
+ @Override
+ public Optional serialize(T object) {
+ if (object == null) {
+ logger.trace("C2 Object was null. Nothing to serialize. Returning empty.");
+ return Optional.empty();
+ }
+
+ String contentString = null;
+ try {
+ contentString = objectMapper.writeValueAsString(object);
+ } catch (JsonProcessingException e) {
+ logger.error("Object serialization to JSON failed", e);
+ }
+
+ return Optional.ofNullable(contentString);
+ }
+
+ @Override
+ public Optional deserialize(String content, Class valueType) {
+ if (content == null) {
+ logger.trace("Content for deserialization was null. Returning empty.");
+ return Optional.empty();
+ }
+
+ T responseObject = null;
+ try {
+ responseObject = objectMapper.readValue(content, valueType);
+ } catch (JsonProcessingException e) {
+ logger.error("Object deserialization from JSON failed", e);
+ }
+
+ return Optional.ofNullable(responseObject);
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/util/FileUtils.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/util/FileUtils.java
new file mode 100644
index 0000000000..fbab93cb64
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/util/FileUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.util;
+
+import java.io.File;
+import java.io.IOException;
+
+public class FileUtils {
+
+ public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException {
+ if (dir.exists() && !dir.isDirectory()) {
+ throw new IOException(dir.getAbsolutePath() + " is not a directory");
+ } else if (!dir.exists()) {
+ final boolean made = dir.mkdirs();
+ if (!made) {
+ throw new IOException(dir.getAbsolutePath() + " could not be created");
+ }
+ }
+ if (!(dir.canRead() && dir.canWrite())) {
+ throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege");
+ }
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-http/pom.xml b/c2/c2-client-bundle/c2-client-http/pom.xml
new file mode 100644
index 0000000000..75076c051e
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-http/pom.xml
@@ -0,0 +1,51 @@
+
+
+
+ 4.0.0
+
+
+ c2-client-bundle
+ org.apache.nifi
+ 1.17.0-SNAPSHOT
+
+
+ c2-client-http
+ jar
+
+
+
+ org.apache.nifi
+ c2-client-api
+ 1.17.0-SNAPSHOT
+ provided
+
+
+ org.apache.nifi
+ c2-client-base
+ 1.17.0-SNAPSHOT
+
+
+ com.squareup.okhttp3
+ okhttp
+
+
+ com.squareup.okhttp3
+ logging-interceptor
+
+
+
diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
new file mode 100644
index 0000000000..7f9b1412af
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.http;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okhttp3.logging.HttpLoggingInterceptor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.c2.client.C2ClientConfig;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.api.C2Serializer;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2HttpClient implements C2Client {
+
+ private static final Logger logger = LoggerFactory.getLogger(C2HttpClient.class);
+ private static final MediaType MEDIA_TYPE_APPLICATION_JSON = MediaType.parse("application/json");
+
+ private final AtomicReference httpClientReference = new AtomicReference<>();
+ private final C2ClientConfig clientConfig;
+ private final C2Serializer serializer;
+
+ public C2HttpClient(C2ClientConfig clientConfig, C2Serializer serializer) {
+ super();
+ this.clientConfig = clientConfig;
+ this.serializer = serializer;
+ final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
+
+ // Configure request and response logging
+ HttpLoggingInterceptor logging = new HttpLoggingInterceptor(logger::debug);
+ logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
+ okHttpClientBuilder.addInterceptor(logging);
+
+ // Set whether to follow redirects
+ okHttpClientBuilder.followRedirects(true);
+
+ // Timeouts
+ okHttpClientBuilder.connectTimeout(clientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);
+ okHttpClientBuilder.readTimeout(clientConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
+ okHttpClientBuilder.callTimeout(clientConfig.getCallTimeout(), TimeUnit.MILLISECONDS);
+
+ // check if the ssl path is set and add the factory if so
+ if (StringUtils.isNotBlank(clientConfig.getKeystoreFilename())) {
+ try {
+ setSslSocketFactory(okHttpClientBuilder);
+ } catch (Exception e) {
+ throw new IllegalStateException("OkHttp TLS configuration failed", e);
+ }
+ }
+
+ httpClientReference.set(okHttpClientBuilder.build());
+ }
+
+ @Override
+ public Optional publishHeartbeat(C2Heartbeat heartbeat) {
+ return serializer.serialize(heartbeat).flatMap(this::sendHeartbeat);
+ }
+
+ private Optional sendHeartbeat(String heartbeat) {
+ Optional c2HeartbeatResponse = Optional.empty();
+ Request request = new Request.Builder()
+ .post(RequestBody.create(heartbeat, MEDIA_TYPE_APPLICATION_JSON))
+ .url(clientConfig.getC2Url())
+ .build();
+
+ try (Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
+ c2HeartbeatResponse = getResponseBody(heartbeatResponse).flatMap(response -> serializer.deserialize(response, C2HeartbeatResponse.class));
+ } catch (IOException ce) {
+ logger.error("Send Heartbeat failed [{}]", clientConfig.getC2Url(), ce);
+ }
+
+ return c2HeartbeatResponse;
+ }
+
+ private Optional getResponseBody(Response response) {
+ String responseBody = null;
+
+ try {
+ responseBody = response.body().string();
+ logger.debug("Received response body {}", responseBody);
+ } catch (IOException e) {
+ logger.error("HTTP Request failed", e);
+ }
+
+ return Optional.ofNullable(responseBody);
+ }
+
+ private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder) throws Exception {
+ final String keystoreLocation = clientConfig.getKeystoreFilename();
+ final String keystoreType = clientConfig.getKeystoreType();
+ final String keystorePass = clientConfig.getKeystorePass();
+
+ assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType);
+
+ // prepare the keystore
+ final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+
+ try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
+ keyStore.load(keyStoreStream, keystorePass.toCharArray());
+ }
+
+ final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keyStore, keystorePass.toCharArray());
+
+ // load truststore
+ final String truststoreLocation = clientConfig.getTruststoreFilename();
+ final String truststorePass = clientConfig.getTruststorePass();
+ final String truststoreType = clientConfig.getTruststoreType();
+ assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType);
+
+ KeyStore truststore = KeyStore.getInstance(truststoreType);
+ final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
+ truststore.load(new FileInputStream(truststoreLocation), truststorePass.toCharArray());
+ trustManagerFactory.init(truststore);
+
+ final X509TrustManager x509TrustManager;
+ TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+ if (trustManagers[0] != null) {
+ x509TrustManager = (X509TrustManager) trustManagers[0];
+ } else {
+ throw new IllegalStateException("List of trust managers is null");
+ }
+
+ SSLContext tempSslContext;
+ try {
+ tempSslContext = SSLContext.getInstance("TLS");
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException("SSLContext creation failed", e);
+ }
+
+ final SSLContext sslContext = tempSslContext;
+ sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
+
+ final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ okHttpClientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
+ }
+
+ private void assertKeystorePropertiesSet(String location, String password, String type) {
+ if (location == null || location.isEmpty()) {
+ throw new IllegalArgumentException(clientConfig.getKeystoreFilename() + " is null or is empty");
+ }
+
+ if (password == null || password.isEmpty()) {
+ throw new IllegalArgumentException("The client's keystore filename is set but its password is not (or is empty). If the location is set, the password must also be.");
+ }
+
+ if (type == null || type.isEmpty()) {
+ throw new IllegalArgumentException("The client's keystore filename is set but its type is not (or is empty). If the location is set, the type must also be.");
+ }
+ }
+
+ private void assertTruststorePropertiesSet(String location, String password, String type) {
+ if (location == null || location.isEmpty()) {
+ throw new IllegalArgumentException("The client's truststore filename is not set or is empty");
+ }
+
+ if (password == null || password.isEmpty()) {
+ throw new IllegalArgumentException("The client's truststore filename is set but its password is not (or is empty). If the location is set, the password must also be.");
+ }
+
+ if (type == null || type.isEmpty()) {
+ throw new IllegalArgumentException("The client's truststore filename is set but its type is not (or is empty). If the location is set, the type must also be.");
+ }
+ }
+
+ @Override
+ public Optional retrieveUpdateContent(String flowUpdateUrl) {
+ final Request.Builder requestBuilder = new Request.Builder()
+ .get()
+ .url(flowUpdateUrl);
+ final Request request = requestBuilder.build();
+
+ ResponseBody body;
+ try (final Response response = httpClientReference.get().newCall(request).execute()) {
+ int code = response.code();
+ if (code >= 400) {
+ final String message = String.format("Configuration retrieval failed: HTTP %d %s", code, response.body().string());
+ throw new IOException(message);
+ }
+
+ body = response.body();
+
+ if (body == null) {
+ logger.warn("No body returned when pulling a new configuration");
+ return Optional.empty();
+ }
+
+ return Optional.of(body.bytes());
+ } catch (Exception e) {
+ logger.warn("Configuration retrieval failed", e);
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public void acknowledgeOperation(C2OperationAck operationAck) {
+ logger.info("Performing acknowledgement request to {} for operation {}", clientConfig.getC2AckUrl(), operationAck.getOperationId());
+ serializer.serialize(operationAck)
+ .map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
+ .map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
+ .ifPresent(this::sendAck);
+ }
+
+ private void sendAck(Request request) {
+ try(Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
+ if (!heartbeatResponse.isSuccessful()) {
+ logger.warn("Acknowledgement was not successful with c2 server [{}] with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code());
+ }
+ } catch (IOException e) {
+ logger.error("Could not transmit ack to c2 server [{}]", clientConfig.getC2AckUrl(), e);
+ }
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/pom.xml b/c2/c2-client-bundle/c2-client-service/pom.xml
new file mode 100644
index 0000000000..b45f7d225b
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ 4.0.0
+
+
+ c2-client-bundle
+ org.apache.nifi
+ 1.17.0-SNAPSHOT
+
+
+ c2-client-service
+ jar
+
+
+
+ org.apache.nifi
+ c2-client-api
+ 1.17.0-SNAPSHOT
+ provided
+
+
+ org.apache.nifi
+ c2-client-base
+ 1.17.0-SNAPSHOT
+
+
+ org.apache.nifi
+ c2-client-http
+ 1.17.0-SNAPSHOT
+
+
+
+
\ No newline at end of file
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
new file mode 100644
index 0000000000..a36b7bf3fa
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2ClientService {
+
+ private static final Logger logger = LoggerFactory.getLogger(C2ClientService.class);
+
+ private final C2Client client;
+ private final C2HeartbeatFactory c2HeartbeatFactory;
+ private final C2OperationService operationService;
+ private final UpdateConfigurationOperationHandler updateConfigurationOperationHandler;
+
+ public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, FlowIdHolder flowIdHolder, Function updateFlow) {
+ this.client = client;
+ this.c2HeartbeatFactory = c2HeartbeatFactory;
+ this.updateConfigurationOperationHandler = new UpdateConfigurationOperationHandler(client, flowIdHolder, updateFlow);
+ this.operationService = new C2OperationService(Arrays.asList(updateConfigurationOperationHandler));
+ }
+
+ public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
+ C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+ client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ }
+
+ private void processResponse(C2HeartbeatResponse response) {
+ List requestedOperations = response.getRequestedOperations();
+ if (requestedOperations != null && !requestedOperations.isEmpty()) {
+ logger.info("Received {} operations from the C2 server", requestedOperations.size());
+ handleRequestedOperations(requestedOperations);
+ } else {
+ logger.trace("No operations received from the C2 server in the server. Nothing to do.");
+ }
+ }
+
+ private void handleRequestedOperations(List requestedOperations) {
+ for (C2Operation requestedOperation : requestedOperations) {
+ operationService.handleOperation(requestedOperation)
+ .ifPresent(client::acknowledgeOperation);
+ }
+ }
+}
+
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
new file mode 100644
index 0000000000..3fb40813cb
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.nifi.c2.client.C2ClientConfig;
+import org.apache.nifi.c2.client.PersistentUuidGenerator;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.protocol.api.AgentInfo;
+import org.apache.nifi.c2.protocol.api.AgentRepositories;
+import org.apache.nifi.c2.protocol.api.AgentStatus;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.DeviceInfo;
+import org.apache.nifi.c2.protocol.api.FlowInfo;
+import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.api.NetworkInfo;
+import org.apache.nifi.c2.protocol.api.SystemInfo;
+import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2HeartbeatFactory {
+
+ private static final Logger logger = LoggerFactory.getLogger(C2HeartbeatFactory.class);
+
+ private static final String AGENT_IDENTIFIER_FILENAME = "agent-identifier";
+ private static final String DEVICE_IDENTIFIER_FILENAME = "device-identifier";
+
+ private final C2ClientConfig clientConfig;
+ private final FlowIdHolder flowIdHolder;
+
+ private String agentId;
+ private String deviceId;
+ private File confDirectory;
+
+ public C2HeartbeatFactory(C2ClientConfig clientConfig, FlowIdHolder flowIdHolder) {
+ this.clientConfig = clientConfig;
+ this.flowIdHolder = flowIdHolder;
+ }
+
+ public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) {
+ C2Heartbeat heartbeat = new C2Heartbeat();
+
+ heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), runtimeInfoWrapper.getManifest()));
+ heartbeat.setDeviceInfo(generateDeviceInfo());
+ heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus()));
+ heartbeat.setCreated(System.currentTimeMillis());
+
+ return heartbeat;
+ }
+
+ private FlowInfo getFlowInfo(Map queueStatus) {
+ FlowInfo flowInfo = new FlowInfo();
+ flowInfo.setQueues(queueStatus);
+ Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
+ return flowInfo;
+ }
+
+ private AgentInfo getAgentInfo(AgentRepositories repos, RuntimeManifest manifest) {
+ AgentInfo agentInfo = new AgentInfo();
+ agentInfo.setAgentClass(clientConfig.getAgentClass());
+ agentInfo.setIdentifier(getAgentId());
+
+ AgentStatus agentStatus = new AgentStatus();
+ agentStatus.setUptime(ManagementFactory.getRuntimeMXBean().getUptime());
+ agentStatus.setRepositories(repos);
+
+ agentInfo.setStatus(agentStatus);
+ agentInfo.setAgentManifest(manifest);
+
+ return agentInfo;
+ }
+
+ private String getAgentId() {
+ if (agentId == null) {
+ String rawAgentId = clientConfig.getAgentIdentifier();
+ if (isNotBlank(rawAgentId)) {
+ agentId = rawAgentId.trim();
+ } else {
+ File idFile = new File(getConfDirectory(), AGENT_IDENTIFIER_FILENAME);
+ agentId = new PersistentUuidGenerator(idFile).generate();
+ }
+ }
+
+ return agentId;
+ }
+
+ private DeviceInfo generateDeviceInfo() {
+ // Populate DeviceInfo
+ final DeviceInfo deviceInfo = new DeviceInfo();
+ deviceInfo.setNetworkInfo(generateNetworkInfo());
+ deviceInfo.setIdentifier(getDeviceIdentifier(deviceInfo.getNetworkInfo()));
+ deviceInfo.setSystemInfo(generateSystemInfo());
+ return deviceInfo;
+ }
+
+ private NetworkInfo generateNetworkInfo() {
+ NetworkInfo networkInfo = new NetworkInfo();
+ try {
+ // Determine all interfaces
+ final Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces();
+
+ final Set operationIfaces = new HashSet<>();
+
+ // Determine eligible interfaces
+ while (networkInterfaces.hasMoreElements()) {
+ final NetworkInterface networkInterface = networkInterfaces.nextElement();
+ if (!networkInterface.isLoopback() && networkInterface.isUp()) {
+ operationIfaces.add(networkInterface);
+ }
+ }
+ logger.trace("Have {} interfaces with names {}", operationIfaces.size(),
+ operationIfaces.stream()
+ .map(NetworkInterface::getName)
+ .collect(Collectors.toSet())
+ );
+
+ if (!operationIfaces.isEmpty()) {
+ if (operationIfaces.size() > 1) {
+ logger.debug("Instance has multiple interfaces. Generated information may be non-deterministic.");
+ }
+
+ NetworkInterface iface = operationIfaces.iterator().next();
+ Enumeration inetAddresses = iface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress inetAddress = inetAddresses.nextElement();
+ String hostAddress = inetAddress.getHostAddress();
+ String hostName = inetAddress.getHostName();
+ byte[] address = inetAddress.getAddress();
+ String canonicalHostName = inetAddress.getCanonicalHostName();
+
+ networkInfo.setDeviceId(iface.getName());
+ networkInfo.setHostname(hostName);
+ networkInfo.setIpAddress(hostAddress);
+ }
+ }
+ } catch (
+ Exception e) {
+ logger.error("Network Interface processing failed", e);
+ }
+ return networkInfo;
+ }
+
+ private String getDeviceIdentifier(NetworkInfo networkInfo) {
+ if (deviceId == null) {
+ if (networkInfo.getDeviceId() != null) {
+ try {
+ final NetworkInterface netInterface = NetworkInterface.getByName(networkInfo.getDeviceId());
+ byte[] hardwareAddress = netInterface.getHardwareAddress();
+ final StringBuilder macBuilder = new StringBuilder();
+ if (hardwareAddress != null) {
+ for (byte address : hardwareAddress) {
+ macBuilder.append(String.format("%02X", address));
+ }
+ }
+ deviceId = macBuilder.toString();
+ } catch (Exception e) {
+ logger.warn("Could not determine device identifier. Generating a unique ID", e);
+ deviceId = getConfiguredDeviceId();
+ }
+ } else {
+ deviceId = getConfiguredDeviceId();
+ }
+ }
+
+ return deviceId;
+ }
+
+ private String getConfiguredDeviceId() {
+ File idFile = new File(getConfDirectory(), DEVICE_IDENTIFIER_FILENAME);
+ return new PersistentUuidGenerator(idFile).generate();
+ }
+
+ private SystemInfo generateSystemInfo() {
+ SystemInfo systemInfo = new SystemInfo();
+ systemInfo.setPhysicalMem(Runtime.getRuntime().maxMemory());
+ systemInfo.setMemoryUsage(Runtime.getRuntime().maxMemory() - Runtime.getRuntime().freeMemory());
+ systemInfo.setvCores(Runtime.getRuntime().availableProcessors());
+
+ OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
+ systemInfo.setMachineArch(osMXBean.getArch());
+ systemInfo.setOperatingSystem(osMXBean.getName());
+ systemInfo.setCpuUtilization(osMXBean.getSystemLoadAverage() / (double) osMXBean.getAvailableProcessors());
+
+ return systemInfo;
+ }
+
+ private File getConfDirectory() {
+ if (confDirectory == null) {
+ String configDirectoryName = clientConfig.getConfDirectory();
+ File configDirectory = new File(configDirectoryName);
+ if (!configDirectory.exists() || !configDirectory.isDirectory()) {
+ throw new IllegalStateException("Specified conf directory " + configDirectoryName + " does not exist or is not a directory.");
+ }
+
+ confDirectory = configDirectory;
+ }
+
+ return confDirectory;
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/FlowIdHolder.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/FlowIdHolder.java
new file mode 100644
index 0000000000..30d194f720
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/FlowIdHolder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Collections.singletonList;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import org.apache.nifi.c2.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowIdHolder {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowIdHolder.class);
+ private static final String FLOW_IDENTIFIER_FILENAME = "flow-identifier";
+
+ private volatile String flowId;
+ private final String configDirectoryName;
+
+ public FlowIdHolder(String configDirectoryName) {
+ this.configDirectoryName = configDirectoryName;
+ this.flowId = readFlowId();
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public void setFlowId(String flowId) {
+ this.flowId = flowId;
+ persistFlowId(flowId);
+ }
+
+ private void persistFlowId(String flowId) {
+ File flowIdFile = new File(configDirectoryName, FLOW_IDENTIFIER_FILENAME);
+ try {
+ FileUtils.ensureDirectoryExistAndCanAccess(flowIdFile.getParentFile());
+ saveFlowId(flowIdFile, flowId);
+ } catch (IOException e) {
+ LOGGER.error("Persisting Flow [{}] failed", flowId, e);
+ }
+ }
+
+ private void saveFlowId(File flowUpdateInfoFile, String flowId) {
+ try {
+ Files.write(flowUpdateInfoFile.toPath(), singletonList(flowId));
+ } catch (IOException e) {
+ LOGGER.error("Writing Flow [{}] failed", flowId, e);
+ }
+ }
+
+ private String readFlowId() {
+ File flowUpdateInfoFile = new File(configDirectoryName, FLOW_IDENTIFIER_FILENAME);
+ String flowId = null;
+ if (flowUpdateInfoFile.exists()) {
+ try {
+ List fileLines = Files.readAllLines(flowUpdateInfoFile.toPath());
+ if (fileLines.size() != 1) {
+ throw new IllegalStateException(String.format("The file %s for the persisted flow id has the incorrect format.", flowUpdateInfoFile));
+ }
+ flowId = fileLines.get(0);
+ } catch (IOException e) {
+ throw new IllegalStateException(String.format("Could not read file %s for persisted flow id.", flowUpdateInfoFile), e);
+ }
+ }
+ return flowId;
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
new file mode 100644
index 0000000000..c017ac0d11
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.model;
+
+import java.util.Map;
+import org.apache.nifi.c2.protocol.api.AgentRepositories;
+import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
+
+public class RuntimeInfoWrapper {
+ final AgentRepositories repos;
+ final RuntimeManifest manifest;
+ final Map queueStatus;
+
+ public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map queueStatus) {
+ this.repos = repos;
+ this.manifest = manifest;
+ this.queueStatus = queueStatus;
+ }
+
+ public AgentRepositories getAgentRepositories() {
+ return repos;
+ }
+
+ public RuntimeManifest getManifest() {
+ return manifest;
+ }
+
+ public Map getQueueStatus() {
+ return queueStatus;
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
new file mode 100644
index 0000000000..e9f2db29fb
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+/**
+ * Handler interface for the different operation types
+ */
+public interface C2OperationHandler {
+
+ /**
+ * Returns the supported OperationType by the handler
+ *
+ * @return the type of the operation
+ */
+ OperationType getOperationType();
+
+ /**
+ * Returns the supported OperandType by the handler
+ *
+ * @return the type of the operand
+ */
+ OperandType getOperandType();
+
+ /**
+ * Handler logic for the specific C2Operation
+ *
+ * @param operation the C2Operation to be handled
+ * @return the result of the operation handling
+ */
+ C2OperationAck handle(C2Operation operation);
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java
new file mode 100644
index 0000000000..fadc8ad79b
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+public class C2OperationService {
+
+ private final Map> handlerMap = new HashMap<>();
+
+ public C2OperationService(List handlers) {
+ for (C2OperationHandler handler : handlers) {
+ handlerMap.computeIfAbsent(handler.getOperationType(), x -> new HashMap<>()).put(handler.getOperandType(), handler);
+ }
+ }
+
+ public Optional handleOperation(C2Operation operation) {
+ return getHandlerForOperation(operation)
+ .map(handler -> handler.handle(operation));
+ }
+
+ private Optional getHandlerForOperation(C2Operation operation) {
+ return Optional.ofNullable(handlerMap.get(operation.getOperation()))
+ .map(operandMap -> operandMap.get(operation.getOperand()));
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
new file mode 100644
index 0000000000..b58e07547e
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.FlowIdHolder;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationOperationHandler implements C2OperationHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationOperationHandler.class);
+
+ private static final String LOCATION = "location";
+
+ private final C2Client client;
+ private final Function updateFlow;
+ private final FlowIdHolder flowIdHolder;
+
+ public UpdateConfigurationOperationHandler(C2Client client, FlowIdHolder flowIdHolder, Function updateFlow) {
+ this.client = client;
+ this.updateFlow = updateFlow;
+ this.flowIdHolder = flowIdHolder;
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return UPDATE;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return CONFIGURATION;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String opIdentifier = Optional.ofNullable(operation.getIdentifier())
+ .orElse(EMPTY);
+ C2OperationAck operationAck = new C2OperationAck();
+ C2OperationState state = new C2OperationState();
+ operationAck.setOperationState(state);
+ operationAck.setOperationId(opIdentifier);
+
+ String updateLocation = Optional.ofNullable(operation.getArgs())
+ .map(map -> map.get(LOCATION))
+ .orElse(EMPTY);
+
+ String newFlowId = parseFlowId(updateLocation);
+ if (flowIdHolder.getFlowId() == null || !flowIdHolder.getFlowId().equals(newFlowId)) {
+ logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}", updateLocation, opIdentifier,
+ flowIdHolder.getFlowId() == null ? "not set" : flowIdHolder.getFlowId(), newFlowId);
+ } else {
+ logger.info("Flow is current, no update is necessary...");
+ }
+
+ flowIdHolder.setFlowId(newFlowId);
+ Optional updateContent = client.retrieveUpdateContent(updateLocation);
+ if (updateContent.isPresent()) {
+ if (updateFlow.apply(updateContent.get())) {
+ state.setState(C2OperationState.OperationState.FULLY_APPLIED);
+ logger.debug("Update configuration applied for operation #{}.", opIdentifier);
+ } else {
+ state.setState(C2OperationState.OperationState.NOT_APPLIED);
+ logger.error("Update resulted in error for operation #{}.", opIdentifier);
+ }
+ } else {
+ state.setState(C2OperationState.OperationState.NOT_APPLIED);
+ logger.error("Update content retrieval resulted in empty content so flow update was omitted for operation #{}.", opIdentifier);
+ }
+
+ return operationAck;
+ }
+
+ private String parseFlowId(String flowUpdateUrl) {
+ try {
+ URI flowUri = new URI(flowUpdateUrl);
+ String flowUriPath = flowUri.getPath();
+ String[] split = flowUriPath.split("/");
+ if (split.length > 4) {
+ return split[4];
+ } else {
+ throw new IllegalArgumentException(String.format("Flow Update URL format unexpected [%s]", flowUpdateUrl));
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not get flow id from the provided URL", e);
+ }
+ }
+}
diff --git a/c2/c2-client-bundle/pom.xml b/c2/c2-client-bundle/pom.xml
new file mode 100644
index 0000000000..4781e5d4f4
--- /dev/null
+++ b/c2/c2-client-bundle/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ 4.0.0
+
+
+ c2
+ org.apache.nifi
+ 1.17.0-SNAPSHOT
+
+
+ c2-client-bundle
+ pom
+
+
+ c2-client-api
+ c2-client-base
+ c2-client-http
+ c2-client-service
+
+
diff --git a/c2/c2-protocol/c2-protocol-api/pom.xml b/c2/c2-protocol/c2-protocol-api/pom.xml
index 5fb3fb7ccd..b48d5c93c8 100644
--- a/c2/c2-protocol/c2-protocol-api/pom.xml
+++ b/c2/c2-protocol/c2-protocol-api/pom.xml
@@ -33,5 +33,9 @@ limitations under the License.
c2-protocol-component-api1.17.0-SNAPSHOT
+
+ org.apache.commons
+ commons-lang3
+
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
index bac9403d76..389d3bed5a 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
@@ -33,5 +33,10 @@ public enum OperandType {
.filter(operandType -> operandType.name().equalsIgnoreCase(value))
.findAny();
}
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase();
+ }
}
diff --git a/c2/pom.xml b/c2/pom.xml
index 37feecdaa4..712d69e6db 100644
--- a/c2/pom.xml
+++ b/c2/pom.xml
@@ -29,6 +29,7 @@ limitations under the License.
c2-protocol
+ c2-client-bundle
diff --git a/minifi/minifi-assembly/pom.xml b/minifi/minifi-assembly/pom.xml
index 4e0f2965e2..6109eb8210 100644
--- a/minifi/minifi-assembly/pom.xml
+++ b/minifi/minifi-assembly/pom.xml
@@ -122,6 +122,10 @@ limitations under the License.
org.apache.nifi.minifiminifi-framework-api
+
+ org.apache.nifi
+ c2-client-api
+ org.apache.nifi.minifiminifi-framework-nar
@@ -210,6 +214,14 @@ limitations under the License.
commons-iocommons-io
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
diff --git a/minifi/minifi-bootstrap/pom.xml b/minifi/minifi-bootstrap/pom.xml
index 693d128072..315fa89373 100644
--- a/minifi/minifi-bootstrap/pom.xml
+++ b/minifi/minifi-bootstrap/pom.xml
@@ -40,6 +40,11 @@ limitations under the License.
nifi-apicompile
+
+ org.apache.nifi
+ c2-client-api
+ compile
+ org.apache.nifinifi-bootstrap-utils
@@ -95,6 +100,15 @@ limitations under the License.
commons-ioprovided
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
deleted file mode 100644
index 6e8ae91948..0000000000
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap;
-
-import org.apache.nifi.minifi.bootstrap.exception.InvalidCommandException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.util.Arrays;
-
-public class BootstrapCodec {
-
- private final RunMiNiFi runner;
- private final BufferedReader reader;
- private final BufferedWriter writer;
- private final Logger logger = LoggerFactory.getLogger(BootstrapCodec.class);
-
- public BootstrapCodec(final RunMiNiFi runner, final InputStream in, final OutputStream out) {
- this.runner = runner;
- this.reader = new BufferedReader(new InputStreamReader(in));
- this.writer = new BufferedWriter(new OutputStreamWriter(out));
- }
-
- public void communicate() throws IOException {
- final String line = reader.readLine();
- final String[] splits = line.split(" ");
- if (splits.length < 0) {
- throw new IOException("Received invalid command from MiNiFi: " + line);
- }
-
- final String cmd = splits[0];
- final String[] args;
- if (splits.length == 1) {
- args = new String[0];
- } else {
- args = Arrays.copyOfRange(splits, 1, splits.length);
- }
-
- try {
- processRequest(cmd, args);
- } catch (final InvalidCommandException ice) {
- throw new IOException("Received invalid command from MiNiFi: " + line + " : " + (ice.getMessage() == null ? "" : "Details: " + ice.toString()));
- }
- }
-
- private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException {
- switch (cmd) {
- case "PORT": {
- logger.debug("Received 'PORT' command from MINIFI");
- if (args.length != 2) {
- throw new InvalidCommandException();
- }
-
- final int port;
- try {
- port = Integer.parseInt(args[0]);
- } catch (final NumberFormatException nfe) {
- throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
- }
-
- if (port < 1 || port > 65535) {
- throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
- }
-
- final String secretKey = args[1];
-
- runner.setMiNiFiCommandControlPort(port, secretKey);
- writer.write("OK");
- writer.newLine();
- writer.flush();
- }
- break;
- case "STARTED": {
- logger.debug("Received 'STARTED' command from MINIFI");
- if (args.length != 1) {
- throw new InvalidCommandException("STARTED command must contain a status argument");
- }
-
- if (!"true".equals(args[0]) && !"false".equals(args[0])) {
- throw new InvalidCommandException("Invalid status for STARTED command; should be true or false, but was '" + args[0] + "'");
- }
-
- final boolean started = Boolean.parseBoolean(args[0]);
- runner.setNiFiStarted(started);
- writer.write("OK");
- writer.newLine();
- writer.flush();
- }
- break;
- case "SHUTDOWN": {
- logger.debug("Received 'SHUTDOWN' command from MINIFI");
- runner.shutdownChangeNotifier();
- runner.shutdownPeriodicStatusReporters();
- writer.write("OK");
- writer.newLine();
- writer.flush();
- }
- break;
- }
- }
-}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCommand.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCommand.java
new file mode 100644
index 0000000000..43bb5bd870
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCommand.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap;
+
+import java.util.Optional;
+
+public enum BootstrapCommand {
+ START, RUN, STOP, STATUS, DUMP, RESTART, ENV, FLOWSTATUS, UNKNOWN;
+
+ public static Optional fromString(String val) {
+ return Optional.ofNullable(val).map(String::toUpperCase).map(BootstrapCommand::valueOf).filter(command -> command != UNKNOWN);
+ }
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiListener.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiListener.java
deleted file mode 100644
index ea760bec7b..0000000000
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiListener.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap;
-
-import org.apache.nifi.minifi.bootstrap.util.LimitingInputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-public class MiNiFiListener {
-
- private ServerSocket serverSocket;
- private volatile Listener listener;
-
- int start(final RunMiNiFi runner) throws IOException {
- serverSocket = new ServerSocket();
- serverSocket.bind(new InetSocketAddress("localhost", 0));
-
- final int localPort = serverSocket.getLocalPort();
- listener = new Listener(serverSocket, runner);
- final Thread listenThread = new Thread(listener);
- listenThread.setName("Listen to MiNiFi");
- listenThread.setDaemon(true);
- listenThread.start();
- return localPort;
- }
-
- public void stop() throws IOException {
- final Listener listener = this.listener;
- if (listener == null) {
- return;
- }
-
- listener.stop();
- }
-
- private class Listener implements Runnable {
-
- private final ServerSocket serverSocket;
- private final ExecutorService executor;
- private final RunMiNiFi runner;
- private volatile boolean stopped = false;
-
- public Listener(final ServerSocket serverSocket, final RunMiNiFi runner) {
- this.serverSocket = serverSocket;
- this.executor = Executors.newFixedThreadPool(2, new ThreadFactory() {
- @Override
- public Thread newThread(final Runnable runnable) {
- final Thread t = Executors.defaultThreadFactory().newThread(runnable);
- t.setDaemon(true);
- t.setName("MiNiFi Bootstrap Command Listener");
- return t;
- }
- });
-
- this.runner = runner;
- }
-
- public void stop() throws IOException {
- stopped = true;
-
- executor.shutdown();
- try {
- executor.awaitTermination(3, TimeUnit.SECONDS);
- } catch (final InterruptedException ie) {
- }
-
- serverSocket.close();
- }
-
- @Override
- public void run() {
- while (!serverSocket.isClosed()) {
- try {
- if (stopped) {
- return;
- }
-
- final Socket socket;
- try {
- socket = serverSocket.accept();
- } catch (final IOException ioe) {
- if (stopped) {
- return;
- }
-
- throw ioe;
- }
-
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- // we want to ensure that we don't try to read data from an InputStream directly
- // by a BufferedReader because any user on the system could open a socket and send
- // a multi-gigabyte file without any new lines in order to crash the Bootstrap,
- // which in turn may cause the Shutdown Hook to shutdown MiNiFi.
- // So we will limit the amount of data to read to 4 KB
- final InputStream limitingIn = new LimitingInputStream(socket.getInputStream(), 4096);
- final BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, socket.getOutputStream());
- codec.communicate();
- } catch (final Throwable t) {
- System.out.println("Failed to communicate with MiNiFi due to " + t);
- t.printStackTrace();
- } finally {
- try {
- socket.close();
- } catch (final IOException ioe) {
- }
- }
- }
- });
- } catch (final Throwable t) {
- System.err.println("Failed to receive information from MiNiFi due to " + t);
- t.printStackTrace();
- }
- }
- }
- }
-}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiParameters.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiParameters.java
new file mode 100644
index 0000000000..9fa7bf666a
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiParameters.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap;
+
+import java.util.Objects;
+
+public class MiNiFiParameters {
+
+ private volatile int ccPort;
+ private volatile long minifiPid;
+ private volatile String secretKey;
+
+ public MiNiFiParameters(int ccPort, long minifiPid, String secretKey) {
+ this.ccPort = ccPort;
+ this.minifiPid = minifiPid;
+ this.secretKey = secretKey;
+ }
+
+ public int getMiNiFiPort() {
+ return ccPort;
+ }
+
+ public void setMiNiFiPort(int ccPort) {
+ this.ccPort = ccPort;
+ }
+
+ public long getMinifiPid() {
+ return minifiPid;
+ }
+
+ public void setMinifiPid(long minifiPid) {
+ this.minifiPid = minifiPid;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MiNiFiParameters that = (MiNiFiParameters) o;
+ return ccPort == that.ccPort && minifiPid == that.minifiPid && Objects.equals(secretKey, that.secretKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ccPort, minifiPid, secretKey);
+ }
+
+ @Override
+ public String toString() {
+ return "MiNiFiParameters{" +
+ "ccPort=" + ccPort +
+ ", minifiPid=" + minifiPid +
+ '}';
+ }
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiStatus.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiStatus.java
new file mode 100644
index 0000000000..29b372dca4
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiStatus.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap;
+
+public class MiNiFiStatus {
+
+ private final Integer port;
+ private final Long pid;
+ private final boolean respondingToPing;
+ private final boolean processRunning;
+
+ public MiNiFiStatus() {
+ this.port = null;
+ this.pid = null;
+ this.respondingToPing = false;
+ this.processRunning = false;
+ }
+
+ public MiNiFiStatus(Integer port, Long pid, boolean respondingToPing, boolean processRunning) {
+ this.port = port;
+ this.pid = pid;
+ this.respondingToPing = respondingToPing;
+ this.processRunning = processRunning;
+ }
+
+ public Long getPid() {
+ return pid;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public boolean isRespondingToPing() {
+ return respondingToPing;
+ }
+
+ public boolean isProcessRunning() {
+ return processRunning;
+ }
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index e9931d9f9e..061a0a7e36 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -16,70 +16,35 @@
*/
package org.apache.nifi.minifi.bootstrap;
-import org.apache.commons.io.input.TeeInputStream;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.bootstrap.util.OSUtils;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
-import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
-import org.apache.nifi.minifi.commons.status.FlowStatusReport;
-import org.apache.nifi.util.Tuple;
-import org.apache.nifi.util.file.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.util.Collections.singleton;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.ObjectInputStream;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.attribute.PosixFilePermission;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static org.apache.nifi.minifi.commons.schema.common.BootstrapPropertyKeys.STATUS_REPORTER_COMPONENTS_KEY;
+import org.apache.nifi.c2.client.api.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.command.CommandRunnerFactory;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
+import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
+import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
+import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
+import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
+import org.apache.nifi.minifi.bootstrap.service.ReloadService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*