mirror of https://github.com/apache/nifi.git
NIFI-5172 Adding the ability to specify a record writer for PutElasticsearchHttpRecord in order to individually handle failed records
Addressing PR feedback Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #3299
This commit is contained in:
parent
be6cd4f0f2
commit
cd7edb1c04
|
@ -55,6 +55,8 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
|
|||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.SimpleDateFormatValidator;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
@ -72,6 +74,7 @@ import java.io.ByteArrayInputStream;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.math.BigInteger;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
|
@ -121,6 +124,31 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||
.name("put-es-record-record-writer")
|
||||
.displayName("Record Writer")
|
||||
.description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mapping" +
|
||||
"for the index it is being inserted into. This property specifies the Controller Service to use for writing out those individual records sent to 'failure'. If this is not set, " +
|
||||
"then the whole FlowFile will be routed to failure (including any records which may have been inserted successfully). Note that this will only be used if Elasticsearch reports " +
|
||||
"that individual records failed and that in the event that the entire FlowFile fails (e.g. in the event ES is down), the FF will be routed to failure without being interpreted " +
|
||||
"by this record writer. If there is an error while attempting to route the failures, the entire FlowFile will be routed to Failure. Also if every record failed individually, " +
|
||||
"the entire FlowFile will be routed to Failure without being parsed by the writer.")
|
||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor LOG_ALL_ERRORS = new PropertyDescriptor.Builder()
|
||||
.name("put-es-record-log-all-errors")
|
||||
.displayName("Log all errors in batch")
|
||||
.description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mapping " +
|
||||
"for the index it is being inserted into. If this is set to true, the processor will log the failure reason for the every failed record. When set to false only the first error " +
|
||||
"in the batch will be logged.")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.required(false)
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
|
||||
.name("put-es-record-id-path")
|
||||
.displayName("Identifier Record Path")
|
||||
|
@ -222,6 +250,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
private volatile String dateFormat;
|
||||
private volatile String timeFormat;
|
||||
private volatile String timestampFormat;
|
||||
private volatile Boolean logAllErrors;
|
||||
|
||||
static {
|
||||
final Set<Relationship> _rels = new HashSet<>();
|
||||
|
@ -232,6 +261,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
|
||||
descriptors.add(RECORD_READER);
|
||||
descriptors.add(RECORD_WRITER);
|
||||
descriptors.add(LOG_ALL_ERRORS);
|
||||
descriptors.add(ID_RECORD_PATH);
|
||||
descriptors.add(INDEX);
|
||||
descriptors.add(TYPE);
|
||||
|
@ -299,6 +330,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
if (this.timestampFormat == null) {
|
||||
this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
|
||||
}
|
||||
|
||||
logAllErrors = context.getProperty(LOG_ALL_ERRORS).asBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -310,6 +343,13 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
}
|
||||
|
||||
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
final Optional<RecordSetWriterFactory> writerFactoryOptional;
|
||||
|
||||
if (context.getProperty(RECORD_WRITER).isSet()) {
|
||||
writerFactoryOptional = Optional.of(context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class));
|
||||
} else {
|
||||
writerFactoryOptional = Optional.empty();
|
||||
}
|
||||
|
||||
// Authentication
|
||||
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
@ -429,14 +469,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
}
|
||||
final int statusCode = getResponse.code();
|
||||
|
||||
final Set<Integer> failures = new HashSet<>();
|
||||
|
||||
if (isSuccess(statusCode)) {
|
||||
ResponseBody responseBody = getResponse.body();
|
||||
try {
|
||||
try (ResponseBody responseBody = getResponse.body()) {
|
||||
final byte[] bodyBytes = responseBody.bytes();
|
||||
|
||||
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
|
||||
boolean errors = responseJson.get("errors").asBoolean(false);
|
||||
int failureCount = 0;
|
||||
// ES has no rollback, so if errors occur, log them and route the whole flow file to failure
|
||||
if (errors) {
|
||||
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
|
||||
|
@ -450,7 +490,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
JsonNode itemNode = itemNodeArray.get(i);
|
||||
int status = itemNode.findPath("status").asInt();
|
||||
if (!isSuccess(status)) {
|
||||
if (errorReason == null) {
|
||||
if (errorReason == null || logAllErrors) {
|
||||
// Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
|
||||
String reason = itemNode.findPath("result").asText();
|
||||
if (StringUtils.isEmpty(reason)) {
|
||||
|
@ -458,20 +498,21 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
reason = itemNode.findPath("reason").asText();
|
||||
}
|
||||
errorReason = reason;
|
||||
logger.error("Failed to process {} due to {}, transferring to failure",
|
||||
new Object[]{flowFile, errorReason});
|
||||
|
||||
logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
|
||||
new Object[]{i, flowFile, errorReason});
|
||||
}
|
||||
failureCount++;
|
||||
failures.add(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
flowFile = session.putAttribute(flowFile, "failure.count", Integer.toString(failureCount));
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
} else {
|
||||
// Everything succeeded, route FF and end
|
||||
flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().send(flowFile, url.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
} catch (IOException ioe) {
|
||||
|
@ -479,6 +520,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
context.yield();
|
||||
return;
|
||||
} finally {
|
||||
getResponse.close();
|
||||
}
|
||||
} else if (statusCode / 100 == 5) {
|
||||
// 5xx -> RETRY, but a server error might last a while, so yield
|
||||
|
@ -486,11 +530,76 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
|||
new Object[]{statusCode, getResponse.message()});
|
||||
session.transfer(flowFile, REL_RETRY);
|
||||
context.yield();
|
||||
return;
|
||||
} else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
|
||||
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
// If everything failed or we don't have a writer factory, route the entire original FF to failure.
|
||||
if ((!failures.isEmpty() && failures.size() == recordCount ) || !writerFactoryOptional.isPresent()) {
|
||||
flowFile = session.putAttribute(flowFile, "failure.count", Integer.toString(failures.size()));
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
|
||||
} else if (!failures.isEmpty()) {
|
||||
// Some of the records failed and we have a writer, handle the failures individually.
|
||||
final RecordSetWriterFactory writerFactory = writerFactoryOptional.get();
|
||||
|
||||
// We know there are a mixture of successes and failures, create FFs for each and rename input FF to avoid confusion.
|
||||
final FlowFile inputFlowFile = flowFile;
|
||||
final FlowFile successFlowFile = session.create(inputFlowFile);
|
||||
final FlowFile failedFlowFile = session.create(inputFlowFile);
|
||||
|
||||
// Set up the reader and writers
|
||||
try (final OutputStream successOut = session.write(successFlowFile);
|
||||
final OutputStream failedOut = session.write(failedFlowFile);
|
||||
final InputStream in = session.read(inputFlowFile);
|
||||
final RecordReader reader = readerFactory.createRecordReader(inputFlowFile, in, getLogger())) {
|
||||
|
||||
final RecordSchema schema = writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
|
||||
|
||||
try (final RecordSetWriter successWriter = writerFactory.createWriter(getLogger(), schema, successOut);
|
||||
final RecordSetWriter failedWriter = writerFactory.createWriter(getLogger(), schema, failedOut)) {
|
||||
|
||||
successWriter.beginRecordSet();
|
||||
failedWriter.beginRecordSet();
|
||||
|
||||
// For each record, if it's in the failure set write it to the failure FF, otherwise it succeeded.
|
||||
Record record;
|
||||
int i = 0;
|
||||
while ((record = reader.nextRecord(false, false)) != null) {
|
||||
if (failures.contains(i)) {
|
||||
failedWriter.write(record);
|
||||
} else {
|
||||
successWriter.write(record);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));
|
||||
|
||||
// Normal behavior is to output with record.count. In order to not break backwards compatibility, set both here.
|
||||
session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
|
||||
session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
|
||||
|
||||
session.transfer(successFlowFile, REL_SUCCESS);
|
||||
session.transfer(failedFlowFile, REL_FAILURE);
|
||||
session.remove(inputFlowFile);
|
||||
|
||||
} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
|
||||
// We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
|
||||
getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
|
||||
session.transfer(inputFlowFile, REL_FAILURE);
|
||||
if (successFlowFile != null) {
|
||||
session.remove(successFlowFile);
|
||||
}
|
||||
if (failedFlowFile != null) {
|
||||
session.remove(failedFlowFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
getResponse.close();
|
||||
}
|
||||
|
||||
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
|
|||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
@ -76,21 +77,21 @@ public class TestPutElasticsearchHttpRecord {
|
|||
assertEquals("20/12/2018 6:55 PM", record.get("ts"));
|
||||
}, record -> {
|
||||
assertEquals(2, record.get("id"));
|
||||
assertEquals("ræc2", record.get("name"));
|
||||
assertEquals("reç2", record.get("name"));
|
||||
assertEquals(102, record.get("code"));
|
||||
assertEquals("20/12/2018", record.get("date"));
|
||||
assertEquals("6:55 PM", record.get("time"));
|
||||
assertEquals("20/12/2018 6:55 PM", record.get("ts"));
|
||||
}, record -> {
|
||||
assertEquals(3, record.get("id"));
|
||||
assertEquals("rèc3", record.get("name"));
|
||||
assertEquals("reç3", record.get("name"));
|
||||
assertEquals(103, record.get("code"));
|
||||
assertEquals("20/12/2018", record.get("date"));
|
||||
assertEquals("6:55 PM", record.get("time"));
|
||||
assertEquals("20/12/2018 6:55 PM", record.get("ts"));
|
||||
}, record -> {
|
||||
assertEquals(4, record.get("id"));
|
||||
assertEquals("rëc4", record.get("name"));
|
||||
assertEquals("reç4", record.get("name"));
|
||||
assertEquals(104, record.get("code"));
|
||||
assertEquals("20/12/2018", record.get("date"));
|
||||
assertEquals("6:55 PM", record.get("time"));
|
||||
|
@ -397,11 +398,76 @@ public class TestPutElasticsearchHttpRecord {
|
|||
assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticsearchOnTriggerFailureWithWriter() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(true)); // simulate failures
|
||||
generateTestData(1);
|
||||
generateWriter();
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
|
||||
MockFlowFile flowFileFailure = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
|
||||
flowFileFailure.assertAttributeEquals("failure.count", "1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticsearchOnTriggerFailureWithWriterMultipleRecords() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
|
||||
generateTestData();
|
||||
generateWriter();
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
|
||||
MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
|
||||
flowFileSuccess.assertAttributeEquals("record.count", "2");
|
||||
MockFlowFile flowFileFailure = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
|
||||
flowFileFailure.assertAttributeEquals("record.count", "2");
|
||||
flowFileFailure.assertAttributeEquals("failure.count", "2");
|
||||
|
||||
assertEquals(1, runner.getLogger().getErrorMessages().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticsearchOnTriggerFailureWithWriterMultipleRecordsLogging() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
|
||||
generateTestData();
|
||||
generateWriter();
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.LOG_ALL_ERRORS, "true");
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
|
||||
MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
|
||||
flowFileSuccess.assertAttributeEquals("record.count", "2");
|
||||
MockFlowFile flowFileFailure = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
|
||||
flowFileFailure.assertAttributeEquals("record.count", "2");
|
||||
flowFileFailure.assertAttributeEquals("failure.count", "2");
|
||||
|
||||
assertEquals(2, runner.getLogger().getErrorMessages().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* A Test class that extends the processor in order to inject/mock behavior
|
||||
*/
|
||||
private static class PutElasticsearchHttpRecordTestProcessor extends PutElasticsearchHttpRecord {
|
||||
boolean responseHasFailures = false;
|
||||
int numResponseFailures = 0;
|
||||
OkHttpClient client;
|
||||
int statusCode = 200;
|
||||
String statusMessage = "OK";
|
||||
|
@ -409,7 +475,11 @@ public class TestPutElasticsearchHttpRecord {
|
|||
Consumer<Map>[] recordChecks;
|
||||
|
||||
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
|
||||
this.responseHasFailures = responseHasFailures;
|
||||
this.numResponseFailures = responseHasFailures ? 1 : 0;
|
||||
}
|
||||
|
||||
PutElasticsearchHttpRecordTestProcessor(int numResponseFailures) {
|
||||
this.numResponseFailures = numResponseFailures;
|
||||
}
|
||||
|
||||
void setStatus(int code, String message) {
|
||||
|
@ -454,9 +524,9 @@ public class TestPutElasticsearchHttpRecord {
|
|||
}
|
||||
}
|
||||
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
|
||||
sb.append(responseHasFailures);
|
||||
sb.append(numResponseFailures > 0);
|
||||
sb.append("\", \"items\": [");
|
||||
if (responseHasFailures) {
|
||||
for (int i = 0; i < numResponseFailures; i ++) {
|
||||
// This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
|
||||
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
|
||||
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
|
||||
|
@ -569,6 +639,10 @@ public class TestPutElasticsearchHttpRecord {
|
|||
}
|
||||
|
||||
private void generateTestData() throws IOException {
|
||||
generateTestData(4);
|
||||
}
|
||||
|
||||
private void generateTestData(int numRecords) throws IOException {
|
||||
|
||||
final MockRecordParser parser = new MockRecordParser();
|
||||
try {
|
||||
|
@ -586,9 +660,19 @@ public class TestPutElasticsearchHttpRecord {
|
|||
parser.addSchemaField("time", RecordFieldType.TIME);
|
||||
parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
|
||||
|
||||
parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
|
||||
parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
|
||||
parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
|
||||
parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
|
||||
for(int i=1; i<=numRecords; i++) {
|
||||
parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
|
||||
}
|
||||
}
|
||||
|
||||
private void generateWriter() throws IOException {
|
||||
final MockRecordWriter writer = new MockRecordWriter();
|
||||
try {
|
||||
runner.addControllerService("writer", writer);
|
||||
} catch (InitializationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
runner.enableControllerService(writer);
|
||||
runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue