mirror of https://github.com/apache/nifi.git
NIFI-7412: Fixed provenance event types in Azure Fetch/Delete processors
This closes #4245. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
aa986e0bfb
commit
2a44f0ff2f
|
@ -95,7 +95,7 @@ public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor {
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
|
session.getProvenanceReporter().invokeRemoteProcess(flowFile, blob.getSnapshotQualifiedUri().toString(), "Blob deleted");
|
||||||
} catch ( StorageException | URISyntaxException e) {
|
} catch ( StorageException | URISyntaxException e) {
|
||||||
getLogger().error("Failed to delete the specified blob {} from Azure Storage. Routing to failure", new Object[]{blobPath}, e);
|
getLogger().error("Failed to delete the specified blob {} from Azure Storage. Routing to failure", new Object[]{blobPath}, e);
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
|
session.getProvenanceReporter().invokeRemoteProcess(flowFile, fileClient.getFileUrl(), "File deleted");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Failed to delete the specified file from Azure Data Lake Storage, due to {}", e);
|
getLogger().error("Failed to delete the specified file from Azure Data Lake Storage, due to {}", e);
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
|
session.getProvenanceReporter().fetch(flowFile, fileClient.getFileUrl(), transferMillis);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e);
|
getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e);
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
|
|
Loading…
Reference in New Issue