NIFI-632 moving thread startup from constructor to initialize methods

This commit is contained in:
danbress 2015-06-13 12:22:38 -04:00
parent 53b86d7a6c
commit e1a59c47b7
2 changed files with 22 additions and 22 deletions

View File

@ -130,7 +130,6 @@ public class FileSystemRepository implements ContentRepository {
final String maxArchiveRetentionPeriod = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD); final String maxArchiveRetentionPeriod = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD);
final String maxArchiveSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE); final String maxArchiveSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE);
final String archiveBackPressureSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE); final String archiveBackPressureSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE);
final String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
if ("true".equalsIgnoreCase(enableArchiving)) { if ("true".equalsIgnoreCase(enableArchiving)) {
archiveData = true; archiveData = true;
@ -193,24 +192,7 @@ public class FileSystemRepository implements ContentRepository {
LOG.info("Initializing FileSystemRepository with 'Always Sync' set to {}", alwaysSync); LOG.info("Initializing FileSystemRepository with 'Always Sync' set to {}", alwaysSync);
initializeRepository(); initializeRepository();
final long cleanupMillis;
if (archiveCleanupFrequency == null) {
cleanupMillis = 1000L;
} else {
try {
cleanupMillis = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS);
} catch (final Exception e) {
throw new RuntimeException("Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
}
}
containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true); containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true);
for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) {
final String containerName = containerEntry.getKey();
final Path containerPath = containerEntry.getValue();
final Runnable cleanup = new DestroyExpiredArchiveClaims(containerName, containerPath);
containerCleanupExecutor.scheduleWithFixedDelay(cleanup, cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS);
}
} }
@Override @Override
@ -225,6 +207,24 @@ public class FileSystemRepository implements ContentRepository {
for (int i = 0; i < fileRespositoryPaths.size(); i++) { for (int i = 0; i < fileRespositoryPaths.size(); i++) {
executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS); executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS);
} }
final String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
final long cleanupMillis;
if (archiveCleanupFrequency == null) {
cleanupMillis = 1000L;
} else {
try {
cleanupMillis = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS);
} catch (final Exception e) {
throw new RuntimeException("Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
}
}
for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) {
final String containerName = containerEntry.getKey();
final Path containerPath = containerEntry.getValue();
final Runnable cleanup = new DestroyExpiredArchiveClaims(containerName, containerPath);
containerCleanupExecutor.scheduleWithFixedDelay(cleanup, cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS);
}
} }
@Override @Override

View File

@ -118,15 +118,15 @@ public class VolatileContentRepository implements ContentRepository {
} }
memoryManager = new MemoryManager(maxBytes, blockSize); memoryManager = new MemoryManager(maxBytes, blockSize);
for (int i = 0; i < 3; i++) {
executor.scheduleWithFixedDelay(new CleanupOldClaims(), 1000, 10, TimeUnit.MILLISECONDS);
}
} }
@Override @Override
public void initialize(final ContentClaimManager claimManager) { public void initialize(final ContentClaimManager claimManager) {
this.claimManager = claimManager; this.claimManager = claimManager;
for (int i = 0; i < 3; i++) {
executor.scheduleWithFixedDelay(new CleanupOldClaims(), 1000, 10, TimeUnit.MILLISECONDS);
}
} }
@Override @Override