From 65dbbc9e76d618c538f8fc79e4c0343998ca719d Mon Sep 17 00:00:00 2001 From: Matthieu Bertin Date: Tue, 3 May 2022 21:16:46 +0200 Subject: [PATCH] NIFI-9966 Corrects the registry loading of large flowfiles from git This closes #6012 Signed-off-by: Chris Sampson --- .../provider/flow/git/GitFlowMetaData.java | 5 +++- .../git/TestGitFlowPersistenceProvider.java | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java index 8ed146dfd2..4bac3284b3 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.registry.provider.flow.git; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.api.LsRemoteCommand; @@ -24,6 +25,7 @@ import org.eclipse.jgit.api.Status; import org.eclipse.jgit.api.errors.GitAPIException; import org.eclipse.jgit.api.errors.NoHeadException; import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectStream; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.lib.RepositoryCache; @@ -522,7 +524,8 @@ class GitFlowMetaData { byte[] getContent(String objectId) throws IOException { final ObjectId flowSnapshotObjectId = gitRepo.resolve(objectId); - return gitRepo.newObjectReader().open(flowSnapshotObjectId).getBytes(); + final ObjectStream objStream = gitRepo.newObjectReader().open(flowSnapshotObjectId).openStream(); + return IOUtils.toByteArray(objStream); } } diff --git a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java index 45351abc14..a8b050a37e 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.registry.provider.flow.git; +import org.apache.commons.lang3.RandomUtils; import org.apache.nifi.registry.flow.FlowPersistenceException; import org.apache.nifi.registry.provider.ProviderConfigurationContext; import org.apache.nifi.registry.provider.ProviderCreationException; @@ -39,6 +40,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -287,4 +289,31 @@ public class TestGitFlowPersistenceProvider { } }, true); } + + @Test + public void testLoadLargeFlow() throws GitAPIException, IOException { + final Map properties = new HashMap<>(); + final byte[] largeByteContent = RandomUtils.nextBytes(60_000_000); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/repo-with-large-flow"); + + assertProvider(properties, g -> {}, p -> { + // Create some Flows and keep the directory. + final StandardFlowSnapshotContext.Builder contextBuilder = new StandardFlowSnapshotContext.Builder() + .bucketId("bucket-id-A") + .bucketName("C'est/Bucket A/です。") + .flowId("flow-id-1") + .flowName("テスト_用/フロー#1\\[contains invalid chars]") + .author("unit-test-user") + .comments("Initial commit.") + .snapshotTimestamp(new Date().getTime()) + .version(1); + p.saveFlowContent(contextBuilder.build(), largeByteContent); + }, false); + + assertProvider(properties, g -> {}, p -> { + // Should be able to load flow from commit histories. + final byte[] fromRepo = p.getFlowContent("bucket-id-A", "flow-id-1", 1); + assertArrayEquals(largeByteContent, fromRepo); + }, true); + } }