NIFI-12160 Kafka Connect: Check for NAR unpacking before starting

Check that required NAR files are unpacked completely before starting the Kafka Connector

This closes #7832

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Gyori 2023-10-03 14:42:29 +02:00 committed by exceptionfactory
parent 0a47157640
commit b2e3898e17
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 329 additions and 3 deletions

View File

@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.connect;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.io.CleanupMode.ALWAYS;
/**
 * Tests for {@link WorkingDirectoryUtils}: recursive directory purging and the cleanup of
 * incompletely unpacked NAR directories (those ending in {@code nar-unpacked} that are
 * missing their {@code nar-digest} marker file).
 */
public class WorkingDirectoryUtilsTest {

    @Test
    public void testDeleteNonexistentFile(@TempDir(cleanup = ALWAYS) File tempDir) {
        // Purging a path that does not exist must be a silent no-op.
        File nonexistentFile = new File(tempDir, "testFile");

        WorkingDirectoryUtils.purgeDirectory(nonexistentFile);

        assertFalse(nonexistentFile.exists());
    }

    @Test
    public void testDeleteFlatFile(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // purgeDirectory also accepts a regular file and deletes it.
        File file = new File(tempDir, "testFile");
        file.createNewFile();

        WorkingDirectoryUtils.purgeDirectory(file);

        assertFalse(file.exists());
    }

    @Test
    public void testDeleteDirectoryWithContents(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // A directory is removed recursively together with nested files and subdirectories.
        File directory = new File(tempDir, "directory");
        File subDirectory = new File(directory, "subDirectory");
        File subDirectoryContent = new File(subDirectory, "subDirectoryContent");
        File directoryContent = new File(directory, "directoryContent");
        directory.mkdir();
        subDirectory.mkdir();
        subDirectoryContent.createNewFile();
        directoryContent.createNewFile();

        WorkingDirectoryUtils.purgeDirectory(directory);

        assertFalse(directory.exists());
    }

    @Test
    public void testPurgeUnpackedNarsEmptyRootDirectory(@TempDir(cleanup = ALWAYS) File tempDir) {
        // An empty root directory has nothing to purge and must survive untouched.
        File rootDirectory = new File(tempDir, "rootDirectory");
        rootDirectory.mkdir();

        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);

        assertTrue(rootDirectory.exists());
    }

    @Test
    public void testPurgeUnpackedNarsRootDirectoryWithFilesOnly(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // Plain files in the root are not unpacked-NAR directories and must be left alone.
        File rootDirectory = new File(tempDir, "rootDirectory");
        File directoryContent1 = new File(rootDirectory, "file1");
        File directoryContent2 = new File(rootDirectory, "file2");
        rootDirectory.mkdir();
        directoryContent1.createNewFile();
        directoryContent2.createNewFile();

        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);

        // Separate assertions so a failure pinpoints the exact missing path.
        assertTrue(rootDirectory.exists());
        assertTrue(directoryContent1.exists());
        assertTrue(directoryContent2.exists());
    }

    @Test
    public void testPurgeUnpackedNars(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // Only the nar-unpacked directory lacking a nar-digest file may be purged.
        File rootDirectory = new File(tempDir, "rootDirectory");
        rootDirectory.mkdir();
        TestDirectoryStructure testDirectoryStructure = new TestDirectoryStructure(rootDirectory);

        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(testDirectoryStructure.getRootDirectory());

        assertTrue(testDirectoryStructure.isConsistent());
    }

    @Test
    public void testWorkingDirectoryIntegrityRestored(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        /*
        workingDirectory
         - nar
           - extensions
             - *TestDirectoryStructure*
           - narDirectory
           - narFile
         - extensions
           - *TestDirectoryStructure*
         - additionalDirectory
         - workingDirectoryFile
         */
        File workingDirectory = new File(tempDir, "workingDirectory");
        File nar = new File(workingDirectory, "nar");
        File narExtensions = new File(nar, "extensions");
        File narDirectory = new File(nar, "narDirectory");
        File narFile = new File(nar, "narFile");
        File extensions = new File(workingDirectory, "extensions");
        File additionalDirectory = new File(workingDirectory, "additionalDirectory");
        File workingDirectoryFile = new File(workingDirectory, "workingDirectoryFile");
        workingDirectory.mkdir();
        nar.mkdir();
        narExtensions.mkdir();
        narDirectory.mkdir();
        narFile.createNewFile();
        extensions.mkdir();
        additionalDirectory.mkdir();
        workingDirectoryFile.createNewFile();
        TestDirectoryStructure narExtensionsStructure = new TestDirectoryStructure(narExtensions);
        TestDirectoryStructure extensionsStructure = new TestDirectoryStructure(extensions);

        WorkingDirectoryUtils.reconcileWorkingDirectory(workingDirectory);

        // Only nar/extensions and extensions are reconciled; everything else must remain.
        assertTrue(workingDirectory.exists());
        assertTrue(nar.exists());
        assertTrue(narExtensionsStructure.isConsistent());
        assertTrue(narDirectory.exists());
        assertTrue(narFile.exists());
        assertTrue(extensionsStructure.isConsistent());
        assertTrue(additionalDirectory.exists());
        assertTrue(workingDirectoryFile.exists());
    }

    /**
     * Builds a fixture tree below a given root and can verify the expected state after purging.
     * Declared static: it never touches the enclosing test instance, so it should not carry
     * a hidden outer-class reference.
     *
     * <pre>
     * rootDirectory
     *  - subDirectory1-nar-unpacked
     *    - subDirectory1File1
     *    - nar-digest
     *  - subDirectory2
     *    - subDirectory2File1
     *  - subDirectory3-nar-unpacked
     *    - subDirectory3Dir1
     *      - subDirectory3Dir1File1
     *    - subDirectory3File1
     *  - fileInRoot
     * </pre>
     */
    private static class TestDirectoryStructure {
        final File rootDirectory;
        final File subDirectory1;
        final File subDirectory2;
        final File subDirectory3;
        final File fileInRoot;
        final File subDirectory1File1;
        final File subDirectory1File2;
        final File subDirectory2File1;
        final File subDirectory3Dir1;
        final File subDirectory3File1;
        final File subDirectory3Dir1File1;

        public TestDirectoryStructure(final File rootDirectory) throws IOException {
            this.rootDirectory = rootDirectory;
            subDirectory1 = new File(rootDirectory, "subDirectory1-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
            subDirectory2 = new File(rootDirectory, "subDirectory2");
            subDirectory3 = new File(rootDirectory, "subDirectory3-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
            fileInRoot = new File(rootDirectory, "fileInRoot");
            subDirectory1File1 = new File(subDirectory1, "subDirectory1File1");
            // The digest marker: its presence makes subDirectory1 a "completely unpacked" NAR.
            subDirectory1File2 = new File(subDirectory1, WorkingDirectoryUtils.HASH_FILENAME);
            subDirectory2File1 = new File(subDirectory2, "subDirectory2File1");
            subDirectory3Dir1 = new File(subDirectory3, "subDirectory3Dir1");
            subDirectory3File1 = new File(subDirectory3, "subDirectory3File1");
            subDirectory3Dir1File1 = new File(subDirectory3Dir1, "subDirectory3Dir1File1");

            subDirectory1.mkdir();
            subDirectory2.mkdir();
            subDirectory3.mkdir();
            fileInRoot.createNewFile();
            subDirectory1File1.createNewFile();
            subDirectory1File2.createNewFile();
            subDirectory2File1.createNewFile();
            subDirectory3File1.createNewFile();
            subDirectory3Dir1.mkdir();
            subDirectory3Dir1File1.createNewFile();
        }

        public File getRootDirectory() {
            return rootDirectory;
        }

        /**
         * Checks if all directories ending in 'nar-unpacked' that have a file named 'nar-digest' within still exist,
         * and the directory ending in 'nar-unpacked' without 'nar-digest' has been removed with all of its contents.
         * @return true if the above is met.
         */
        public boolean isConsistent() {
            return (rootDirectory.exists()
                    && subDirectory1.exists() && subDirectory1File1.exists() && subDirectory1File2.exists()
                    && subDirectory2.exists() && subDirectory2File1.exists()
                    && !(subDirectory3.exists() || subDirectory3Dir1.exists() || subDirectory3File1.exists() || subDirectory3Dir1File1.exists())
                    && fileInRoot.exists());
        }
    }
}

View File

@@ -84,6 +84,7 @@ public class StatelessKafkaConnectorUtil {
config.setFlowDefinition(dataflowDefinitionProperties);
dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_FLOW_NAME, dataflowName);
MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));
StatelessDataflow dataflow;
// Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
// We do this because the bootstrap() method will expand all NAR files into the working directory.
@@ -91,13 +92,16 @@
// unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted.
unpackNarLock.lock();
try {
WorkingDirectoryUtils.reconcileWorkingDirectory(engineConfiguration.getWorkingDirectory());
bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());
dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
dataflow = bootstrap.createDataflow(dataflowDefinition);
} finally {
unpackNarLock.unlock();
}
dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
return bootstrap.createDataflow(dataflowDefinition);
return dataflow;
} catch (final Exception e) {
throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
}

View File

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.connect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Arrays;
/**
 * Static helpers for cleaning up the Stateless NiFi working directory before the Kafka
 * Connector starts: removes NAR directories whose unpacking did not complete, so a
 * subsequent bootstrap re-unpacks them from scratch.
 */
public class WorkingDirectoryUtils {

    // Suffix of directories produced by NAR unpacking; such a directory is only complete
    // once it also contains the HASH_FILENAME marker file.
    protected static final String NAR_UNPACKED_SUFFIX = "nar-unpacked";
    // Marker file written after a NAR has been fully unpacked.
    protected static final String HASH_FILENAME = "nar-digest";

    private static final Logger logger = LoggerFactory.getLogger(WorkingDirectoryUtils.class);

    private WorkingDirectoryUtils() {
        // Utility class: not intended for instantiation
    }

    /**
     * Goes through the nar/extensions and extensions directories within the working directory
     * and deletes every directory whose name ends in "nar-unpacked" and does not have a
     * "nar-digest" file in it.
     *
     * @param workingDirectory File object pointing to the working directory.
     */
    public static void reconcileWorkingDirectory(final File workingDirectory) {
        purgeIncompleteUnpackedNars(new File(new File(workingDirectory, "nar"), "extensions"));
        purgeIncompleteUnpackedNars(new File(workingDirectory, "extensions"));
    }

    /**
     * Receives a directory as parameter and goes through every directory within it that ends in
     * "nar-unpacked". If a directory ending in "nar-unpacked" does not have a file named
     * "nar-digest" within it, it gets deleted with all of its contents.
     *
     * @param directory A File object pointing to the directory that is supposed to contain
     *                  further directories whose name ends in "nar-unpacked".
     */
    public static void purgeIncompleteUnpackedNars(final File directory) {
        // listFiles() returns null when 'directory' does not exist or is not a directory;
        // both cases are treated the same as "nothing to purge".
        final File[] unpackedDirs = directory.listFiles(file -> file.isDirectory() && file.getName().endsWith(NAR_UNPACKED_SUFFIX));
        if (unpackedDirs == null || unpackedDirs.length == 0) {
            logger.debug("Found no unpacked NARs in {}", directory);
            // Guarded separately: deepToString materializes the full listing, which is
            // wasted work unless debug logging is actually enabled.
            if (logger.isDebugEnabled()) {
                logger.debug("Directory contains: {}", Arrays.deepToString(directory.listFiles()));
            }
            return;
        }
        for (final File unpackedDir : unpackedDirs) {
            final File narHashFile = new File(unpackedDir, HASH_FILENAME);
            if (narHashFile.exists()) {
                logger.debug("Already successfully unpacked {}", unpackedDir);
            } else {
                // Missing digest marker: unpacking was interrupted, so discard the partial result.
                purgeDirectory(unpackedDir);
            }
        }
    }

    /**
     * Delete a directory with all of its contents.
     *
     * @param directory The directory to be deleted.
     */
    public static void purgeDirectory(final File directory) {
        if (directory.exists()) {
            deleteRecursively(directory);
            logger.debug("Cleaned up {}", directory);
        }
    }

    // Depth-first removal: children are deleted before their parent so File.delete()
    // never encounters a non-empty directory.
    private static void deleteRecursively(final File fileOrDirectory) {
        if (fileOrDirectory.isDirectory()) {
            final File[] files = fileOrDirectory.listFiles();
            if (files != null) {
                for (final File file : files) {
                    deleteRecursively(file);
                }
            }
        }
        deleteQuietly(fileOrDirectory);
    }

    // Best-effort delete: a failure is logged at debug level rather than propagated,
    // since leftover files will be retried on the next reconciliation.
    private static void deleteQuietly(final File file) {
        final boolean deleted = file.delete();
        if (!deleted) {
            logger.debug("Failed to cleanup temporary file {}", file);
        }
    }
}