NIFI-2925: When swapping in FlowFiles, do not assume that its Resource Claim is 'in use' but instead look up the canonical representation of the resource claim

This closes #1150.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2016-10-19 16:48:55 -04:00 committed by Bryan Bende
parent 7fd2c42b19
commit e1f0ba5a43
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
12 changed files with 93 additions and 38 deletions

View File

@ -32,9 +32,21 @@ public interface ResourceClaimManager {
* @param container of claim
* @param section of claim
* @param lossTolerant of claim
* @param writable whether or not the claim should be made writable
* @return new claim
*/
ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable);
/**
* Returns the Resource Claim with the given id, container, and section, if one exists, <code>null</code> otherwise
*
* @param id of claim
* @param container of claim
* @param section of claim
* @return the existing resource claim or <code>null</code> if none exists
*/
ResourceClaim getResourceClaim(String container, String section, String id);
/**
* @param claim to obtain reference count for

View File

@ -504,7 +504,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
lossTolerant = false;
}
resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
resourceClaim = claimManager.getResourceClaim(container, section, claimId);
if (resourceClaim == null) {
logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
+ "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
+ "ability to properly clean up this resource", container, section, claimId);
resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
}
final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
claim.setLength(resourceLength);

View File

@ -3553,7 +3553,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return null;
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false);
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false, false);
return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
}
@ -3579,7 +3579,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
provEvent.getPreviousContentClaimIdentifier(), false);
provEvent.getPreviousContentClaimIdentifier(), false, false);
claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
size = provEvent.getPreviousFileSize();
@ -3589,7 +3589,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
provEvent.getContentClaimIdentifier(), false);
provEvent.getContentClaimIdentifier(), false, false);
claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
@ -3682,7 +3682,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
if (!contentRepository.isAccessible(contentClaim)) {
@ -3763,7 +3763,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Create the ContentClaim
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
// Increment Claimant Count, since we will now be referencing the Content Claim
resourceClaimManager.incrementClaimantCount(resourceClaim);

View File

@ -439,7 +439,7 @@ public class FileSystemRepository implements ContentRepository {
final String id = idPath.toFile().getName();
final String sectionName = sectionPath.toFile().getName();
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false, false);
if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
removeIncompleteContent(fileToRemove);
}
@ -537,7 +537,7 @@ public class FileSystemRepository implements ContentRepository {
final String section = String.valueOf(modulatedSectionIndex);
final String claimId = System.currentTimeMillis() + "-" + currentIndex;
resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant, true);
resourceOffset = 0L;
LOG.debug("Creating new Resource Claim {}", resourceClaim);
@ -949,6 +949,7 @@ public class FileSystemRepository implements ContentRepository {
LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this);
} else {
writableClaimStreams.remove(scc.getResourceClaim());
resourceClaimManager.freeze(scc.getResourceClaim());
bcos.close();

View File

@ -217,7 +217,7 @@ public class VolatileContentRepository implements ContentRepository {
private ContentClaim createLossTolerant() {
final long id = idGenerator.getAndIncrement();
final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true, false);
final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
claimManager.incrementClaimantCount(resourceClaim, true);

View File

@ -826,7 +826,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
lossTolerant = false;
}
final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
contentClaim.setLength(resourceLength);

View File

@ -29,14 +29,25 @@ import org.slf4j.LoggerFactory;
public class StandardResourceClaimManager implements ResourceClaimManager {
private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
@Override
public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
return new StandardResourceClaim(this, container, section, id, lossTolerant);
public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) {
final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant);
if (!writable) {
claim.freeze();
}
return claim;
}
@Override
public ResourceClaim getResourceClaim(final String container, final String section, final String id) {
final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false);
final ClaimCount count = claimantCounts.get(tempClaim);
return (count == null) ? null : count.getClaim();
}
private static AtomicInteger getCounter(final ResourceClaim claim) {
@ -44,14 +55,14 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
return null;
}
AtomicInteger counter = claimantCounts.get(claim);
ClaimCount counter = claimantCounts.get(claim);
if (counter != null) {
return counter;
return counter.getCount();
}
counter = new AtomicInteger(0);
final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
return existingCounter == null ? counter : existingCounter;
counter = new ClaimCount(claim, new AtomicInteger(0));
final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter);
return existingCounter == null ? counter.getCount() : existingCounter.getCount();
}
@Override
@ -61,8 +72,8 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
}
synchronized (claim) {
final AtomicInteger counter = claimantCounts.get(claim);
return counter == null ? 0 : counter.get();
final ClaimCount counter = claimantCounts.get(claim);
return counter == null ? 0 : counter.getCount().get();
}
}
@ -73,13 +84,13 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
}
synchronized (claim) {
final AtomicInteger counter = claimantCounts.get(claim);
final ClaimCount counter = claimantCounts.get(claim);
if (counter == null) {
logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
return -1;
}
final int newClaimantCount = counter.decrementAndGet();
final int newClaimantCount = counter.getCount().decrementAndGet();
if (newClaimantCount < 0) {
logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount);
} else {
@ -178,4 +189,23 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
((StandardResourceClaim) claim).freeze();
}
private static final class ClaimCount {
private final ResourceClaim claim;
private final AtomicInteger count;
public ClaimCount(final ResourceClaim claim, final AtomicInteger count) {
this.claim = claim;
this.count = count;
}
public AtomicInteger getCount() {
return count;
}
public ResourceClaim getClaim() {
return claim;
}
}
}

View File

@ -115,7 +115,12 @@ public class TestFileSystemSwapManager {
public class NopResourceClaimManager implements ResourceClaimManager {
@Override
public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) {
public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
return null;
}
@Override
public ResourceClaim getResourceClaim(String container, String section, String id) {
return null;
}

View File

@ -817,7 +817,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.size(1L)
.build();
flowFileQueue.put(flowFileRecord);
@ -964,7 +964,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.size(1L)
.build();
flowFileQueue.put(flowFileRecord);
@ -988,7 +988,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.build();
flowFileQueue.put(flowFileRecord);
@ -1004,7 +1004,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(1000L)
.size(1000L)
.build();
@ -1029,7 +1029,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.build();
flowFileQueue.put(flowFileRecord);
@ -1046,7 +1046,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2);
@ -1113,7 +1113,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord);
@ -1150,7 +1150,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord);
@ -1477,7 +1477,7 @@ public class TestStandardProcessSession {
final Set<ContentClaim> claims = new HashSet<>();
for (long i = 0; i < idGenerator.get(); i++) {
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false);
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
if (getClaimantCount(contentClaim) > 0) {
claims.add(contentClaim);
@ -1489,7 +1489,7 @@ public class TestStandardProcessSession {
@Override
public ContentClaim create(boolean lossTolerant) throws IOException {
final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
claimantCounts.put(contentClaim, new AtomicInteger(1));

View File

@ -83,7 +83,7 @@ public class TestVolatileContentRepository {
final ContentRepository mockRepo = Mockito.mock(ContentRepository.class);
contentRepo.setBackupRepository(mockRepo);
final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true);
final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim);

View File

@ -87,10 +87,10 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false);
final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false);
final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false);
final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false);
final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
// Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,

View File

@ -52,7 +52,7 @@ public class TestStandardResourceClaimManager {
}
};
final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false);
final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false, false);
assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1.
assertEquals(1, manager.getClaimantCount(resourceClaim));