diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java index d201b17b68..2a93817e87 100644 --- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java +++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java @@ -321,8 +321,8 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { for (final FlowFile flowFile : flowFileBatch) { final List> futures = new ArrayList<>(); - final List successes = new ArrayList<>(); - final List failures = new ArrayList<>(); + final List successes = Collections.synchronizedList(new ArrayList<>()); + final List failures = Collections.synchronizedList(new ArrayList<>()); if (flowFile.getSize() > maxMessageSize) { final String message = String.format("FlowFile size %d exceeds MAX_MESSAGE_SIZE", flowFile.getSize()); @@ -368,8 +368,8 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { for (final FlowFile flowFile : flowFileBatch) { final List> futures = new ArrayList<>(); - final List successes = new ArrayList<>(); - final List failures = new ArrayList<>(); + final List successes = Collections.synchronizedList(new ArrayList<>()); + final List failures = Collections.synchronizedList(new ArrayList<>()); final Map attributes = flowFile.getAttributes(); try (final RecordReader reader = readerFactory.createRecordReader(