NIFI-4341 - add provenance repository storage usage in UI

- addressing review comments
- This closes #2119
This commit is contained in:
Pierre Villard 2017-09-01 11:16:29 +02:00 committed by Matt Gilman
parent 897b8ab601
commit 5957f6ee95
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
18 changed files with 453 additions and 73 deletions

View File

@ -592,4 +592,22 @@ public class FileUtils {
return digest.digest(); return digest.digest();
} }
/**
* Returns the capacity for a given path
* @param path path
* @return total space
*/
public static long getContainerCapacity(final Path path) {
return path.toFile().getTotalSpace();
}
/**
* Returns the free capacity for a given path
* @param path path
* @return usable space
*/
public static long getContainerUsableSpace(final Path path) {
return path.toFile().getUsableSpace();
}
} }

View File

@ -27,6 +27,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set;
public interface ProvenanceRepository extends ProvenanceEventRepository { public interface ProvenanceRepository extends ProvenanceEventRepository {
@ -181,4 +182,30 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
* {@link ProvenanceRepository#submitQuery(Query, NiFiUser)} method * {@link ProvenanceRepository#submitQuery(Query, NiFiUser)} method
*/ */
List<SearchableField> getSearchableAttributes(); List<SearchableField> getSearchableAttributes();
/**
* @return the names of all Containers that exist for this Provenance
* Repository
*/
Set<String> getContainerNames();
/**
* @param containerName name of container to check capacity on
* @return the maximum number of bytes that can be stored in the storage
* mechanism that backs the container with the given name
* @throws java.io.IOException if unable to check capacity
* @throws IllegalArgumentException if no container exists with the given
* name
*/
long getContainerCapacity(String containerName) throws IOException;
/**
* @param containerName to check space on
* @return the number of bytes available to be used used by the storage
* mechanism that backs the container with the given name
* @throws java.io.IOException if unable to check space
* @throws IllegalArgumentException if no container exists with the given
* name
*/
long getContainerUsableSpace(String containerName) throws IOException;
} }

View File

@ -19,7 +19,9 @@ package org.apache.nifi.provenance;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.Authorizer;
@ -151,4 +153,19 @@ public class MockProvenanceRepository implements ProvenanceRepository {
public ProvenanceEventRepository getProvenanceEventRepository() { public ProvenanceEventRepository getProvenanceEventRepository() {
return this; return this;
} }
@Override
public long getContainerCapacity(String containerName) throws IOException {
return 0;
}
@Override
public Set<String> getContainerNames() {
return new HashSet<String>();
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
return 0;
}
} }

View File

@ -62,6 +62,7 @@ public class SystemDiagnosticsSnapshotDTO implements Cloneable {
private StorageUsageDTO flowFileRepositoryStorageUsage; private StorageUsageDTO flowFileRepositoryStorageUsage;
private Set<StorageUsageDTO> contentRepositoryStorageUsage; private Set<StorageUsageDTO> contentRepositoryStorageUsage;
private Set<StorageUsageDTO> provenanceRepositoryStorageUsage;
private Set<GarbageCollectionDTO> garbageCollection; private Set<GarbageCollectionDTO> garbageCollection;
private Date statsLastRefreshed; private Date statsLastRefreshed;
@ -204,6 +205,15 @@ public class SystemDiagnosticsSnapshotDTO implements Cloneable {
this.contentRepositoryStorageUsage = contentRepositoryStorageUsage; this.contentRepositoryStorageUsage = contentRepositoryStorageUsage;
} }
@ApiModelProperty("The provenance repository storage usage.")
public Set<StorageUsageDTO> getProvenanceRepositoryStorageUsage() {
return provenanceRepositoryStorageUsage;
}
public void setProvenanceRepositoryStorageUsage(Set<StorageUsageDTO> provenanceRepositoryStorageUsage) {
this.provenanceRepositoryStorageUsage = provenanceRepositoryStorageUsage;
}
@ApiModelProperty("The flowfile repository storage usage.") @ApiModelProperty("The flowfile repository storage usage.")
public StorageUsageDTO getFlowFileRepositoryStorageUsage() { public StorageUsageDTO getFlowFileRepositoryStorageUsage() {
return flowFileRepositoryStorageUsage; return flowFileRepositoryStorageUsage;
@ -357,15 +367,27 @@ public class SystemDiagnosticsSnapshotDTO implements Cloneable {
final Set<StorageUsageDTO> contentRepoStorageUsage = new LinkedHashSet<>(); final Set<StorageUsageDTO> contentRepoStorageUsage = new LinkedHashSet<>();
other.setContentRepositoryStorageUsage(contentRepoStorageUsage); other.setContentRepositoryStorageUsage(contentRepoStorageUsage);
if (getContentRepositoryStorageUsage() != null) {
for (final StorageUsageDTO usage : getContentRepositoryStorageUsage()) { for (final StorageUsageDTO usage : getContentRepositoryStorageUsage()) {
contentRepoStorageUsage.add(usage.clone()); contentRepoStorageUsage.add(usage.clone());
} }
}
final Set<StorageUsageDTO> provenanceRepoStorageUsage = new LinkedHashSet<>();
other.setProvenanceRepositoryStorageUsage(provenanceRepoStorageUsage);
if (getProvenanceRepositoryStorageUsage() != null) {
for (final StorageUsageDTO usage : getProvenanceRepositoryStorageUsage()) {
provenanceRepoStorageUsage.add(usage.clone());
}
}
final Set<GarbageCollectionDTO> gcUsage = new LinkedHashSet<>(); final Set<GarbageCollectionDTO> gcUsage = new LinkedHashSet<>();
other.setGarbageCollection(gcUsage); other.setGarbageCollection(gcUsage);
if (getGarbageCollection() != null) {
for (final GarbageCollectionDTO gcDto : getGarbageCollection()) { for (final GarbageCollectionDTO gcDto : getGarbageCollection()) {
gcUsage.add(gcDto.clone()); gcUsage.add(gcDto.clone());
} }
}
other.setVersionInfo(getVersionInfo().clone()); other.setVersionInfo(getVersionInfo().clone());

View File

@ -608,6 +608,7 @@ public class StatusMerger {
target.setUsedNonHeapBytes(target.getUsedNonHeapBytes() + toMerge.getUsedNonHeapBytes()); target.setUsedNonHeapBytes(target.getUsedNonHeapBytes() + toMerge.getUsedNonHeapBytes());
merge(target.getContentRepositoryStorageUsage(), toMerge.getContentRepositoryStorageUsage()); merge(target.getContentRepositoryStorageUsage(), toMerge.getContentRepositoryStorageUsage());
merge(target.getProvenanceRepositoryStorageUsage(), toMerge.getProvenanceRepositoryStorageUsage());
merge(target.getFlowFileRepositoryStorageUsage(), toMerge.getFlowFileRepositoryStorageUsage()); merge(target.getFlowFileRepositoryStorageUsage(), toMerge.getFlowFileRepositoryStorageUsage());
mergeGarbageCollection(target.getGarbageCollection(), toMerge.getGarbageCollection()); mergeGarbageCollection(target.getGarbageCollection(), toMerge.getGarbageCollection());

View File

@ -1684,7 +1684,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public SystemDiagnostics getSystemDiagnostics() { public SystemDiagnostics getSystemDiagnostics() {
final SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory(); final SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory();
return factory.create(flowFileRepository, contentRepository); return factory.create(flowFileRepository, contentRepository, provenanceRepository);
} }
// //

View File

@ -404,12 +404,16 @@ public class FileSystemRepository implements ContentRepository {
@Override @Override
public long getContainerCapacity(final String containerName) throws IOException { public long getContainerCapacity(final String containerName) throws IOException {
final Path path = containers.get(containerName); final Path path = containers.get(containerName);
if (path == null) { if (path == null) {
throw new IllegalArgumentException("No container exists with name " + containerName); throw new IllegalArgumentException("No container exists with name " + containerName);
} }
long capacity = path.toFile().getTotalSpace();
long capacity = FileUtils.getContainerCapacity(path);
if(capacity==0) { if(capacity==0) {
throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. Nifi can not create a zero sized FileSystemRepository"); throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
+ "Nifi can not create a zero sized FileSystemRepository.");
} }
return capacity; return capacity;
@ -418,10 +422,12 @@ public class FileSystemRepository implements ContentRepository {
@Override @Override
public long getContainerUsableSpace(String containerName) throws IOException { public long getContainerUsableSpace(String containerName) throws IOException {
final Path path = containers.get(containerName); final Path path = containers.get(containerName);
if (path == null) { if (path == null) {
throw new IllegalArgumentException("No container exists with name " + containerName); throw new IllegalArgumentException("No container exists with name " + containerName);
} }
return path.toFile().getUsableSpace();
return FileUtils.getContainerUsableSpace(path);
} }
@Override @Override

View File

@ -43,6 +43,7 @@ public class SystemDiagnostics implements Cloneable {
private StorageUsage flowFileRepositoryStorageUsage; private StorageUsage flowFileRepositoryStorageUsage;
private Map<String, StorageUsage> contentRepositoryStorageUsage; private Map<String, StorageUsage> contentRepositoryStorageUsage;
private Map<String, StorageUsage> provenanceRepositoryStorageUsage;
private Map<String, GarbageCollection> garbageCollection; private Map<String, GarbageCollection> garbageCollection;
private long creationTimestamp; private long creationTimestamp;
@ -95,6 +96,10 @@ public class SystemDiagnostics implements Cloneable {
this.contentRepositoryStorageUsage = contentRepositoryStorageUsage; this.contentRepositoryStorageUsage = contentRepositoryStorageUsage;
} }
public void setProvenanceRepositoryStorageUsage(final Map<String, StorageUsage> provenanceRepositoryStorageUsage) {
this.provenanceRepositoryStorageUsage = provenanceRepositoryStorageUsage;
}
public long getTotalNonHeap() { public long getTotalNonHeap() {
return totalNonHeap; return totalNonHeap;
} }
@ -143,6 +148,10 @@ public class SystemDiagnostics implements Cloneable {
return contentRepositoryStorageUsage; return contentRepositoryStorageUsage;
} }
public Map<String, StorageUsage> getProvenanceRepositoryStorageUsage() {
return provenanceRepositoryStorageUsage;
}
public long getFreeNonHeap() { public long getFreeNonHeap() {
return totalNonHeap - usedNonHeap; return totalNonHeap - usedNonHeap;
} }
@ -206,6 +215,13 @@ public class SystemDiagnostics implements Cloneable {
clonedMap.put(entry.getKey(), entry.getValue().clone()); clonedMap.put(entry.getKey(), entry.getValue().clone());
} }
} }
if(provenanceRepositoryStorageUsage != null) {
final Map<String, StorageUsage> clonedMap = new LinkedHashMap<>();
clonedObj.setProvenanceRepositoryStorageUsage(clonedMap);
for (final Map.Entry<String, StorageUsage> entry : provenanceRepositoryStorageUsage.entrySet()) {
clonedMap.put(entry.getKey(), entry.getValue().clone());
}
}
if (garbageCollection != null) { if (garbageCollection != null) {
final Map<String, GarbageCollection> clonedMap = new LinkedHashMap<>(); final Map<String, GarbageCollection> clonedMap = new LinkedHashMap<>();
clonedObj.setGarbageCollection(clonedMap); clonedObj.setGarbageCollection(clonedMap);

View File

@ -32,7 +32,7 @@ import java.util.Set;
import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -44,7 +44,7 @@ public class SystemDiagnosticsFactory {
private final Logger logger = LoggerFactory.getLogger(SystemDiagnosticsFactory.class); private final Logger logger = LoggerFactory.getLogger(SystemDiagnosticsFactory.class);
public SystemDiagnostics create(final FlowFileRepository flowFileRepo, final ContentRepository contentRepo) { public SystemDiagnostics create(final FlowFileRepository flowFileRepo, final ContentRepository contentRepo, ProvenanceRepository provenanceRepository) {
final SystemDiagnostics systemDiagnostics = new SystemDiagnostics(); final SystemDiagnostics systemDiagnostics = new SystemDiagnostics();
final MemoryMXBean memory = ManagementFactory.getMemoryMXBean(); final MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
@ -119,6 +119,31 @@ public class SystemDiagnosticsFactory {
} }
systemDiagnostics.setContentRepositoryStorageUsage(fileRepositoryUsage); systemDiagnostics.setContentRepositoryStorageUsage(fileRepositoryUsage);
// get provenance repository disk usage
final Set<String> provContainerNames = provenanceRepository.getContainerNames();
final Map<String, StorageUsage> provRepositoryUsage = new LinkedHashMap<>(provContainerNames.size());
for (final String containerName : provContainerNames) {
long containerCapacity = -1L;
long containerFree = 0L;
try {
containerFree = provenanceRepository.getContainerUsableSpace(containerName);
containerCapacity = provenanceRepository.getContainerCapacity(containerName);
} catch (final IOException ioe) {
logger.warn("Unable to determine Provenance Repository usage for container {} due to {}", containerName, ioe.toString());
if (logger.isDebugEnabled()) {
logger.warn("", ioe);
}
}
final StorageUsage storageUsage = new StorageUsage();
storageUsage.setIdentifier(containerName);
storageUsage.setFreeSpace(containerFree);
storageUsage.setTotalSpace(containerCapacity);
provRepositoryUsage.put(containerName, storageUsage);
}
systemDiagnostics.setProvenanceRepositoryStorageUsage(provRepositoryUsage);
// get the garbage collection statistics // get the garbage collection statistics
final Map<String, GarbageCollection> garbageCollection = new LinkedHashMap<>(garbageCollectors.size()); final Map<String, GarbageCollection> garbageCollection = new LinkedHashMap<>(garbageCollectors.size());
for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) { for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) {

View File

@ -2706,6 +2706,13 @@ public final class DtoFactory {
contentRepositoryStorageUsageDtos.add(createStorageUsageDTO(entry.getKey(), entry.getValue())); contentRepositoryStorageUsageDtos.add(createStorageUsageDTO(entry.getKey(), entry.getValue()));
} }
// provenance disk usage
final Set<SystemDiagnosticsSnapshotDTO.StorageUsageDTO> provenanceRepositoryStorageUsageDtos = new LinkedHashSet<>();
snapshot.setProvenanceRepositoryStorageUsage(provenanceRepositoryStorageUsageDtos);
for (final Map.Entry<String, StorageUsage> entry : sysDiagnostics.getProvenanceRepositoryStorageUsage().entrySet()) {
provenanceRepositoryStorageUsageDtos.add(createStorageUsageDTO(entry.getKey(), entry.getValue()));
}
// garbage collection // garbage collection
final Set<SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO> garbageCollectionDtos = new LinkedHashSet<>(); final Set<SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO> garbageCollectionDtos = new LinkedHashSet<>();
snapshot.setGarbageCollection(garbageCollectionDtos); snapshot.setGarbageCollection(garbageCollectionDtos);

View File

@ -45,6 +45,9 @@
<div id="cluster-content-tab-content" class="configuration-tab"> <div id="cluster-content-tab-content" class="configuration-tab">
<div id="cluster-content-table" class="cluster-tabbed-table"></div> <div id="cluster-content-table" class="cluster-tabbed-table"></div>
</div> </div>
<div id="cluster-provenance-tab-content" class="configuration-tab">
<div id="cluster-provenance-table" class="cluster-tabbed-table"></div>
</div>
<div id="cluster-version-tab-content" class="configuration-tab"> <div id="cluster-version-tab-content" class="configuration-tab">
<div id="cluster-version-table" class="cluster-tabbed-table"></div> <div id="cluster-version-table" class="cluster-tabbed-table"></div>
</div> </div>

View File

@ -74,7 +74,7 @@
<div class="setting"> <div class="setting">
<div class="setting-header">Non-heap <span id="utilization-non-heap"></span></div> <div class="setting-header">Non-heap <span id="utilization-non-heap"></span></div>
<div class="setting-field"> <div class="setting-field">
<table id="heap-table"> <table id="non-heap-table">
<tbody> <tbody>
<tr> <tr>
<td class="memory-header setting-name">Max:</td> <td class="memory-header setting-name">Max:</td>
@ -180,8 +180,13 @@
<div id="content-repository-storage-usage-container"></div> <div id="content-repository-storage-usage-container"></div>
</div> </div>
</div> </div>
<div class="setting">
<div class="setting-header">Provenance Repository Storage</div>
<div class="setting-field">
<div id="provenance-repository-storage-usage-container"></div>
</div>
</div>
</div> </div>
<div id="version-tab-content" class="configuration-tab"> <div id="version-tab-content" class="configuration-tab">
<div class="setting"> <div class="setting">
<div class="setting-header">NiFi</div> <div class="setting-header">NiFi</div>
@ -209,7 +214,6 @@
</dl> </dl>
</div> </div>
</div> </div>
</div> </div>
<div id="system-diagnostics-refresh-container"> <div id="system-diagnostics-refresh-container">
<button id="system-diagnostics-refresh-button" class="refresh-button pointer fa fa-refresh" title="Refresh"></button> <button id="system-diagnostics-refresh-button" class="refresh-button pointer fa fa-refresh" title="Refresh"></button>

View File

@ -152,15 +152,15 @@ input.search-nodes {
margin-top: 8px; margin-top: 8px;
} }
#heap-table { #heap-table, #non-heap-table {
border-collapse: collapse; border-collapse: collapse;
} }
#heap-table tr{ #heap-table tr, #non-heap-table tr {
height: 20px; height: 20px;
} }
#heap-table span { #heap-table span, #non-heap-table span {
font-weight: bold; font-weight: bold;
} }
@ -191,6 +191,17 @@ input.search-nodes {
margin-bottom: 10px; margin-bottom: 10px;
} }
#provenance-repository-storage-usage-container {
height: 179px;
overflow-y: scroll;
border: 1px solid #ccc;
padding: 4px;
}
#provenance-repository-storage-usage-container div.storage-usage {
margin-bottom: 10px;
}
.storage-usage-details { .storage-usage-details {
padding-top: 4px; padding-top: 4px;
} }

View File

@ -352,6 +352,89 @@
}] }]
}; };
var provenanceTab = {
name: 'Provenance Storage',
data: {
dataSet: 'systemDiagnostics',
update: updateProvenanceTableData
},
tabContentId: 'cluster-provenance-tab-content',
tableId: 'cluster-provenance-table',
tableColumnModel: [
{
id: 'node',
field: 'node',
name: 'Node Address',
sortable: true,
resizable: true,
formatter: nfCommon.genericValueFormatter
},
{
id: 'provenanceRepoId',
field: 'provenanceRepoId',
name: 'Provenance Repository',
sortable: true,
resizable: true,
formatter: nfCommon.genericValueFormatter
},
{
id: 'provenanceRepoTotal',
field: 'provenanceRepoTotal',
name: 'Total Space',
sortable: true,
resizable: true,
cssClass: 'cell-right',
headerCssClass: 'header-right',
formatter: nfCommon.genericValueFormatter
},
{
id: 'provenanceRepoUsed',
field: 'provenanceRepoUsed',
name: 'Used Space',
sortable: true,
resizable: true,
cssClass: 'cell-right',
headerCssClass: 'header-right',
formatter: nfCommon.genericValueFormatter
},
{
id: 'provenanceRepoFree',
field: 'provenanceRepoFree',
name: 'Free Space',
sortable: true,
resizable: true,
cssClass: 'cell-right',
headerCssClass: 'header-right',
formatter: nfCommon.genericValueFormatter
},
{
id: 'provenanceRepoUtil',
field: 'provenanceRepoUtil',
name: 'Utilization',
sortable: true,
resizable: true,
cssClass: 'cell-right',
headerCssClass: 'header-right',
formatter: nfCommon.genericValueFormatter
}
],
tableIdColumn: 'id',
tableOptions: commonTableOptions,
tableOnClick: null,
createTableOnEnter: null,
cleanUpTable: null,
init: commonTableInit,
onSort: sort,
onTabSelected: onSelectTab,
filterOptions: [{
text: 'by address',
value: 'node'
}, {
text: 'by repository',
value: 'contentRepoId'
}]
};
var versionTab = { var versionTab = {
name: 'Versions', name: 'Versions',
data: { data: {
@ -432,7 +515,7 @@
}] }]
}; };
var clusterTabs = [nodesTab, systemTab, jvmTab, flowFileTab, contentTab, versionTab]; var clusterTabs = [nodesTab, systemTab, jvmTab, flowFileTab, contentTab, provenanceTab, versionTab];
var tabsByName = {}; var tabsByName = {};
var dataSetHandlers = {}; var dataSetHandlers = {};
@ -1200,6 +1283,39 @@
} }
} }
/**
* Applies system diagnostics data to the Provenance Storage tab.
*/
function updateProvenanceTableData(systemDiagnosticsResponse) {
if (nfCommon.isDefinedAndNotNull(systemDiagnosticsResponse.systemDiagnostics)
&& nfCommon.isDefinedAndNotNull(systemDiagnosticsResponse.systemDiagnostics.nodeSnapshots)) {
var provenanceStorageTableRows = [];
systemDiagnosticsResponse.systemDiagnostics.nodeSnapshots.forEach(function (nodeSnapshot) {
var snapshot = nodeSnapshot.snapshot;
snapshot.provenanceRepositoryStorageUsage.forEach(function (provenanceRepoUsage) {
provenanceStorageTableRows.push({
id: nodeSnapshot.nodeId + ':' + provenanceRepoUsage.identifier,
address: nodeSnapshot.address,
node: nodeSnapshot.address + ':' + nodeSnapshot.apiPort,
provenanceRepoId: provenanceRepoUsage.identifier,
provenanceRepoTotal: provenanceRepoUsage.totalSpace,
provenanceRepoUsed: provenanceRepoUsage.usedSpace,
provenanceRepoFree: provenanceRepoUsage.freeSpace,
provenanceRepoUtil: provenanceRepoUsage.utilization
});
});
});
provenanceTab.rowCount = provenanceStorageTableRows.length;
provenanceTab.dataView.setItems(provenanceStorageTableRows);
provenanceTab.dataView.reSort();
provenanceTab.grid.invalidate();
} else {
provenanceTab.rowCount = 0;
}
}
/** /**
* Applies system diagnostics data to the Versions tab. * Applies system diagnostics data to the Versions tab.
*/ */

View File

@ -2415,6 +2415,19 @@
}); });
} }
// provenance repo storage usage
var provenanceRepositoryUsageContainer = $('#provenance-repository-storage-usage-container').empty();
if (nfCommon.isDefinedAndNotNull(aggregateSnapshot.provenanceRepositoryStorageUsage)) {
// sort the provenance repos
var sortedProvenanceRepositoryStorageUsage = aggregateSnapshot.provenanceRepositoryStorageUsage.sort(function (a, b) {
return a.identifier === b.identifier ? 0 : a.identifier > b.identifier ? 1 : -1;
});
// add each to the UI
$.each(sortedProvenanceRepositoryStorageUsage, function (_, provenanceRepository) {
addStorageUsage(provenanceRepositoryUsageContainer, provenanceRepository);
});
}
// Version // Version
var versionSpanSelectorToFieldMap = { var versionSpanSelectorToFieldMap = {
'#version-nifi': aggregateSnapshot.versionInfo.niFiVersion, '#version-nifi': aggregateSnapshot.versionInfo.niFiVersion,

View File

@ -68,6 +68,7 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.timebuffer.CountSizeEntityAccess; import org.apache.nifi.util.timebuffer.CountSizeEntityAccess;
@ -521,6 +522,40 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
return configuration; return configuration;
} }
@Override
public Set<String> getContainerNames() {
return new HashSet<>(configuration.getStorageDirectories().keySet());
}
@Override
public long getContainerCapacity(final String containerName) throws IOException {
Map<String, File> map = configuration.getStorageDirectories();
File container = map.get(containerName);
if(container != null) {
long capacity = FileUtils.getContainerCapacity(container.toPath());
if(capacity==0) {
throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
+ "Nifi can not create a zero sized provenance repository.");
}
return capacity;
} else {
throw new IllegalArgumentException("There is no defined container with name " + containerName);
}
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
Map<String, File> map = configuration.getStorageDirectories();
File container = map.get(containerName);
if(container != null) {
return FileUtils.getContainerUsableSpace(container.toPath());
} else {
throw new IllegalArgumentException("There is no defined container with name " + containerName);
}
}
private void recover() throws IOException { private void recover() throws IOException {
long maxId = -1L; long maxId = -1L;
long maxIndexedId = -1L; long maxIndexedId = -1L;

View File

@ -17,10 +17,6 @@
package org.apache.nifi.provenance; package org.apache.nifi.provenance;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.Authorizable;
@ -50,9 +46,18 @@ import org.apache.nifi.provenance.toc.TocWriter;
import org.apache.nifi.provenance.util.CloseableUtil; import org.apache.nifi.provenance.util.CloseableUtil;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* <p> * <p>
@ -293,4 +298,38 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
RepositoryConfiguration getConfig() { RepositoryConfiguration getConfig() {
return this.config; return this.config;
} }
@Override
public Set<String> getContainerNames() {
return new HashSet<>(config.getStorageDirectories().keySet());
}
@Override
public long getContainerCapacity(final String containerName) throws IOException {
Map<String, File> map = config.getStorageDirectories();
File container = map.get(containerName);
if(container != null) {
long capacity = FileUtils.getContainerCapacity(container.toPath());
if(capacity==0) {
throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
+ "Nifi can not create a zero sized provenance repository.");
}
return capacity;
} else {
throw new IllegalArgumentException("There is no defined container with name " + containerName);
}
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
Map<String, File> map = config.getStorageDirectories();
File container = map.get(containerName);
if(container != null) {
return FileUtils.getContainerUsableSpace(container.toPath());
} else {
throw new IllegalArgumentException("There is no defined container with name " + containerName);
}
}
} }

View File

@ -50,6 +50,7 @@ import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -70,7 +71,10 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
// default property values // default property values
public static final int DEFAULT_BUFFER_SIZE = 10000; public static final int DEFAULT_BUFFER_SIZE = 10000;
public static String CONTAINER_NAME = "in-memory";
private final RingBuffer<ProvenanceEventRecord> ringBuffer; private final RingBuffer<ProvenanceEventRecord> ringBuffer;
private final int maxSize;
private final List<SearchableField> searchableFields; private final List<SearchableField> searchableFields;
private final List<SearchableField> searchableAttributes; private final List<SearchableField> searchableAttributes;
private final ExecutorService queryExecService; private final ExecutorService queryExecService;
@ -95,12 +99,13 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
scheduledExecService = null; scheduledExecService = null;
authorizer = null; authorizer = null;
resourceFactory = null; resourceFactory = null;
maxSize = DEFAULT_BUFFER_SIZE;
} }
public VolatileProvenanceRepository(final NiFiProperties nifiProperties) { public VolatileProvenanceRepository(final NiFiProperties nifiProperties) {
final int bufferSize = nifiProperties.getIntegerProperty(BUFFER_SIZE, DEFAULT_BUFFER_SIZE); maxSize = nifiProperties.getIntegerProperty(BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
ringBuffer = new RingBuffer<>(bufferSize); ringBuffer = new RingBuffer<>(maxSize);
final String indexedFieldString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); final String indexedFieldString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
final String indexedAttrString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); final String indexedAttrString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
@ -593,6 +598,21 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
} }
} }
@Override
public long getContainerCapacity(final String containerName) throws IOException {
return maxSize;
}
@Override
public Set<String> getContainerNames() {
return Collections.singleton(CONTAINER_NAME);
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
return maxSize - ringBuffer.getSize();
}
private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) { private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) {
final String userId = user.getIdentity(); final String userId = user.getIdentity();
final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1, userId); final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1, userId);