mirror of
https://github.com/apache/nifi.git
synced 2025-03-03 16:09:19 +00:00
NIFI-11068 Introduced ConflictResolutionStrategy enum
This closes #6861 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
d913f8046a
commit
96a6594680
@ -63,6 +63,11 @@
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-conflict-resolution</artifactId>
|
||||
<version>1.20.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
|
@ -18,6 +18,9 @@
|
||||
package org.apache.nifi.processors.dropbox;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.FAIL;
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.IGNORE;
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.REPLACE;
|
||||
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
|
||||
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE_DESC;
|
||||
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
|
||||
@ -58,6 +61,7 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||
@ -100,10 +104,6 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
|
||||
public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
|
||||
|
||||
public static final String IGNORE_RESOLUTION = "ignore";
|
||||
public static final String REPLACE_RESOLUTION = "replace";
|
||||
public static final String FAIL_RESOLUTION = "fail";
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("Files that have been successfully written to Dropbox are transferred to this relationship.")
|
||||
@ -140,8 +140,8 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
.displayName("Conflict Resolution Strategy")
|
||||
.description("Indicates what should happen when a file with the same name already exists in the specified Dropbox folder.")
|
||||
.required(true)
|
||||
.defaultValue(FAIL_RESOLUTION)
|
||||
.allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, REPLACE_RESOLUTION)
|
||||
.defaultValue(FAIL.getValue())
|
||||
.allowableValues(ConflictResolutionStrategy.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder()
|
||||
@ -219,8 +219,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
.asDataSize(DataUnit.B)
|
||||
.longValue();
|
||||
|
||||
final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
|
||||
|
||||
final ConflictResolutionStrategy conflictResolution = ConflictResolutionStrategy.forValue(context.getProperty(CONFLICT_RESOLUTION).getValue());
|
||||
final long size = flowFile.getSize();
|
||||
final String uploadPath = convertFolderName(folder) + "/" + filename;
|
||||
final long startNanos = System.nanoTime();
|
||||
@ -261,7 +260,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleUploadError(final String conflictResolution, final String uploadPath, final UploadErrorException e) {
|
||||
private void handleUploadError(final ConflictResolutionStrategy conflictResolution, final String uploadPath, final UploadErrorException e) {
|
||||
if (e.errorValue.isPath() && e.errorValue.getPathValue().getReason().isConflict()) {
|
||||
handleConflict(conflictResolution, uploadPath, e);
|
||||
} else {
|
||||
@ -269,7 +268,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleUploadError(final String conflictResolution, final String uploadPath, final UploadSessionFinishErrorException e) {
|
||||
private void handleUploadError(final ConflictResolutionStrategy conflictResolution, final String uploadPath, final UploadSessionFinishErrorException e) {
|
||||
if (e.errorValue.isPath() && e.errorValue.getPathValue().isConflict()) {
|
||||
handleConflict(conflictResolution, uploadPath, e);
|
||||
} else {
|
||||
@ -277,16 +276,17 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleConflict(final String conflictResolution, final String uploadPath, final DbxApiException e) {
|
||||
if (IGNORE_RESOLUTION.equals(conflictResolution)) {
|
||||
private void handleConflict(final ConflictResolutionStrategy conflictResolution, final String uploadPath, final DbxApiException e) {
|
||||
if (conflictResolution == IGNORE) {
|
||||
getLogger().info("File with the same name [{}] already exists. Remote file is not modified due to {} being set to '{}'.",
|
||||
uploadPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
|
||||
} else if (conflictResolution.equals(FAIL_RESOLUTION)) {
|
||||
} else if (conflictResolution == FAIL) {
|
||||
throw new ProcessException(format("File with the same name [%s] already exists.", uploadPath), e);
|
||||
}
|
||||
}
|
||||
|
||||
private FileMetadata uploadLargeFileInChunks(String path, InputStream rawIn, long size, long uploadChunkSize, String conflictResolution) throws DbxException, IOException {
|
||||
private FileMetadata uploadLargeFileInChunks(String path, InputStream rawIn, long size, long uploadChunkSize,
|
||||
ConflictResolutionStrategy conflictResolution) throws DbxException, IOException {
|
||||
final String sessionId;
|
||||
try (UploadSessionStartUploader uploader = createUploadSessionStartUploader()) {
|
||||
sessionId = uploader.uploadAndFinish(rawIn, uploadChunkSize).getSessionId();
|
||||
@ -317,15 +317,15 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
}
|
||||
}
|
||||
|
||||
private WriteMode getWriteMode(String conflictResolution) {
|
||||
if (REPLACE_RESOLUTION.equals(conflictResolution)) {
|
||||
private WriteMode getWriteMode(ConflictResolutionStrategy conflictResolution) {
|
||||
if (conflictResolution == REPLACE) {
|
||||
return WriteMode.OVERWRITE;
|
||||
} else {
|
||||
return WriteMode.ADD;
|
||||
}
|
||||
}
|
||||
|
||||
private UploadUploader createUploadUploader(String path, String conflictResolution) throws DbxException {
|
||||
private UploadUploader createUploadUploader(String path, ConflictResolutionStrategy conflictResolution) throws DbxException {
|
||||
return dropboxApiClient
|
||||
.files()
|
||||
.uploadBuilder(path)
|
||||
|
@ -17,6 +17,10 @@
|
||||
|
||||
package org.apache.nifi.processors.dropbox;
|
||||
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.FAIL;
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.IGNORE;
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.REPLACE;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@ -80,7 +84,7 @@ public class PutDropboxIT extends AbstractDropboxIT<PutDropbox> {
|
||||
@Test
|
||||
void testUploadExistingFileFailStrategy() {
|
||||
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.FAIL_RESOLUTION);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, FAIL.getValue());
|
||||
|
||||
testRunner.enqueue(CONTENT);
|
||||
testRunner.run();
|
||||
@ -98,7 +102,7 @@ public class PutDropboxIT extends AbstractDropboxIT<PutDropbox> {
|
||||
@Test
|
||||
void testUploadExistingFileWithSameContentFailStrategy() {
|
||||
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.FAIL_RESOLUTION);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, FAIL.getValue());
|
||||
|
||||
testRunner.enqueue(CONTENT);
|
||||
testRunner.run();
|
||||
@ -117,7 +121,7 @@ public class PutDropboxIT extends AbstractDropboxIT<PutDropbox> {
|
||||
@Test
|
||||
void testUploadExistingFileReplaceStrategy() {
|
||||
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.REPLACE_RESOLUTION);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, REPLACE.getValue());
|
||||
|
||||
testRunner.enqueue(CONTENT);
|
||||
testRunner.run();
|
||||
@ -135,7 +139,7 @@ public class PutDropboxIT extends AbstractDropboxIT<PutDropbox> {
|
||||
@Test
|
||||
void testUploadExistingFileIgnoreStrategy() {
|
||||
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, IGNORE.getValue());
|
||||
|
||||
testRunner.enqueue(CONTENT);
|
||||
testRunner.run();
|
||||
|
@ -20,6 +20,8 @@ package org.apache.nifi.processors.dropbox;
|
||||
import static com.dropbox.core.v2.files.UploadError.path;
|
||||
import static com.dropbox.core.v2.files.WriteConflictError.FILE;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.IGNORE;
|
||||
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.REPLACE;
|
||||
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
|
||||
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
@ -166,7 +168,7 @@ public class PutDropboxTest extends AbstractDropboxTest {
|
||||
@Test
|
||||
void testFileUploadWithReplaceConflictResolutionStrategy() throws Exception {
|
||||
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.REPLACE_RESOLUTION);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, REPLACE.getValue());
|
||||
|
||||
mockFileUpload(TEST_FOLDER, FILENAME_1, WriteMode.OVERWRITE);
|
||||
|
||||
@ -193,7 +195,7 @@ public class PutDropboxTest extends AbstractDropboxTest {
|
||||
@Test
|
||||
void testFileUploadOtherExceptionIsNotIgnored() throws Exception {
|
||||
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, IGNORE.getValue());
|
||||
|
||||
mockFileUploadError(getException(WriteError.INSUFFICIENT_SPACE));
|
||||
|
||||
@ -205,7 +207,7 @@ public class PutDropboxTest extends AbstractDropboxTest {
|
||||
@Test
|
||||
void testFileUploadConflictIgnoredWithIgnoreResolutionStrategy() throws Exception {
|
||||
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION);
|
||||
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, IGNORE.getValue());
|
||||
|
||||
mockFileUploadError(getException(WriteError.conflict(FILE)));
|
||||
|
||||
|
@ -0,0 +1,32 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-extension-utils</artifactId>
|
||||
<version>1.20.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-conflict-resolution</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.conflict.resolution;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
public enum ConflictResolutionStrategy implements DescribedValue {
|
||||
FAIL( "fail", "Handle file conflict as failure."),
|
||||
IGNORE("ignore", "Ignore conflict, do not change the original file."),
|
||||
REPLACE( "replace", "Replace existing file in case of conflict.");
|
||||
|
||||
private static final Map<String, ConflictResolutionStrategy> ENUM_MAP = new HashMap<>();
|
||||
|
||||
static {
|
||||
for (ConflictResolutionStrategy strategy : ConflictResolutionStrategy.values()) {
|
||||
ENUM_MAP.put(strategy.getValue(), strategy);
|
||||
}
|
||||
}
|
||||
|
||||
private final String value;
|
||||
private final String description;
|
||||
|
||||
ConflictResolutionStrategy(final String value, String description) {
|
||||
this.value = value;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public static ConflictResolutionStrategy forValue(String value) {
|
||||
return ENUM_MAP.get(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDisplayName() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return this.description;
|
||||
}
|
||||
}
|
@ -43,5 +43,6 @@
|
||||
<module>nifi-reporting-utils</module>
|
||||
<module>nifi-service-utils</module>
|
||||
<module>nifi-syslog-utils</module>
|
||||
<module>nifi-conflict-resolution</module>
|
||||
</modules>
|
||||
</project>
|
||||
|
Loading…
x
Reference in New Issue
Block a user