NIFI-7114: Fix file leaks in StandardCommsSession and S2S Reporting components

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Matthew Burgess 2020-02-21 16:20:38 -05:00 committed by Joe Witt
parent eef04709b9
commit 1b0e3865e9
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
6 changed files with 55 additions and 17 deletions

View File

@ -137,11 +137,12 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
final JsonArray jsonArray = arrayBuilder.build();
// Send the JSON document for the current batch
Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will attempt to send data later");
return;
@ -162,8 +163,15 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
} catch (final IOException e) {
throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
} catch (final Exception e) {
if (transaction != null) {
transaction.error();
}
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
}
}
lastSentBulletinId = currMaxId;

View File

@ -191,12 +191,13 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes);
}
Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
long start = System.nanoTime();
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
return;
@ -215,7 +216,14 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
} catch (final Exception e) {
throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e);
if (transaction != null) {
transaction.error();
}
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e);
}
}
} else {

View File

@ -304,11 +304,12 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final JsonArray jsonArray = arrayBuilder.build();
// Send the JSON document for the current batch
Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
// Throw an exception to avoid provenance event id will not proceed so that those can be consumed again.
throw new ProcessException("All destination nodes are penalized; will attempt to send data later");
@ -329,8 +330,15 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()});
} catch (final IOException e) {
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
} catch (final Exception e) {
if (transaction != null) {
transaction.error();
}
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
}
}
});

View File

@ -161,12 +161,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
while(!jsonBatch.isEmpty()) {
// Send the JSON document for the current batch
Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
long start = System.nanoTime();
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
return;
@ -197,8 +198,15 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
fromIndex = toIndex;
toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
jsonBatch = jsonArray.subList(fromIndex, toIndex);
} catch (final IOException e) {
throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e);
} catch (final Exception e) {
if (transaction != null) {
transaction.error();
}
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e);
}
}
}
}

View File

@ -135,10 +135,10 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
@Override
public WriteResult sendData(final RecordSet recordSet, final Map<String,String> attributes, final boolean sendZeroResults) throws IOException {
Transaction transaction = null;
try {
WriteResult writeResult = null;
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will attempt to send data later");
} else {
@ -166,12 +166,16 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
}
}
return writeResult;
} catch(IOException ioe) {
throw ioe;
} catch (Exception e) {
throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e);
if (transaction != null) {
transaction.error();
}
if (e instanceof IOException) {
throw (IOException) e;
} else {
throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e);
}
}
}
@OnDisabled

View File

@ -71,6 +71,8 @@ public class StandardCommsSession implements CommsSession {
@Override
public void close() throws IOException {
socketChannel.close();
bufferedIn.close();
bufferedOut.close();
}
@Override