NIFI-4709 - Fixed ListAzureBlobStorage timestamp precision handling.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2354.
This commit is contained in:
Koji Kawamura 2017-12-19 18:25:23 +09:00 committed by Pierre Villard
parent e8f6ff440a
commit 62e388aa4f
2 changed files with 29 additions and 1 deletions

View File

@ -134,6 +134,14 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
return Scope.CLUSTER; return Scope.CLUSTER;
} }
@Override
protected String getDefaultTimePrecision() {
// User does not have to choose one.
// AUTO_DETECT can handle most cases, but it may incur longer latency
// when all listed files do not have SECOND part in their timestamps although Azure Blob Storage does support seconds.
return PRECISION_SECONDS.getValue();
}
@Override @Override
protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue(); String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();

View File

@ -41,6 +41,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -124,6 +125,10 @@ import java.util.stream.Collectors;
* changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning * changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
* a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared. * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
* </li> * </li>
* <li>
* Provide the target system timestamp precision. By either letting user to choose the right one by adding TARGET_SYSTEM_TIMESTAMP_PRECISION to the return value of
* getSupportedPropertyDescriptors method or, overriding getDefaultTimePrecision method in case the target system has a fixed time precision.
* </li>
* </ul> * </ul>
*/ */
@TriggerSerially @TriggerSerially
@ -438,7 +443,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey(); latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();
// Determine target system time precision. // Determine target system time precision.
final String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue(); String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
if (StringUtils.isBlank(specifiedPrecision)) {
// If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value.
specifiedPrecision = getDefaultTimePrecision();
}
final TimeUnit targetSystemTimePrecision final TimeUnit targetSystemTimePrecision
= PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
@ -544,6 +553,17 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
} }
} }
/**
* This method is intended to be overridden by SubClasses those do not support TARGET_SYSTEM_TIMESTAMP_PRECISION property.
* So that it use return different precisions than PRECISION_AUTO_DETECT.
* If TARGET_SYSTEM_TIMESTAMP_PRECISION is supported as a valid Processor property,
* then PRECISION_AUTO_DETECT will be the default value when not specified by a user.
* @return
*/
protected String getDefaultTimePrecision() {
return TARGET_SYSTEM_TIMESTAMP_PRECISION.getDefaultValue();
}
private void resetTimeStates() { private void resetTimeStates() {
lastListedLatestEntryTimestampMillis = null; lastListedLatestEntryTimestampMillis = null;
lastProcessedLatestEntryTimestampMillis = 0L; lastProcessedLatestEntryTimestampMillis = 0L;