diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java index 4c4b418e06..6cf4f3413a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java @@ -25,6 +25,7 @@ import java.io.InputStream; import java.util.Properties; import org.apache.nifi.util.NiFiProperties; +import org.apache.zookeeper.server.DatadirCleanupManager; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZKDatabase; @@ -47,6 +48,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { private FileTxnSnapLog transactionLog; private ZooKeeperServer embeddedZkServer; private QuorumPeer quorumPeer; + private DatadirCleanupManager datadirCleanupManager; private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException { quorumPeerConfig = new QuorumPeerConfig(); @@ -58,6 +60,13 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { return; } + if (quorumPeerConfig.isDistributed() && quorumPeerConfig.getPurgeInterval() > 0) { + datadirCleanupManager = new DatadirCleanupManager(quorumPeerConfig + .getDataDir(), quorumPeerConfig.getDataLogDir(), quorumPeerConfig + .getSnapRetainCount(), quorumPeerConfig.getPurgeInterval()); + datadirCleanupManager.start(); + } + if (quorumPeerConfig.isDistributed()) { startDistributed(); } else { @@ -153,6 +162,10 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { if (embeddedZkServer != null && embeddedZkServer.isRunning()) { embeddedZkServer.shutdown(); } + + if (datadirCleanupManager != null) { + datadirCleanupManager.shutdown(); + } } }