From 7db956fea7e0e82b0acd415b1076cdf2bb1223c2 Mon Sep 17 00:00:00 2001 From: Paul Grey Date: Mon, 17 Jul 2023 13:37:42 -0400 Subject: [PATCH] NIFI-11825 Fixed QueryRecord closing of resources This closes #7496 Signed-off-by: David Handermann --- .../nifi/processors/standard/QueryRecord.java | 48 ++------ .../RecordResultSetOutputStreamCallback.java | 87 +++++++++++++++ ...stRecordResultSetOutputStreamCallback.java | 104 ++++++++++++++++++ 3 files changed, 203 insertions(+), 36 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/calcite/RecordResultSetOutputStreamCallback.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/calcite/TestRecordResultSetOutputStreamCallback.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 185e742897..8be1276e8f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -49,24 +49,21 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processors.standard.calcite.RecordPathFunctions; +import org.apache.nifi.processors.standard.calcite.RecordResultSetOutputStreamCallback; import org.apache.nifi.queryrecord.FlowFileTable; -import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; -import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.ResultSetRecordSet; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.Tuple; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -83,7 +80,6 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION; @@ -311,41 +307,20 @@ public class QueryRecord extends AbstractProcessor { try { final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue(); - final AtomicReference writeResultRef = new AtomicReference<>(); final QueryResult queryResult = query(session, original, readerSchema, sql, recordReaderFactory); - final AtomicReference mimeTypeRef = new AtomicReference<>(); - final FlowFile originalFlowFile = original; + final ResultSet rs = queryResult.getResultSet(); + final RecordResultSetOutputStreamCallback writer = new RecordResultSetOutputStreamCallback(getLogger(), + rs, writerSchema, defaultPrecision, defaultScale, recordSetWriterFactory, originalAttributes); try { - final ResultSet rs = queryResult.getResultSet(); - transformed = session.write(transformed, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - final ResultSetRecordSet recordSet; - final RecordSchema writeSchema; - - try { - recordSet = new ResultSetRecordSet(rs, writerSchema, defaultPrecision, defaultScale); - final RecordSchema resultSetSchema = recordSet.getSchema(); - writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema); - } catch (final SQLException | SchemaNotFoundException e) { - throw new ProcessException(e); - } - - try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, originalFlowFile)) { - writeResultRef.set(resultSetWriter.write(recordSet)); - mimeTypeRef.set(resultSetWriter.getMimeType()); - } catch (final Exception e) { - throw new IOException(e); - } - } - }); + transformed = session.write(transformed, writer); } finally { - closeQuietly(queryResult); + closeQuietly(rs, queryResult); } recordsRead = Math.max(recordsRead, queryResult.getRecordsRead()); - final WriteResult result = writeResultRef.get(); + final WriteResult result = writer.getWriteResult(); + final String mimeType = writer.getMimeType(); if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()) { session.remove(transformed); flowFileRemoved = true; @@ -356,8 +331,9 @@ public class QueryRecord extends AbstractProcessor { if (result.getAttributes() != null) { attributesToAdd.putAll(result.getAttributes()); } - - attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get()); + if (StringUtils.isNotEmpty(mimeType)) { + attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType); + } attributesToAdd.put("record.count", String.valueOf(result.getRecordCount())); attributesToAdd.put(ROUTE_ATTRIBUTE_KEY, relationship.getName()); transformed = session.putAllAttributes(transformed, attributesToAdd); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/calcite/RecordResultSetOutputStreamCallback.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/calcite/RecordResultSetOutputStreamCallback.java new file mode 100644 index 0000000000..98a11eeaaf --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/calcite/RecordResultSetOutputStreamCallback.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.calcite; + +import org.apache.commons.lang3.ObjectUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; + +public class RecordResultSetOutputStreamCallback implements OutputStreamCallback { + private final ComponentLog logger; + private final ResultSet rs; + private final RecordSchema writerSchema; + private final Integer defaultPrecision; + private final Integer defaultScale; + private final RecordSetWriterFactory recordSetWriterFactory; + private final Map originalAttributes; + + private WriteResult writeResult; + private String mimeType; + + public RecordResultSetOutputStreamCallback( + final ComponentLog logger, final ResultSet rs, final RecordSchema writerSchema, + final Integer defaultPrecision, final Integer defaultScale, + final RecordSetWriterFactory recordSetWriterFactory, final Map originalAttributes) { + this.logger = logger; + this.rs = rs; + this.writerSchema = writerSchema; + this.defaultPrecision = defaultPrecision; + this.defaultScale = defaultScale; + this.recordSetWriterFactory = recordSetWriterFactory; + this.originalAttributes = originalAttributes; + } + + public WriteResult getWriteResult() throws ProcessException { + return ObjectUtils.defaultIfNull(writeResult, WriteResult.EMPTY); + } + + public String getMimeType() { + return mimeType; + } + + @Override + public void process(OutputStream out) throws IOException { + final RecordSchema writeSchema; + + try (final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs, writerSchema, defaultPrecision, defaultScale)) { + final RecordSchema resultSetSchema = recordSet.getSchema(); + writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema); + + try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, out, originalAttributes)) { + writeResult = resultSetWriter.write(recordSet); + mimeType = resultSetWriter.getMimeType(); + } catch (final Exception e) { + throw new IOException("Writing result records failed", e); + } + } catch (final SQLException | SchemaNotFoundException e) { + throw new ProcessException("Reading query result records failed", e); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/calcite/TestRecordResultSetOutputStreamCallback.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/calcite/TestRecordResultSetOutputStreamCallback.java new file mode 100644 index 0000000000..8644903112 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/calcite/TestRecordResultSetOutputStreamCallback.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.calcite; + +import org.apache.calcite.adapter.java.ReflectiveSchema; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.nifi.csv.CSVRecordSetWriter; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processors.standard.QueryRecord; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.LinkedHashMap; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestRecordResultSetOutputStreamCallback { + + @Test + void testResultSetClosed() throws IOException, SQLException, InitializationException { + final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class); + + final String writerId = "record-writer"; + runner.setProperty(writerId, writerId); + + final RecordSetWriterFactory writerService = new CSVRecordSetWriter(); + runner.addControllerService(writerId, writerService); + runner.setProperty(writerService, "schema-access-strategy", "inherit-record-schema"); + runner.enableControllerService(writerService); + + final ResultSet resultSet = getResultSet(); + + final RecordField fieldFirst = new RecordField("first", RecordFieldType.STRING.getDataType()); + final RecordField fieldLast = new RecordField("last", RecordFieldType.STRING.getDataType()); + final RecordSchema writerSchema = new SimpleRecordSchema(Arrays.asList(fieldFirst, fieldLast)); + + final FlowFile flowFile = mock(FlowFile.class); + when(flowFile.getAttributes()).thenReturn(new LinkedHashMap<>()); + + final RecordResultSetOutputStreamCallback writer = new RecordResultSetOutputStreamCallback(runner.getLogger(), + resultSet, writerSchema, 0, 0, writerService, flowFile.getAttributes()); + + final OutputStream os = new ByteArrayOutputStream(); + writer.process(os); + + assertTrue(resultSet.isClosed()); + } + + private ResultSet getResultSet() throws SQLException { + DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver()); + final Connection connection = DriverManager.getConnection("jdbc:calcite:"); + final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); + calciteConnection.getRootSchema().add("TEST", new ReflectiveSchema(new CalciteTestSchema())); + final Statement statement = calciteConnection.createStatement(); + return statement.executeQuery("SELECT * FROM TEST.PERSONS"); + } + + public static class Person { + public final String first; + public final String last; + + public Person(final String first, final String last) { + this.first = first; + this.last = last; + } + } + + public static class CalciteTestSchema extends AbstractSchema { + public Person[] PERSONS = { new Person("Joe", "Smith"), new Person("Bob", "Jones") }; + } +}