NIFI-44: Removed the swap directory property and always use the 'swap' directory under the flowfile repo

This commit is contained in:
Mark Payne 2014-12-30 16:30:13 -05:00
parent 43991777da
commit 394c7116d1
4 changed files with 5 additions and 14 deletions

View File

@ -235,7 +235,6 @@
<nifi.flowfile.repository.always.sync>false</nifi.flowfile.repository.always.sync> <nifi.flowfile.repository.always.sync>false</nifi.flowfile.repository.always.sync>
<nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation> <nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
<nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold> <nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
<nifi.swap.storage.directory>./flowfile_repository/swap</nifi.swap.storage.directory>
<nifi.swap.in.period>5 sec</nifi.swap.in.period> <nifi.swap.in.period>5 sec</nifi.swap.in.period>
<nifi.swap.in.threads>1</nifi.swap.in.threads> <nifi.swap.in.threads>1</nifi.swap.in.threads>
<nifi.swap.out.period>5 sec</nifi.swap.out.period> <nifi.swap.out.period>5 sec</nifi.swap.out.period>

View File

@ -84,7 +84,6 @@ public class NiFiProperties extends Properties {
public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval"; public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval";
public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation"; public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold"; public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";
public static final String SWAP_STORAGE_LOCATION = "nifi.swap.storage.directory";
public static final String SWAP_IN_THREADS = "nifi.swap.in.threads"; public static final String SWAP_IN_THREADS = "nifi.swap.in.threads";
public static final String SWAP_IN_PERIOD = "nifi.swap.in.period"; public static final String SWAP_IN_PERIOD = "nifi.swap.in.period";
public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads"; public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads";
@ -314,15 +313,6 @@ public class NiFiProperties extends Properties {
} }
} }
public File getSwapStorageLocation() {
final String location = getProperty(SWAP_STORAGE_LOCATION);
if (location == null) {
return new File(DEFAULT_SWAP_STORAGE_LOCATION);
} else {
return new File(location);
}
}
public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) { public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) {
final String value = getProperty(propertyName); final String value = getProperty(propertyName);
if (value == null) { if (value == null) {

View File

@ -28,6 +28,7 @@ import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
@ -103,14 +104,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class); private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
public FileSystemSwapManager() { public FileSystemSwapManager() {
this.storageDirectory = NiFiProperties.getInstance().getSwapStorageLocation(); final NiFiProperties properties = NiFiProperties.getInstance();
final Path flowFileRepoPath = properties.getFlowFileRepositoryPath();
this.storageDirectory = flowFileRepoPath.resolve("swap").toFile();
if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath()); throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath());
} }
swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for FlowFile Swapping"); swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for FlowFile Swapping");
final NiFiProperties properties = NiFiProperties.getInstance();
swapInMillis = FormatUtils.getTimeDuration(properties.getSwapInPeriod(), TimeUnit.MILLISECONDS); swapInMillis = FormatUtils.getTimeDuration(properties.getSwapInPeriod(), TimeUnit.MILLISECONDS);
swapOutMillis = FormatUtils.getTimeDuration(properties.getSwapOutPeriod(), TimeUnit.MILLISECONDS); swapOutMillis = FormatUtils.getTimeDuration(properties.getSwapOutPeriod(), TimeUnit.MILLISECONDS);
swapOutThreadCount = properties.getSwapOutThreads(); swapOutThreadCount = properties.getSwapOutThreads();

View File

@ -45,7 +45,6 @@ nifi.flowfile.repository.always.sync=${nifi.flowfile.repository.always.sync}
nifi.swap.manager.implementation=${nifi.swap.manager.implementation} nifi.swap.manager.implementation=${nifi.swap.manager.implementation}
nifi.queue.swap.threshold=${nifi.queue.swap.threshold} nifi.queue.swap.threshold=${nifi.queue.swap.threshold}
nifi.swap.storage.directory=${nifi.swap.storage.directory}
nifi.swap.in.period=${nifi.swap.in.period} nifi.swap.in.period=${nifi.swap.in.period}
nifi.swap.in.threads=${nifi.swap.in.threads} nifi.swap.in.threads=${nifi.swap.in.threads}
nifi.swap.out.period=${nifi.swap.out.period} nifi.swap.out.period=${nifi.swap.out.period}