NIFI-1586 Implements autopurging of transaction log and snapshot files

When NiFi is clustered, and autopurge.purgeInterval is greater than 1, the DatadirCleanupManager will be started in order to automatically purge transaction log and snapshot files based on the autopurge settings in zookeeper.properties

This closes #1928.
This commit is contained in:
Jeff Storck 2017-06-19 12:18:53 -04:00 committed by Mark Payne
parent c99100c934
commit 668a64cd56

View File

@ -25,6 +25,7 @@ import java.io.InputStream;
import java.util.Properties; import java.util.Properties;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.server.DatadirCleanupManager;
import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZKDatabase;
@ -47,6 +48,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
private FileTxnSnapLog transactionLog; private FileTxnSnapLog transactionLog;
private ZooKeeperServer embeddedZkServer; private ZooKeeperServer embeddedZkServer;
private QuorumPeer quorumPeer; private QuorumPeer quorumPeer;
private DatadirCleanupManager datadirCleanupManager;
private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException { private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException {
quorumPeerConfig = new QuorumPeerConfig(); quorumPeerConfig = new QuorumPeerConfig();
@ -58,6 +60,13 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
return; return;
} }
if (quorumPeerConfig.isDistributed() && quorumPeerConfig.getPurgeInterval() > 0) {
datadirCleanupManager = new DatadirCleanupManager(quorumPeerConfig
.getDataDir(), quorumPeerConfig.getDataLogDir(), quorumPeerConfig
.getSnapRetainCount(), quorumPeerConfig.getPurgeInterval());
datadirCleanupManager.start();
}
if (quorumPeerConfig.isDistributed()) { if (quorumPeerConfig.isDistributed()) {
startDistributed(); startDistributed();
} else { } else {
@ -153,6 +162,10 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
if (embeddedZkServer != null && embeddedZkServer.isRunning()) { if (embeddedZkServer != null && embeddedZkServer.isRunning()) {
embeddedZkServer.shutdown(); embeddedZkServer.shutdown();
} }
if (datadirCleanupManager != null) {
datadirCleanupManager.shutdown();
}
} }
} }