Fix resource leak issues suggested by Amazon CodeGuru (#816)

* Address a category of issues flagged by Amazon CodeGuru Reviewer:

* Add try-with-resources blocks to automatically close resources after use and avoid resource leaks, in the `SymbolicLinkPreservingTarIT`, `LicenseAnalyzer`, `SymbolicLinkPreservingUntarTransform`, `ConcurrentSeqNoVersioningIT`, `VersionProperties`, `GeoFilterIT`, `XContentHelper`, `Json` and `IndexShard` classes

* Add a try-finally block to close the resource after use and avoid a resource leak, in the `ServerChannelContext` class.

* Add a try-catch block to close the resource when an exception occurs, in the `FsBlobContainer` class (when `XContentFactory.xContentType` throws an exception).

* Close the resource when an assertion error occurs, in the `ServerChannelContext` class.
This commit is contained in:
Tianli Feng 2021-06-15 09:31:36 -07:00 committed by GitHub
parent dff274414d
commit 110cef7882
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 138 additions and 101 deletions

View File

@ -107,7 +107,10 @@ public class SymbolicLinkPreservingTarIT extends GradleIntegrationTestCase {
private void assertTar(final String extension, final FileInputStreamWrapper wrapper, boolean preserveFileTimestamps)
throws IOException {
try (TarArchiveInputStream tar = new TarArchiveInputStream(wrapper.apply(new FileInputStream(getOutputFile(extension))))) {
try (
FileInputStream fis = new FileInputStream(getOutputFile(extension));
TarArchiveInputStream tar = new TarArchiveInputStream(wrapper.apply(fis))
) {
TarArchiveEntry entry = tar.getNextTarEntry();
boolean realFolderEntry = false;
boolean fileEntry = false;

View File

@ -101,11 +101,10 @@ public class VersionProperties {
private static Properties getVersionProperties() {
Properties props = new Properties();
InputStream propsStream = VersionProperties.class.getResourceAsStream("/version.properties");
if (propsStream == null) {
throw new IllegalStateException("/version.properties resource missing");
}
try {
try (InputStream propsStream = VersionProperties.class.getResourceAsStream("/version.properties")) {
if (propsStream == null) {
throw new IllegalStateException("/version.properties resource missing");
}
props.load(propsStream);
} catch (IOException e) {
throw new IllegalStateException("Failed to load version properties", e);

View File

@ -209,8 +209,8 @@ public class LicenseAnalyzer {
}
public boolean matches(File licenseFile) {
try {
String content = String.join("\n", IOUtils.readLines(new FileInputStream(licenseFile), "UTF-8")).replaceAll("\\*", " ");
try (FileInputStream fis = new FileInputStream(licenseFile)) {
String content = String.join("\n", IOUtils.readLines(fis, "UTF-8")).replaceAll("\\*", " ");
return pattern.matcher(content).find();
} catch (IOException e) {
throw new UncheckedIOException(e);

View File

@ -57,37 +57,42 @@ public abstract class SymbolicLinkPreservingUntarTransform implements UnpackTran
.info("Unpacking " + tarFile.getName() + " using " + SymbolicLinkPreservingUntarTransform.class.getSimpleName() + ".");
Function<String, Path> pathModifier = pathResolver();
TarArchiveInputStream tar = new TarArchiveInputStream(new GzipCompressorInputStream(new FileInputStream(tarFile)));
final Path destinationPath = targetDir.toPath();
TarArchiveEntry entry = tar.getNextTarEntry();
while (entry != null) {
final Path relativePath = pathModifier.apply(entry.getName());
if (relativePath == null || relativePath.getFileName().equals(CURRENT_DIR_PATH)) {
entry = tar.getNextTarEntry();
continue;
}
final Path destination = destinationPath.resolve(relativePath);
final Path parent = destination.getParent();
if (Files.exists(parent) == false) {
Files.createDirectories(parent);
}
if (entry.isDirectory()) {
Files.createDirectory(destination);
} else if (entry.isSymbolicLink()) {
Files.createSymbolicLink(destination, Paths.get(entry.getLinkName()));
} else {
// copy the file from the archive using a small buffer to avoid heaping
Files.createFile(destination);
try (FileOutputStream fos = new FileOutputStream(destination.toFile())) {
tar.transferTo(fos);
try (
FileInputStream fis = new FileInputStream(tarFile);
GzipCompressorInputStream gzip = new GzipCompressorInputStream(fis);
TarArchiveInputStream tar = new TarArchiveInputStream(gzip)
) {
final Path destinationPath = targetDir.toPath();
TarArchiveEntry entry = tar.getNextTarEntry();
while (entry != null) {
final Path relativePath = pathModifier.apply(entry.getName());
if (relativePath == null || relativePath.getFileName().equals(CURRENT_DIR_PATH)) {
entry = tar.getNextTarEntry();
continue;
}
final Path destination = destinationPath.resolve(relativePath);
final Path parent = destination.getParent();
if (Files.exists(parent) == false) {
Files.createDirectories(parent);
}
if (entry.isDirectory()) {
Files.createDirectory(destination);
} else if (entry.isSymbolicLink()) {
Files.createSymbolicLink(destination, Paths.get(entry.getLinkName()));
} else {
// copy the file from the archive using a small buffer to avoid heaping
Files.createFile(destination);
try (FileOutputStream fos = new FileOutputStream(destination.toFile())) {
tar.transferTo(fos);
}
}
if (entry.isSymbolicLink() == false) {
// check if the underlying file system supports POSIX permissions
chmod(destination, entry.getMode());
}
entry = tar.getNextTarEntry();
}
if (entry.isSymbolicLink() == false) {
// check if the underlying file system supports POSIX permissions
chmod(destination, entry.getMode());
}
entry = tar.getNextTarEntry();
}
}

View File

@ -70,10 +70,15 @@ public class ServerChannelContext extends ChannelContext<ServerSocketChannel> {
}
public void acceptChannels(Supplier<NioSelector> selectorSupplier) throws IOException {
SocketChannel acceptedChannel;
while ((acceptedChannel = accept(rawChannel)) != null) {
NioSocketChannel nioChannel = channelFactory.acceptNioChannel(acceptedChannel, selectorSupplier);
acceptor.accept(nioChannel);
SocketChannel acceptedChannel = null;
try {
while ((acceptedChannel = accept(rawChannel)) != null) {
NioSocketChannel nioChannel = channelFactory.acceptNioChannel(acceptedChannel, selectorSupplier);
acceptor.accept(nioChannel);
}
} finally {
if (acceptedChannel != null)
acceptedChannel.close();
}
}
@ -124,7 +129,10 @@ public class ServerChannelContext extends ChannelContext<ServerSocketChannel> {
try {
assert serverSocketChannel.isBlocking() == false;
SocketChannel channel = AccessController.doPrivileged((PrivilegedExceptionAction<SocketChannel>) serverSocketChannel::accept);
assert serverSocketChannel.isBlocking() == false;
if (serverSocketChannel.isBlocking() == true) {
channel.close();
throw new AssertionError("serverSocketChannel is blocking.");
}
return channel;
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();

View File

@ -39,6 +39,7 @@ import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.json.JsonXContent;
import java.io.IOException;
import java.io.OutputStream;
public class Json {
/**
@ -83,6 +84,8 @@ public class Json {
}
builder.value(data);
builder.flush();
return builder.getOutputStream().toString();
try (OutputStream out = builder.getOutputStream()) {
return out.toString();
}
}
}

View File

@ -115,19 +115,20 @@ public class GeoFilterIT extends OpenSearchIntegTestCase {
}
private static byte[] unZipData(String path) throws IOException {
InputStream is = Streams.class.getResourceAsStream(path);
if (is == null) {
throw new FileNotFoundException("Resource [" + path + "] not found in classpath");
try (InputStream is = Streams.class.getResourceAsStream(path)) {
if (is == null) {
throw new FileNotFoundException("Resource [" + path + "] not found in classpath");
}
try (
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPInputStream in = new GZIPInputStream(is)
) {
Streams.copy(in, out);
return out.toByteArray();
}
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPInputStream in = new GZIPInputStream(is);
Streams.copy(in, out);
is.close();
out.close();
return out.toByteArray();
}
public void testShapeBuilders() {

View File

@ -729,7 +729,9 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase {
if (args.length < 3) {
System.err.println("usage: <file> <primaryTerm> <seqNo>");
} else {
runLinearizabilityChecker(new FileInputStream(args[0]), Long.parseLong(args[1]), Long.parseLong(args[2]));
try (FileInputStream fis = new FileInputStream(args[0])) {
runLinearizabilityChecker(fis, Long.parseLong(args[1]), Long.parseLong(args[2]));
}
}
}

View File

@ -64,12 +64,19 @@ public class XContentHelper {
BytesReference bytes) throws IOException {
Compressor compressor = CompressorFactory.compressor(bytes);
if (compressor != null) {
InputStream compressedInput = compressor.threadLocalInputStream(bytes.streamInput());
if (compressedInput.markSupported() == false) {
compressedInput = new BufferedInputStream(compressedInput);
InputStream compressedInput = null;
try {
compressedInput = compressor.threadLocalInputStream(bytes.streamInput());
if (compressedInput.markSupported() == false) {
compressedInput = new BufferedInputStream(compressedInput);
}
final XContentType contentType = XContentFactory.xContentType(compressedInput);
return XContentFactory.xContent(contentType).createParser(xContentRegistry, deprecationHandler, compressedInput);
} catch (Exception e) {
if(compressedInput != null)
compressedInput.close();
throw e;
}
final XContentType contentType = XContentFactory.xContentType(compressedInput);
return XContentFactory.xContent(contentType).createParser(xContentRegistry, deprecationHandler, compressedInput);
} else {
return XContentFactory.xContent(xContentType(bytes)).createParser(xContentRegistry, deprecationHandler, bytes.streamInput());
}
@ -83,11 +90,18 @@ public class XContentHelper {
Objects.requireNonNull(xContentType);
Compressor compressor = CompressorFactory.compressor(bytes);
if (compressor != null) {
InputStream compressedInput = compressor.threadLocalInputStream(bytes.streamInput());
if (compressedInput.markSupported() == false) {
compressedInput = new BufferedInputStream(compressedInput);
InputStream compressedInput = null;
try {
compressedInput = compressor.threadLocalInputStream(bytes.streamInput());
if (compressedInput.markSupported() == false) {
compressedInput = new BufferedInputStream(compressedInput);
}
return XContentFactory.xContent(xContentType).createParser(xContentRegistry, deprecationHandler, compressedInput);
} catch (Exception e) {
if (compressedInput != null)
compressedInput.close();
throw e;
}
return XContentFactory.xContent(xContentType).createParser(xContentRegistry, deprecationHandler, compressedInput);
} else {
if (bytes instanceof BytesArray) {
final BytesArray array = (BytesArray) bytes;
@ -124,7 +138,6 @@ public class XContentHelper {
compressedStreamInput = new BufferedInputStream(compressedStreamInput);
}
input = compressedStreamInput;
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input);
} else if (bytes instanceof BytesArray) {
final BytesArray arr = (BytesArray) bytes;
final byte[] raw = arr.array();
@ -135,9 +148,9 @@ public class XContentHelper {
convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered));
} else {
input = bytes.streamInput();
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input);
}
try (InputStream stream = input) {
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(stream);
return new Tuple<>(Objects.requireNonNull(contentType),
convertToMap(XContentFactory.xContent(contentType), stream, ordered));
}

View File

@ -2535,44 +2535,47 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (!Lucene.indexExists(store.directory())) {
return;
}
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
if ("checksum".equals(checkIndexOnStartup)) {
// physical verification only: verify all checksums for the latest commit
IOException corrupt = null;
MetadataSnapshot metadata = snapshotStoreMetadata();
for (Map.Entry<String, StoreFileMetadata> entry : metadata.asMap().entrySet()) {
try {
Store.checkIntegrity(entry.getValue(), store.directory());
out.println("checksum passed: " + entry.getKey());
} catch (IOException exc) {
out.println("checksum failed: " + entry.getKey());
exc.printStackTrace(out);
corrupt = exc;
try (
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name())
) {
if ("checksum".equals(checkIndexOnStartup)) {
// physical verification only: verify all checksums for the latest commit
IOException corrupt = null;
MetadataSnapshot metadata = snapshotStoreMetadata();
for (Map.Entry<String, StoreFileMetadata> entry : metadata.asMap().entrySet()) {
try {
Store.checkIntegrity(entry.getValue(), store.directory());
out.println("checksum passed: " + entry.getKey());
} catch (IOException exc) {
out.println("checksum failed: " + entry.getKey());
exc.printStackTrace(out);
corrupt = exc;
}
}
out.flush();
if (corrupt != null) {
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
throw corrupt;
}
} else {
// full checkindex
final CheckIndex.Status status = store.checkIndex(out);
out.flush();
if (!status.clean) {
if (state == IndexShardState.CLOSED) {
// ignore if closed....
return;
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
throw new IOException("index check failure");
}
}
out.flush();
if (corrupt != null) {
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
throw corrupt;
}
} else {
// full checkindex
final CheckIndex.Status status = store.checkIndex(out);
out.flush();
if (!status.clean) {
if (state == IndexShardState.CLOSED) {
// ignore if closed....
return;
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
throw new IOException("index check failure");
}
}
if (logger.isDebugEnabled()) {
logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
if (logger.isDebugEnabled()) {
logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
}
}
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));