NIFI-3818: PutHiveStreaming throws IllegalStateException

Changed from async append to sync append: the async approach updates the 'recursionSet' check in StandardProcessSession from multiple threads, which breaks it and results in an IllegalStateException.

This closes #1761.

Signed-off-by: Bryan Bende <bbende@apache.org>
Koji Kawamura authored on 2017-05-05 18:33:38 +09:00, committed by Bryan Bende
parent 85405dae15
commit af6f63691c
1 changed file with 46 additions and 76 deletions
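Before the diff, a minimal sketch of the pattern the change moves to: the Avro container header is written once, its bytes are captured, and every later append re-opens the writer against those bytes with DataFileWriter.appendTo, so every ProcessSession.append call stays on the processor's own thread. The class and method names below (SyncAvroAppendSketch, initWriter, appendRecords) are illustrative, not part of the commit; only ProcessSession and Avro DataFileWriter calls that appear in the diff are used.

// Illustrative sketch only (hypothetical names), showing the synchronous append
// pattern this commit adopts: ProcessSession.append is only ever invoked from the
// thread running onTrigger, so the session is never touched by multiple threads.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

class SyncAvroAppendSketch {

    private final DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
    private byte[] avroHeader;

    // Write the Avro container header once, remember its bytes, and append it to the FlowFile.
    byte[] initWriter(ProcessSession session, Schema schema, AtomicReference<FlowFile> flowFileRef) throws IOException {
        final ByteArrayOutputStream header = new ByteArrayOutputStream();
        writer.create(schema, header);   // emits only the header, no records yet
        writer.close();
        avroHeader = header.toByteArray();
        flowFileRef.set(session.append(flowFileRef.get(), out -> out.write(avroHeader)));
        return avroHeader;
    }

    // Append records synchronously; the writer is re-opened in append mode against the
    // saved header bytes so the header is written to the FlowFile only once.
    void appendRecords(ProcessSession session, AtomicReference<FlowFile> flowFileRef, List<GenericRecord> records) {
        flowFileRef.set(session.append(flowFileRef.get(), out -> {
            writer.appendTo(new SeekableByteArrayInput(avroHeader), out);
            for (GenericRecord record : records) {
                writer.append(record);
            }
            writer.close();
        }));
    }
}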


@@ -21,6 +21,7 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableByteArrayInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -64,6 +65,7 @@ import org.apache.nifi.util.hive.HiveOptions;
 import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.HiveWriter;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -75,8 +77,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.regex.Pattern;

 /**
@@ -383,16 +382,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         private AtomicReference<FlowFile> failureFlowFile;
         private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
         private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+        private byte[] successAvroHeader;
+        private byte[] failureAvroHeader;

         private final AtomicInteger recordCount = new AtomicInteger(0);
         private final AtomicInteger successfulRecordCount = new AtomicInteger(0);
         private final AtomicInteger failedRecordCount = new AtomicInteger(0);

-        private volatile ExecutorService appendRecordThreadPool;
-        private volatile AtomicBoolean closed = new AtomicBoolean(false);
-        private final BlockingQueue<List<HiveStreamingRecord>> successRecordQueue = new ArrayBlockingQueue<>(100);
-        private final BlockingQueue<List<HiveStreamingRecord>> failureRecordQueue = new ArrayBlockingQueue<>(100);

         private final ComponentLog logger;

         /**
@@ -412,9 +408,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             this.failureFlowFile = new AtomicReference<>(failureFlowFile);
         }

-        private void initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
-                                    DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef,
-                                    BlockingQueue<List<HiveStreamingRecord>> queue, Function<Integer, Boolean> isCompleted) {
+        private byte[] initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
+                                      DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef) {
             writer.setCodec(CodecFactory.fromString(codec));

             // Transfer metadata (this is a subset of the incoming file)
@@ -424,25 +419,36 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                 }
             }

-            appendRecordThreadPool.submit(() -> {
-                flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
-                    // Create writer so that records can be appended.
-                    writer.create(reader.getSchema(), out);
-                    try {
-                        int writtenCount = 0;
-                        while (true) {
-                            if (closed.get() && isCompleted.apply(writtenCount)) {
-                                break;
-                            }
-                            final List<HiveStreamingRecord> hRecords = queue.poll(100, TimeUnit.MILLISECONDS);
+            final ByteArrayOutputStream avroHeader = new ByteArrayOutputStream();
+            flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+                // Create writer so that records can be appended later.
+                writer.create(reader.getSchema(), avroHeader);
+                writer.close();

+                final byte[] header = avroHeader.toByteArray();
+                out.write(header);
+            }));

+            // Capture the Avro header byte array that is just written to the FlowFile.
+            // This is needed when Avro records are appended to the same FlowFile.
+            return avroHeader.toByteArray();
+        }

+        private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) {
+            successAvroHeader = initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile);
+            failureAvroHeader = initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile);
+        }

+        private void appendAvroRecords(ProcessSession session, byte[] avroHeader, DataFileWriter<GenericRecord> writer,
+                                       AtomicReference<FlowFile> flowFileRef, List<HiveStreamingRecord> hRecords) {
+            flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+                if (hRecords != null) {
+                    // Initialize the writer again as append mode, so that Avro header is written only once.
+                    writer.appendTo(new SeekableByteArrayInput(avroHeader), out);
                     try {
                         for (HiveStreamingRecord hRecord : hRecords) {
                             writer.append(hRecord.getRecord());
-                            writtenCount++;
                         }
                     } catch (IOException ioe) {
                         // The records were put to Hive Streaming successfully, but there was an error while writing the
@@ -450,45 +456,22 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                         logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
                     }
                 }
-                        writer.flush();
-                    }
-                } catch (InterruptedException e) {
-                    logger.warn("Append record thread is interrupted, " + e, e);
-                }
                 writer.close();
             }));
-            });
         }

-        private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) {
-            appendRecordThreadPool = Executors.newFixedThreadPool(2);
-            initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get());
-            initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get());
-            // No new task.
-            appendRecordThreadPool.shutdown();
-        }

-        private void appendRecordsToSuccess(List<HiveStreamingRecord> records) {
-            appendRecordsToFlowFile(records, successRecordQueue);
+        private void appendRecordsToSuccess(ProcessSession session, List<HiveStreamingRecord> records) {
+            appendAvroRecords(session, successAvroHeader, successAvroWriter, successFlowFile, records);
             successfulRecordCount.addAndGet(records.size());
         }

-        private void appendRecordsToFailure(List<HiveStreamingRecord> records) {
-            appendRecordsToFlowFile(records, failureRecordQueue);
+        private void appendRecordsToFailure(ProcessSession session, List<HiveStreamingRecord> records) {
+            appendAvroRecords(session, failureAvroHeader, failureAvroWriter, failureFlowFile, records);
             failedRecordCount.addAndGet(records.size());
         }

-        private void appendRecordsToFlowFile(List<HiveStreamingRecord> records, BlockingQueue<List<HiveStreamingRecord>> queue) {
-            if (!queue.add(records)) {
-                throw new ProcessException(String.format("Failed to append %d records due to insufficient internal queue capacity.", records.size()));
-            }
-        }

         private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) {
-            closeAvroWriters();

             if (successfulRecordCount.get() > 0) {
                 // Transfer the flow file with successful records
                 successFlowFile.set(
@@ -513,19 +496,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             });
         }

-        private void closeAvroWriters() {
-            closed.set(true);
-            if (appendRecordThreadPool != null) {
-                // Having null thread pool means the input FlowFile was not processed at all, due to illegal format.
-                try {
-                    if (!appendRecordThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-                        logger.warn("Waiting for Avro records being appended into output FlowFiles has been timeout.");
-                    }
-                } catch (InterruptedException e) {
-                    logger.warn("Waiting for Avro records being appended into output FlowFiles has been interrupted.");
-                }
-            }
-        }
     }

     private static class ShouldRetryException extends RuntimeException {
@@ -545,7 +515,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                     case Failure:
                         // Add the failed record to the failure flow file
                         getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e);
-                        fc.appendRecordsToFailure(input);
+                        fc.appendRecordsToFailure(session, input);
                         break;

                     case Retry:
@@ -670,7 +640,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             Runnable flushSuccessfulRecords = () -> {
                 // Now send the records to the successful FlowFile and update the success count
-                functionContext.appendRecordsToSuccess(successfulRecords.get());
+                functionContext.appendRecordsToSuccess(session, successfulRecords.get());
                 // Clear the list of successful records, we'll use it at the end when we flush whatever records are left
                 successfulRecords.set(new ArrayList<>());
             };