mirror of https://github.com/apache/nifi.git
NIFI-3659: This closes #1707. Added Processor to Split a FlowFile consisting of Record-oriented data into multiple FlowFiles, each containing a subset of the original FlowFile's records
Signed-off-by: joewitt <joewitt@apache.org>
parent 55b8c7ddad · commit a1bffbcc87
SplitRecord.java (new file):

@@ -0,0 +1,206 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.processors.standard;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@SideEffectFree
@Tags({"split", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
@WritesAttributes({
    @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer for the FlowFiles routed to the 'splits' Relationship."),
    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship.")
})
@CapabilityDescription("Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles")
public class SplitRecord extends AbstractProcessor {

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("Record Reader")
        .description("Specifies the Controller Service to use for reading incoming data")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("Record Writer")
        .description("Specifies the Controller Service to use for writing out the records")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();
    static final PropertyDescriptor RECORDS_PER_SPLIT = new PropertyDescriptor.Builder()
        .name("Records Per Split")
        .description("Specifies how many records should be written to each 'split' or 'segment' FlowFile")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .expressionLanguageSupported(true)
        .required(true)
        .build();

    static final Relationship REL_SPLITS = new Relationship.Builder()
        .name("splits")
        .description("The individual 'segments' of the original FlowFile will be routed to this relationship.")
        .build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description("Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship.")
        .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
            + "the unchanged FlowFile will be routed to this relationship.")
        .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(RECORDS_PER_SPLIT);
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SPLITS);
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile original = session.get();
        if (original == null) {
            return;
        }

        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final RecordSetWriter writer;
        try (final InputStream rawIn = session.read(original);
            final InputStream in = new BufferedInputStream(rawIn)) {
            writer = writerFactory.createWriter(getLogger(), original, in);
        } catch (final Exception e) {
            getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e});
            session.transfer(original, REL_FAILURE);
            return;
        }

        final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger();

        final List<FlowFile> splits = new ArrayList<>();
        try {
            session.read(original, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {

                        final RecordSet recordSet = reader.createRecordSet();
                        final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);

                        while (pushbackSet.isAnotherRecord()) {
                            FlowFile split = session.create(original);

                            try {
                                final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
                                split = session.write(split, new OutputStreamCallback() {
                                    @Override
                                    public void process(final OutputStream out) throws IOException {
                                        if (maxRecords == 1) {
                                            final Record record = pushbackSet.next();
                                            writeResultRef.set(writer.write(record, out));
                                        } else {
                                            final RecordSet limitedSet = pushbackSet.limit(maxRecords);
                                            writeResultRef.set(writer.write(limitedSet, out));
                                        }
                                    }
                                });

                                final WriteResult writeResult = writeResultRef.get();

                                final Map<String, String> attributes = new HashMap<>();
                                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                                attributes.putAll(writeResult.getAttributes());

                                session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
                                split = session.putAllAttributes(split, attributes);
                            } finally {
                                splits.add(split);
                            }
                        }
                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException("Failed to parse incoming data", e);
                    }
                }
            });
        } catch (final ProcessException pe) {
            getLogger().error("Failed to split {}", new Object[] {original, pe});
            session.remove(splits);
            session.transfer(original, REL_FAILURE);
            return;
        }

        session.transfer(original, REL_ORIGINAL);
        session.transfer(splits, REL_SPLITS);
        getLogger().info("Successfully split {} into {} FlowFiles, each containing up to {} records", new Object[] {original, splits.size(), maxRecords});
    }

}
@@ -88,6 +88,7 @@ org.apache.nifi.processors.standard.ScanContent
 org.apache.nifi.processors.standard.SegmentContent
 org.apache.nifi.processors.standard.SplitContent
 org.apache.nifi.processors.standard.SplitJson
+org.apache.nifi.processors.standard.SplitRecord
 org.apache.nifi.processors.standard.SplitText
 org.apache.nifi.processors.standard.SplitXml
 org.apache.nifi.processors.standard.TailFile
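The one-line addition above is what makes the new processor discoverable at runtime: judging by the entry format, this is the standard bundle's META-INF/services manifest for org.apache.nifi.processor.Processor (an assumption; the file path is not shown in this diff), which Java's ServiceLoader reads to instantiate each listed class. A minimal standalone sketch of that mechanism, using a hypothetical Greeter interface rather than NiFi's real Processor interface:

import java.util.ServiceLoader;

// Hypothetical service interface standing in for org.apache.nifi.processor.Processor.
public interface Greeter {
    String greet();
}

// A provider implementation. To be discovered, its fully qualified class name must appear,
// one per line, in a classpath resource named META-INF/services/Greeter -- exactly the kind
// of file this hunk modifies.
class EnglishGreeter implements Greeter {
    @Override
    public String greet() {
        return "hello";
    }
}

class GreeterDemo {
    public static void main(String[] args) {
        // ServiceLoader reads the manifest and reflectively instantiates each listed class.
        for (final Greeter greeter : ServiceLoader.load(Greeter.class)) {
            System.out.println(greeter.greet());
        }
    }
}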
TestSplitRecord.java (new file):

@@ -0,0 +1,180 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.processors.standard;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.List;

import org.apache.nifi.processors.standard.util.record.MockRecordParser;
import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class TestSplitRecord {

    @Test
    public void testIndividualRecordPerSplit() throws InitializationException {
        final MockRecordParser readerService = new MockRecordParser();
        final MockRecordWriter writerService = new MockRecordWriter("header", false);

        final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class);
        runner.addControllerService("reader", readerService);
        runner.enableControllerService(readerService);
        runner.addControllerService("writer", writerService);
        runner.enableControllerService(writerService);

        runner.setProperty(SplitRecord.RECORD_READER, "reader");
        runner.setProperty(SplitRecord.RECORD_WRITER, "writer");
        runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "1");

        readerService.addSchemaField("name", RecordFieldType.STRING);
        readerService.addSchemaField("age", RecordFieldType.INT);

        readerService.addRecord("John Doe", 48);
        readerService.addRecord("Jane Doe", 47);
        readerService.addRecord("Jimmy Doe", 14);

        runner.enqueue("");
        runner.run();

        runner.assertTransferCount(SplitRecord.REL_SPLITS, 3);
        runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
        final List<MockFlowFile> out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS);

        for (final MockFlowFile mff : out) {
            mff.assertAttributeEquals("record.count", "1");
            mff.assertAttributeEquals("mime.type", "text/plain");
        }

        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\n")).count());
        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJane Doe,47\n")).count());
        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count());
    }

    @Test
    public void testMultipleRecordsPerSplit() throws InitializationException {
        final MockRecordParser readerService = new MockRecordParser();
        final MockRecordWriter writerService = new MockRecordWriter("header", false);

        final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class);
        runner.addControllerService("reader", readerService);
        runner.enableControllerService(readerService);
        runner.addControllerService("writer", writerService);
        runner.enableControllerService(writerService);

        runner.setProperty(SplitRecord.RECORD_READER, "reader");
        runner.setProperty(SplitRecord.RECORD_WRITER, "writer");
        runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "2");

        readerService.addSchemaField("name", RecordFieldType.STRING);
        readerService.addSchemaField("age", RecordFieldType.INT);

        readerService.addRecord("John Doe", 48);
        readerService.addRecord("Jane Doe", 47);
        readerService.addRecord("Jimmy Doe", 14);

        runner.enqueue("");
        runner.run();

        runner.assertTransferCount(SplitRecord.REL_SPLITS, 2);
        runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
        final List<MockFlowFile> out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS);

        assertEquals(1, out.stream().filter(mff -> mff.getAttribute("record.count").equals("1")).count());
        assertTrue(out.stream().allMatch(mff -> mff.getAttribute("mime.type").equals("text/plain")));

        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\nJane Doe,47\n")).count());
        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count());
    }

    @Test
    public void testAllSplitsOneDestination() throws InitializationException {
        final MockRecordParser readerService = new MockRecordParser();
        final MockRecordWriter writerService = new MockRecordWriter("header", false);

        final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class);
        runner.addControllerService("reader", readerService);
        runner.enableControllerService(readerService);
        runner.addControllerService("writer", writerService);
        runner.enableControllerService(writerService);

        runner.setProperty(SplitRecord.RECORD_READER, "reader");
        runner.setProperty(SplitRecord.RECORD_WRITER, "writer");
        runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "3");

        readerService.addSchemaField("name", RecordFieldType.STRING);
        readerService.addSchemaField("age", RecordFieldType.INT);

        readerService.addRecord("John Doe", 48);
        readerService.addRecord("Jane Doe", 47);
        readerService.addRecord("Jimmy Doe", 14);

        runner.enqueue("");
        runner.run();

        runner.assertTransferCount(SplitRecord.REL_SPLITS, 1);
        runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
        final MockFlowFile out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS).get(0);

        out.assertAttributeEquals("record.count", "3");
        out.assertAttributeEquals("mime.type", "text/plain");

        out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n");
    }


    @Test
    public void testReadFailure() throws InitializationException {
        // A parser configured to fail after reading two records, exercising the failure path.
        final MockRecordParser readerService = new MockRecordParser(2);
        final MockRecordWriter writerService = new MockRecordWriter("header", false);

        final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class);
        runner.addControllerService("reader", readerService);
        runner.enableControllerService(readerService);
        runner.addControllerService("writer", writerService);
        runner.enableControllerService(writerService);

        runner.setProperty(SplitRecord.RECORD_READER, "reader");
        runner.setProperty(SplitRecord.RECORD_WRITER, "writer");
        runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "1");

        readerService.addSchemaField("name", RecordFieldType.STRING);
        readerService.addSchemaField("age", RecordFieldType.INT);

        readerService.addRecord("John Doe", 48);
        readerService.addRecord("Jane Doe", 47);
        readerService.addRecord("Jimmy Doe", 14);

        final MockFlowFile original = runner.enqueue("");
        runner.run();

        runner.assertAllFlowFilesTransferred(SplitRecord.REL_FAILURE, 1);
        final MockFlowFile failed = runner.getFlowFilesForRelationship(SplitRecord.REL_FAILURE).get(0);
        assertTrue(original == failed);
    }

}
MockRecordWriter.java:

@@ -96,7 +96,28 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
 
             @Override
             public WriteResult write(Record record, OutputStream out) throws IOException {
-                return null;
+                out.write(header.getBytes());
+                out.write("\n".getBytes());
+
+                final int numCols = record.getSchema().getFieldCount();
+                int i = 0;
+                for (final String fieldName : record.getSchema().getFieldNames()) {
+                    final String val = record.getAsString(fieldName);
+                    if (quoteValues) {
+                        out.write("\"".getBytes());
+                        out.write(val.getBytes());
+                        out.write("\"".getBytes());
+                    } else {
+                        out.write(val.getBytes());
+                    }
+
+                    if (i++ < numCols - 1) {
+                        out.write(",".getBytes());
+                    }
+                }
+                out.write("\n".getBytes());
+
+                return WriteResult.of(1, Collections.emptyMap());
             }
         };
     }
PushBackRecordSet.java (new file):

@@ -0,0 +1,67 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.serialization.record;

import java.io.IOException;

public class PushBackRecordSet implements RecordSet {
    private final RecordSet original;
    private Record pushback;

    public PushBackRecordSet(final RecordSet original) {
        this.original = original;
    }

    @Override
    public RecordSchema getSchema() throws IOException {
        return original.getSchema();
    }

    @Override
    public Record next() throws IOException {
        if (pushback != null) {
            final Record record = pushback;
            pushback = null;
            return record;
        }

        return original.next();
    }

    public void pushback(final Record record) {
        if (pushback != null) {
            throw new IllegalStateException("RecordSet already has a Record pushed back. Cannot push back more than one record at a time.");
        }

        this.pushback = record;
    }

    public boolean isAnotherRecord() throws IOException {
        if (pushback != null) {
            return true;
        }

        final Record nextRecord = next();
        if (nextRecord == null) {
            return false;
        }

        pushback(nextRecord);
        return true;
    }
}
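PushBackRecordSet gives SplitRecord a way to peek: isAnotherRecord() pulls the next record and stashes it in a single-slot buffer, so the onTrigger loop can test for more input without consuming a record. The same pattern, restated over a plain Iterator so it runs standalone (the PeekingIterator name and class are mine, not NiFi's):

import java.util.Arrays;
import java.util.Iterator;

// Single-slot pushback over any Iterator, mirroring PushBackRecordSet's approach.
class PeekingIterator<T> {
    private final Iterator<T> source;
    private T pushback; // at most one element held back at a time

    PeekingIterator(final Iterator<T> source) {
        this.source = source;
    }

    // Like PushBackRecordSet.next(): drain the pushback slot before reading the source.
    T next() {
        if (pushback != null) {
            final T held = pushback;
            pushback = null;
            return held;
        }
        return source.hasNext() ? source.next() : null;
    }

    // Like isAnotherRecord(): peek by reading one element and pushing it back.
    boolean hasAnother() {
        if (pushback != null) {
            return true;
        }
        final T next = next();
        if (next == null) {
            return false;
        }
        pushback = next;
        return true;
    }

    public static void main(String[] args) {
        final PeekingIterator<String> it = new PeekingIterator<>(Arrays.asList("a", "b").iterator());
        while (it.hasAnother()) {
            System.out.println(it.next()); // prints a, b -- the peeks skip nothing
        }
    }
}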
RecordSet.java:

@@ -31,6 +31,44 @@ public interface RecordSet {
      */
     Record next() throws IOException;
 
+    /**
+     * Returns a new Record Set that will return no more than {@code maxRecords} records from this
+     * RecordSet. Any Records that are pulled from this newly created RecordSet will also advance
+     * the cursor in this Record Set and vice versa.
+     *
+     * @param maxRecords the maximum number of records to return from the new RecordSet
+     * @return a view of this RecordSet that limits the number of records returned
+     */
+    default RecordSet limit(final int maxRecords) {
+        if (maxRecords < 0) {
+            throw new IllegalArgumentException("Cannot limit number of records to " + maxRecords + ". Limit must be a non-negative integer");
+        }
+
+        final RecordSet original = this;
+        return new RecordSet() {
+            private int count = 0;
+
+            @Override
+            public RecordSchema getSchema() throws IOException {
+                return original.getSchema();
+            }
+
+            @Override
+            public Record next() throws IOException {
+                if (count >= maxRecords) {
+                    return null;
+                }
+
+                final Record record = original.next();
+                if (record != null) {
+                    count++;
+                }
+
+                return record;
+            }
+        };
+    }
+
     public static RecordSet of(final RecordSchema schema, final Record... records) {
         return new RecordSet() {
             private int index = 0;
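This limit() view is what lets SplitRecord carve one source stream into consecutive chunks: each call wraps the same underlying set, so records consumed through the view advance the shared cursor, and the next call picks up where the previous chunk ended. A standalone sketch of that shared-cursor chunking, with a toy IntSet standing in for RecordSet:

import java.util.ArrayList;
import java.util.List;

// Toy stand-in for RecordSet: a forward-only cursor that returns null when exhausted.
interface IntSet {
    Integer next();
}

class ChunkDemo {
    // Analogue of RecordSet.limit(maxRecords): a view over the SAME cursor that stops
    // after maxRecords reads. It copies and buffers nothing.
    static IntSet limit(final IntSet original, final int maxRecords) {
        return new IntSet() {
            private int count = 0;

            @Override
            public Integer next() {
                if (count >= maxRecords) {
                    return null;
                }
                final Integer value = original.next();
                if (value != null) {
                    count++;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        final int[] data = {1, 2, 3, 4, 5};
        final IntSet source = new IntSet() {
            private int index = 0;

            @Override
            public Integer next() {
                return index < data.length ? data[index++] : null;
            }
        };

        // Mirroring SplitRecord's onTrigger loop: repeatedly take a 2-element view of the
        // same source; each chunk resumes where the previous one stopped.
        final List<List<Integer>> chunks = new ArrayList<>();
        while (true) {
            final IntSet view = limit(source, 2);
            final List<Integer> chunk = new ArrayList<>();
            for (Integer v = view.next(); v != null; v = view.next()) {
                chunk.add(v);
            }
            if (chunk.isEmpty()) {
                break;
            }
            chunks.add(chunk);
        }
        System.out.println(chunks); // [[1, 2], [3, 4], [5]]
    }
}

Note the empty-chunk check that ends the loop here; SplitRecord avoids ever creating that empty split FlowFile by first peeking with PushBackRecordSet.isAnotherRecord().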