NIFI-11825 Fixed QueryRecord closing of resources

This closes #7496

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Paul Grey 2023-07-17 13:37:42 -04:00 committed by exceptionfactory
parent f6a14bc475
commit 7db956fea7
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 203 additions and 36 deletions

View File

@ -49,24 +49,21 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.calcite.RecordPathFunctions;
import org.apache.nifi.processors.standard.calcite.RecordResultSetOutputStreamCallback;
import org.apache.nifi.queryrecord.FlowFileTable;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.Tuple;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@ -83,7 +80,6 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
@ -311,41 +307,20 @@ public class QueryRecord extends AbstractProcessor {
try {
final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
final QueryResult queryResult = query(session, original, readerSchema, sql, recordReaderFactory);
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
final FlowFile originalFlowFile = original;
final ResultSet rs = queryResult.getResultSet();
final RecordResultSetOutputStreamCallback writer = new RecordResultSetOutputStreamCallback(getLogger(),
rs, writerSchema, defaultPrecision, defaultScale, recordSetWriterFactory, originalAttributes);
try {
final ResultSet rs = queryResult.getResultSet();
transformed = session.write(transformed, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
final ResultSetRecordSet recordSet;
final RecordSchema writeSchema;
try {
recordSet = new ResultSetRecordSet(rs, writerSchema, defaultPrecision, defaultScale);
final RecordSchema resultSetSchema = recordSet.getSchema();
writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
} catch (final SQLException | SchemaNotFoundException e) {
throw new ProcessException(e);
}
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, originalFlowFile)) {
writeResultRef.set(resultSetWriter.write(recordSet));
mimeTypeRef.set(resultSetWriter.getMimeType());
} catch (final Exception e) {
throw new IOException(e);
}
}
});
transformed = session.write(transformed, writer);
} finally {
closeQuietly(queryResult);
closeQuietly(rs, queryResult);
}
recordsRead = Math.max(recordsRead, queryResult.getRecordsRead());
final WriteResult result = writeResultRef.get();
final WriteResult result = writer.getWriteResult();
final String mimeType = writer.getMimeType();
if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()) {
session.remove(transformed);
flowFileRemoved = true;
@ -356,8 +331,9 @@ public class QueryRecord extends AbstractProcessor {
if (result.getAttributes() != null) {
attributesToAdd.putAll(result.getAttributes());
}
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
if (StringUtils.isNotEmpty(mimeType)) {
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType);
}
attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
attributesToAdd.put(ROUTE_ATTRIBUTE_KEY, relationship.getName());
transformed = session.putAllAttributes(transformed, attributesToAdd);

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.calcite;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
/**
 * {@link OutputStreamCallback} that streams the rows of a JDBC {@link ResultSet} to a FlowFile's
 * OutputStream using a {@link RecordSetWriter}. The ResultSet-backed record set is opened in a
 * try-with-resources block so the ResultSet is always closed when writing completes or fails.
 * After {@link #process(OutputStream)} runs, the write result and MIME type are available via
 * the getters. Not thread-safe: intended for a single session.write() invocation.
 */
public class RecordResultSetOutputStreamCallback implements OutputStreamCallback {
    private final ComponentLog logger;
    private final ResultSet rs;
    private final RecordSchema writerSchema;
    private final Integer defaultPrecision;
    private final Integer defaultScale;
    private final RecordSetWriterFactory recordSetWriterFactory;
    private final Map<String, String> originalAttributes;

    // Populated by process(); read back by the caller after session.write() returns
    private WriteResult writeResult;
    private String mimeType;

    public RecordResultSetOutputStreamCallback(
            final ComponentLog logger, final ResultSet rs, final RecordSchema writerSchema,
            final Integer defaultPrecision, final Integer defaultScale,
            final RecordSetWriterFactory recordSetWriterFactory, final Map<String, String> originalAttributes) {
        this.logger = logger;
        this.rs = rs;
        this.writerSchema = writerSchema;
        this.defaultPrecision = defaultPrecision;
        this.defaultScale = defaultScale;
        this.recordSetWriterFactory = recordSetWriterFactory;
        this.originalAttributes = originalAttributes;
    }

    /**
     * @return the result of writing the record set, or {@link WriteResult#EMPTY} if
     *         {@link #process(OutputStream)} has not completed a write
     */
    public WriteResult getWriteResult() {
        // ProcessException was previously declared here but can never be thrown by this method
        return ObjectUtils.defaultIfNull(writeResult, WriteResult.EMPTY);
    }

    /**
     * @return the MIME type reported by the RecordSetWriter, or null if
     *         {@link #process(OutputStream)} has not completed a write
     */
    public String getMimeType() {
        return mimeType;
    }

    @Override
    public void process(final OutputStream out) throws IOException {
        final RecordSchema writeSchema;
        // try-with-resources guarantees the ResultSetRecordSet (and therefore the ResultSet) is closed
        try (final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs, writerSchema, defaultPrecision, defaultScale)) {
            final RecordSchema resultSetSchema = recordSet.getSchema();
            writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
            try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, out, originalAttributes)) {
                writeResult = resultSetWriter.write(recordSet);
                mimeType = resultSetWriter.getMimeType();
            } catch (final Exception e) {
                throw new IOException("Writing result records failed", e);
            }
        } catch (final SQLException | SchemaNotFoundException e) {
            throw new ProcessException("Reading query result records failed", e);
        }
    }
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.calcite;
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processors.standard.QueryRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.LinkedHashMap;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Verifies that {@link RecordResultSetOutputStreamCallback} closes the ResultSet it is given
 * once processing completes (NIFI-11825).
 */
public class TestRecordResultSetOutputStreamCallback {

    // JDBC connection backing the ResultSet under test; closed at the end of the test so the
    // Statement created in getResultSet() is also released (leak fix: original never closed it)
    private Connection connection;

    @Test
    void testResultSetClosed() throws IOException, SQLException, InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
        final String writerId = "record-writer";
        runner.setProperty(writerId, writerId);
        final RecordSetWriterFactory writerService = new CSVRecordSetWriter();
        runner.addControllerService(writerId, writerService);
        runner.setProperty(writerService, "schema-access-strategy", "inherit-record-schema");
        runner.enableControllerService(writerService);

        final ResultSet resultSet = getResultSet();
        try {
            final RecordField fieldFirst = new RecordField("first", RecordFieldType.STRING.getDataType());
            final RecordField fieldLast = new RecordField("last", RecordFieldType.STRING.getDataType());
            final RecordSchema writerSchema = new SimpleRecordSchema(Arrays.asList(fieldFirst, fieldLast));
            final FlowFile flowFile = mock(FlowFile.class);
            when(flowFile.getAttributes()).thenReturn(new LinkedHashMap<>());

            final RecordResultSetOutputStreamCallback writer = new RecordResultSetOutputStreamCallback(runner.getLogger(),
                    resultSet, writerSchema, 0, 0, writerService, flowFile.getAttributes());
            final OutputStream os = new ByteArrayOutputStream();
            writer.process(os);

            // the callback, not the caller, is responsible for closing the ResultSet
            assertTrue(resultSet.isClosed());
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Builds an in-memory Calcite connection over {@link CalciteTestSchema} and returns an open
     * ResultSet for the PERSONS table. Stores the Connection for cleanup by the caller.
     */
    private ResultSet getResultSet() throws SQLException {
        DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
        connection = DriverManager.getConnection("jdbc:calcite:");
        final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        calciteConnection.getRootSchema().add("TEST", new ReflectiveSchema(new CalciteTestSchema()));
        final Statement statement = calciteConnection.createStatement();
        return statement.executeQuery("SELECT * FROM TEST.PERSONS");
    }

    /** Simple row type exposed to Calcite via reflection. */
    public static class Person {
        public final String first;
        public final String last;

        public Person(final String first, final String last) {
            this.first = first;
            this.last = last;
        }
    }

    /** Reflective schema exposing a fixed PERSONS table for the query under test. */
    public static class CalciteTestSchema extends AbstractSchema {
        public Person[] PERSONS = { new Person("Joe", "Smith"), new Person("Bob", "Jones") };
    }
}