mirror of https://github.com/apache/nifi.git
NIFI-13213 Added Validation for Swap File Names
This closes 8812 Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
parent
62fbd8e8b1
commit
1c2469bf33
|
@ -65,6 +65,7 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -79,6 +80,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
|
||||
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap");
|
||||
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part");
|
||||
private static final Pattern UUID_PATTERN = Pattern.compile("([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})");
|
||||
|
||||
public static final String EVENT_CATEGORY = "Swap FlowFiles";
|
||||
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
|
||||
|
@ -133,11 +135,11 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
return null;
|
||||
}
|
||||
|
||||
final String swapFilePrefix = System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString();
|
||||
final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName;
|
||||
final String swapFileName = swapFileBaseName + ".swap";
|
||||
final String swapFileName = getSwapFileName(flowFileQueue.getIdentifier(), partitionName);
|
||||
final Path storageDirectoryPath = storageDirectory.toPath();
|
||||
final Path swapFilePath = storageDirectoryPath.resolve(swapFileName).toAbsolutePath();
|
||||
|
||||
final File swapFile = new File(storageDirectory, swapFileName);
|
||||
final File swapFile = swapFilePath.toFile();
|
||||
final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
|
||||
final String swapLocation = swapFile.getAbsolutePath();
|
||||
|
||||
|
@ -482,4 +484,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
logger.debug("Changed Partition for Swap File by renaming from {} to {}", swapLocation, newPartitionName);
|
||||
return newFile.getAbsolutePath();
|
||||
}
|
||||
|
||||
private String getSwapFileName(final String flowFileQueueIdentifier, final String partitionName) {
|
||||
final UUID identifier;
|
||||
final Matcher identifierMatcher = UUID_PATTERN.matcher(flowFileQueueIdentifier);
|
||||
if (identifierMatcher.find()) {
|
||||
identifier = UUID.fromString(identifierMatcher.group(1));
|
||||
} else {
|
||||
throw new IllegalArgumentException("FlowFile Queue Identifier [%s] not valid".formatted(flowFileQueueIdentifier));
|
||||
}
|
||||
|
||||
final String swapFilePrefix = System.currentTimeMillis() + "-" + identifier + "-" + UUID.randomUUID();
|
||||
final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName;
|
||||
return swapFileBaseName + ".swap";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.nifi.events.EventReporter;
|
|||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.DataInputStream;
|
||||
|
@ -41,24 +40,44 @@ import java.nio.file.Path;
|
|||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestFileSystemSwapManager {
|
||||
|
||||
@Test
|
||||
public void testFlowFileQueueIdentifierNotValid() {
|
||||
final String identifier = "invalid-identifier";
|
||||
|
||||
final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
|
||||
when(flowFileQueue.getIdentifier()).thenReturn(identifier);
|
||||
final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
|
||||
final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo);
|
||||
final List<FlowFileRecord> flowFileRecords = Collections.singletonList(new MockFlowFileRecord(0));
|
||||
|
||||
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
|
||||
() -> swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1"));
|
||||
|
||||
assertTrue(exception.getMessage().contains(identifier));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBackwardCompatible() throws IOException {
|
||||
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/old-swap-file.swap"));
|
||||
final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
|
||||
try (final InputStream fis = new FileInputStream("src/test/resources/old-swap-file.swap");
|
||||
final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
|
||||
|
||||
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
|
||||
final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
|
||||
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
|
||||
|
||||
final FileSystemSwapManager swapManager = createSwapManager();
|
||||
|
@ -76,11 +95,11 @@ public class TestFileSystemSwapManager {
|
|||
|
||||
@Test
|
||||
public void testFailureOnRepoSwapOut() throws IOException {
|
||||
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
|
||||
final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
|
||||
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
|
||||
|
||||
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
|
||||
Mockito.doThrow(new IOException("Intentional IOException for unit test"))
|
||||
final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
|
||||
doThrow(new IOException("Intentional IOException for unit test"))
|
||||
.when(flowFileRepo).swapFlowFilesOut(any(), any(), any());
|
||||
|
||||
final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo);
|
||||
|
@ -96,7 +115,7 @@ public class TestFileSystemSwapManager {
|
|||
|
||||
@Test
|
||||
public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
|
||||
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
|
||||
final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
|
||||
when(flowFileQueue.getIdentifier()).thenReturn("");
|
||||
|
||||
final File targetDir = new File("target/swap");
|
||||
|
@ -111,7 +130,7 @@ public class TestFileSystemSwapManager {
|
|||
|
||||
final FileSystemSwapManager swapManager = new FileSystemSwapManager(Paths.get("target"));
|
||||
final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
|
||||
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
|
||||
final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
|
||||
|
||||
swapManager.initialize(new SwapManagerInitializationContext() {
|
||||
@Override
|
||||
|
@ -134,7 +153,7 @@ public class TestFileSystemSwapManager {
|
|||
final List<String> recoveredLocations = swapManager.recoverSwapLocations(flowFileQueue, null);
|
||||
assertEquals(1, recoveredLocations.size());
|
||||
|
||||
final String firstLocation = recoveredLocations.get(0);
|
||||
final String firstLocation = recoveredLocations.getFirst();
|
||||
final SwapContents emptyContents = swapManager.swapIn(firstLocation, flowFileQueue);
|
||||
assertEquals(0, emptyContents.getFlowFiles().size());
|
||||
|
||||
|
@ -144,8 +163,8 @@ public class TestFileSystemSwapManager {
|
|||
assertEquals(10000, contents.getFlowFiles().size());
|
||||
}
|
||||
|
||||
private FileSystemSwapManager createSwapManager() throws IOException {
|
||||
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
|
||||
private FileSystemSwapManager createSwapManager() {
|
||||
final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
|
||||
return createSwapManager(flowFileRepo);
|
||||
}
|
||||
|
||||
|
@ -175,7 +194,7 @@ public class TestFileSystemSwapManager {
|
|||
return swapManager;
|
||||
}
|
||||
|
||||
public class NopResourceClaimManager implements ResourceClaimManager {
|
||||
public static class NopResourceClaimManager implements ResourceClaimManager {
|
||||
@Override
|
||||
public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue