NIFI-2636 resolve thread safety problem in UnpackContent

Signed-off-by: Joe Skora <jskora@apache.org>
This commit is contained in:
Mike Moser 2016-08-23 16:41:13 -04:00 committed by Joe Skora
parent b34de74db2
commit 5a3d00c7bb
2 changed files with 62 additions and 36 deletions

View File

@ -145,8 +145,6 @@ public class UnpackContent extends AbstractProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private Unpacker unpacker;
private boolean addFragmentAttrs;
private Pattern fileFilter;
private Unpacker tarUnpacker;
@ -178,7 +176,6 @@ public class UnpackContent extends AbstractProcessor {
@OnStopped
public void onStopped() {
unpacker = null;
fileFilter = null;
}
@ -191,35 +188,6 @@ public class UnpackContent extends AbstractProcessor {
}
}
public void initUnpacker(PackageFormat packagingFormat) {
switch (packagingFormat) {
case TAR_FORMAT:
case X_TAR_FORMAT:
unpacker = tarUnpacker;
addFragmentAttrs = true;
break;
case ZIP_FORMAT:
unpacker = zipUnpacker;
addFragmentAttrs = true;
break;
case FLOWFILE_STREAM_FORMAT_V2:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
addFragmentAttrs = false;
break;
case FLOWFILE_STREAM_FORMAT_V3:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
addFragmentAttrs = false;
break;
case FLOWFILE_TAR_FORMAT:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
addFragmentAttrs = false;
break;
case AUTO_DETECT_FORMAT:
// The format of the unpacker should be known before initialization
throw new ProcessException(packagingFormat + " is not a valid packaging format");
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@ -247,11 +215,38 @@ public class UnpackContent extends AbstractProcessor {
logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType});
session.transfer(flowFile, REL_SUCCESS);
return;
} else {
initUnpacker(packagingFormat);
}
} else {
initUnpacker(packagingFormat);
}
// set the Unpacker to use for this FlowFile. FlowFileUnpackager objects maintain state and are not reusable.
final Unpacker unpacker;
final boolean addFragmentAttrs;
switch (packagingFormat) {
case TAR_FORMAT:
case X_TAR_FORMAT:
unpacker = tarUnpacker;
addFragmentAttrs = true;
break;
case ZIP_FORMAT:
unpacker = zipUnpacker;
addFragmentAttrs = true;
break;
case FLOWFILE_STREAM_FORMAT_V2:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
addFragmentAttrs = false;
break;
case FLOWFILE_STREAM_FORMAT_V3:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
addFragmentAttrs = false;
break;
case FLOWFILE_TAR_FORMAT:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
addFragmentAttrs = false;
break;
case AUTO_DETECT_FORMAT:
default:
// The format of the unpacker should be known before initialization
throw new ProcessException(packagingFormat + " is not a valid packaging format");
}
final List<FlowFile> unpacked = new ArrayList<>();

View File

@ -341,4 +341,35 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 1);
}
/*
* This test checks for thread safety problems when PackageFormat.AUTO_DETECT_FORMAT is used.
* It won't always fail if there is a issue with the code, but it will fail often enough to eventually be noticed.
* If this test fails at all, then it needs to be investigated.
*/
@Test
public void testThreadSafetyUsingAutoDetect() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
Map<String, String> attrsTar = new HashMap<>(1);
Map<String, String> attrsFFv3 = new HashMap<>(1);
attrsTar.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
attrsFFv3.put("mime.type", UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.getMimeType());
int numThreads = 50;
runner.setThreadCount(numThreads);
for (int i=0; i<numThreads; i++) {
if (i%2 == 0) {
runner.enqueue(dataPath.resolve("data.tar"), attrsTar);
} else {
runner.enqueue(dataPath.resolve("data.flowfilev3"), attrsFFv3);
}
}
runner.run(numThreads);
runner.assertTransferCount(UnpackContent.REL_SUCCESS, numThreads*2);
}
}