diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index 2754f9cc4f..e7d85cde5d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -21,6 +21,7 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableByteArrayInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -64,6 +65,7 @@ import org.apache.nifi.util.hive.HiveOptions;
 import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.HiveWriter;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -75,8 +77,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.regex.Pattern;
 
 /**
@@ -383,16 +382,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         private AtomicReference<FlowFile> failureFlowFile;
         private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
         private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+        private byte[] successAvroHeader;
+        private byte[] failureAvroHeader;
 
         private final AtomicInteger recordCount = new AtomicInteger(0);
         private final AtomicInteger successfulRecordCount = new AtomicInteger(0);
         private final AtomicInteger failedRecordCount = new AtomicInteger(0);
 
-        private volatile ExecutorService appendRecordThreadPool;
-        private volatile AtomicBoolean closed = new AtomicBoolean(false);
-        private final BlockingQueue<List<HiveStreamingRecord>> successRecordQueue = new ArrayBlockingQueue<>(100);
-        private final BlockingQueue<List<HiveStreamingRecord>> failureRecordQueue = new ArrayBlockingQueue<>(100);
-
         private final ComponentLog logger;
 
         /**
@@ -412,9 +408,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             this.failureFlowFile = new AtomicReference<>(failureFlowFile);
         }
 
-        private void initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
-                                    DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef,
-                                    BlockingQueue<List<HiveStreamingRecord>> queue, Function<Integer, Boolean> isCompleted) {
+        private byte[] initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
+                                      DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef) {
 
             writer.setCodec(CodecFactory.fromString(codec));
             // Transfer metadata (this is a subset of the incoming file)
@@ -424,71 +419,59 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                 }
             }
 
-            appendRecordThreadPool.submit(() -> {
-                flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
-                    // Create writer so that records can be appended.
-                    writer.create(reader.getSchema(), out);
+            final ByteArrayOutputStream avroHeader = new ByteArrayOutputStream();
+            flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+                // Create writer so that records can be appended later.
+                writer.create(reader.getSchema(), avroHeader);
+                writer.close();
 
-                    try {
-                        int writtenCount = 0;
-                        while (true) {
+                final byte[] header = avroHeader.toByteArray();
+                out.write(header);
+            }));
 
-                            if (closed.get() && isCompleted.apply(writtenCount)) {
-                                break;
-                            }
-
-                            final List<HiveStreamingRecord> hRecords = queue.poll(100, TimeUnit.MILLISECONDS);
-                            if (hRecords != null) {
-                                try {
-                                    for (HiveStreamingRecord hRecord : hRecords) {
-                                        writer.append(hRecord.getRecord());
-                                        writtenCount++;
-                                    }
-                                } catch (IOException ioe) {
-                                    // The records were put to Hive Streaming successfully, but there was an error while writing the
-                                    // Avro records to the flow file. Log as an error and move on.
-                                    logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
-                                }
-                            }
-                        }
-                        writer.flush();
-                    } catch (InterruptedException e) {
-                        logger.warn("Append record thread is interrupted, " + e, e);
-                    }
-
-                }));
-            });
+            // Capture the Avro header byte array that is just written to the FlowFile.
+            // This is needed when Avro records are appended to the same FlowFile.
+            return avroHeader.toByteArray();
         }
 
         private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) {
-            appendRecordThreadPool = Executors.newFixedThreadPool(2);
-            initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get());
-            initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get());
-
-            // No new task.
-            appendRecordThreadPool.shutdown();
+            successAvroHeader = initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile);
+            failureAvroHeader = initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile);
         }
 
-        private void appendRecordsToSuccess(List<HiveStreamingRecord> records) {
-            appendRecordsToFlowFile(records, successRecordQueue);
+        private void appendAvroRecords(ProcessSession session, byte[] avroHeader, DataFileWriter<GenericRecord> writer,
+                                       AtomicReference<FlowFile> flowFileRef, List<HiveStreamingRecord> hRecords) {
+
+            flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+                if (hRecords != null) {
+                    // Initialize the writer again as append mode, so that Avro header is written only once.
+                    writer.appendTo(new SeekableByteArrayInput(avroHeader), out);
+                    try {
+                        for (HiveStreamingRecord hRecord : hRecords) {
+                            writer.append(hRecord.getRecord());
+                        }
+                    } catch (IOException ioe) {
+                        // The records were put to Hive Streaming successfully, but there was an error while writing the
+                        // Avro records to the flow file. Log as an error and move on.
+ logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe); + } + } + writer.close(); + })); + } + + private void appendRecordsToSuccess(ProcessSession session, List records) { + appendAvroRecords(session, successAvroHeader, successAvroWriter, successFlowFile, records); successfulRecordCount.addAndGet(records.size()); } - private void appendRecordsToFailure(List records) { - appendRecordsToFlowFile(records, failureRecordQueue); + private void appendRecordsToFailure(ProcessSession session, List records) { + appendAvroRecords(session, failureAvroHeader, failureAvroWriter, failureFlowFile, records); failedRecordCount.addAndGet(records.size()); } - private void appendRecordsToFlowFile(List records, BlockingQueue> queue) { - if (!queue.add(records)) { - throw new ProcessException(String.format("Failed to append %d records due to insufficient internal queue capacity.", records.size())); - } - } - private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) { - closeAvroWriters(); - if (successfulRecordCount.get() > 0) { // Transfer the flow file with successful records successFlowFile.set( @@ -513,19 +496,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { }); } - private void closeAvroWriters() { - closed.set(true); - if (appendRecordThreadPool != null) { - // Having null thread pool means the input FlowFile was not processed at all, due to illegal format. - try { - if (!appendRecordThreadPool.awaitTermination(10, TimeUnit.SECONDS)) { - logger.warn("Waiting for Avro records being appended into output FlowFiles has been timeout."); - } - } catch (InterruptedException e) { - logger.warn("Waiting for Avro records being appended into output FlowFiles has been interrupted."); - } - } - } } private static class ShouldRetryException extends RuntimeException { @@ -545,7 +515,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { case Failure: // Add the failed record to the failure flow file getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e); - fc.appendRecordsToFailure(input); + fc.appendRecordsToFailure(session, input); break; case Retry: @@ -670,7 +640,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { Runnable flushSuccessfulRecords = () -> { // Now send the records to the successful FlowFile and update the success count - functionContext.appendRecordsToSuccess(successfulRecords.get()); + functionContext.appendRecordsToSuccess(session, successfulRecords.get()); // Clear the list of successful records, we'll use it at the end when we flush whatever records are left successfulRecords.set(new ArrayList<>()); };