NIFI-9966 Corrects the registry loading of large flowfiles from git

This closes #6012

Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>
This commit is contained in:
Matthieu Bertin 2022-05-03 21:16:46 +02:00 committed by Chris Sampson
parent 8f3445d48f
commit 65dbbc9e76
2 changed files with 33 additions and 1 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.registry.provider.flow.git;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.LsRemoteCommand;
@ -24,6 +25,7 @@ import org.eclipse.jgit.api.Status;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.api.errors.NoHeadException;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryCache;
@ -522,7 +524,8 @@ class GitFlowMetaData {
byte[] getContent(String objectId) throws IOException {
final ObjectId flowSnapshotObjectId = gitRepo.resolve(objectId);
return gitRepo.newObjectReader().open(flowSnapshotObjectId).getBytes();
final ObjectStream objStream = gitRepo.newObjectReader().open(flowSnapshotObjectId).openStream();
return IOUtils.toByteArray(objStream);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.registry.provider.flow.git;
import org.apache.commons.lang3.RandomUtils;
import org.apache.nifi.registry.flow.FlowPersistenceException;
import org.apache.nifi.registry.provider.ProviderConfigurationContext;
import org.apache.nifi.registry.provider.ProviderCreationException;
@ -39,6 +40,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@ -287,4 +289,31 @@ public class TestGitFlowPersistenceProvider {
}
}, true);
}
@Test
public void testLoadLargeFlow() throws GitAPIException, IOException {
final Map<String, String> properties = new HashMap<>();
final byte[] largeByteContent = RandomUtils.nextBytes(60_000_000);
properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/repo-with-large-flow");
assertProvider(properties, g -> {}, p -> {
// Create some Flows and keep the directory.
final StandardFlowSnapshotContext.Builder contextBuilder = new StandardFlowSnapshotContext.Builder()
.bucketId("bucket-id-A")
.bucketName("C'est/Bucket A/です。")
.flowId("flow-id-1")
.flowName("テスト_用/フロー#1\\[contains invalid chars]")
.author("unit-test-user")
.comments("Initial commit.")
.snapshotTimestamp(new Date().getTime())
.version(1);
p.saveFlowContent(contextBuilder.build(), largeByteContent);
}, false);
assertProvider(properties, g -> {}, p -> {
// Should be able to load flow from commit histories.
final byte[] fromRepo = p.getFlowContent("bucket-id-A", "flow-id-1", 1);
assertArrayEquals(largeByteContent, fromRepo);
}, true);
}
}