From 7851df7927f26f86f7aa9271496a0ac653c5b91e Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 29 Mar 2021 20:28:25 -0400 Subject: [PATCH] HBASE-25712 Backport of HBASE-25692 to branch-1 HBASE-25692 ensures that we do not leak any InputStream (Socket) which would otherwise remain as CLOSE_WAIT until the java process exits. These orphaned sockets would eventually saturate Linux network and file limits. Closes #3104 Signed-off-by: Wellington Chevreuil --- .../apache/hadoop/hbase/wal/WALFactory.java | 56 +++---- .../hadoop/hbase/wal/FileSystemProxy.java | 106 ++++++++++++++ .../hadoop/hbase/wal/TestWALFactory.java | 137 ++++++++++++++++++ 3 files changed, 274 insertions(+), 25 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 8a82bd03b47..47d3c54fae6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -311,7 +311,7 @@ public class WALFactory { reader.init(fs, path, conf, stream); return reader; } - } catch (IOException e) { + } catch (Exception e) { if (stream != null) { try { stream.close(); @@ -328,33 +328,39 @@ public class WALFactory { LOG.debug("exception details", exception); } } - String msg = e.getMessage(); - if (msg != null && (msg.contains("Cannot obtain block length") - || msg.contains("Could not obtain the last block") - || msg.matches("Blocklist for [^ ]* has changed.*"))) { - if (++nbAttempt == 1) { - LOG.warn("Lease should have recovered. This is not expected. Will retry", e); - } - if (reporter != null && !reporter.progress()) { - throw new InterruptedIOException("Operation is cancelled"); - } - if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { - LOG.error("Can't open after " + nbAttempt + " attempts and " - + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); - } else { - try { - Thread.sleep(nbAttempt < 3 ? 500 : 1000); - continue; // retry - } catch (InterruptedException ie) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(ie); - throw iioe; + if (e instanceof IOException) { + String msg = e.getMessage(); + if (msg != null && (msg.contains("Cannot obtain block length") + || msg.contains("Could not obtain the last block") + || msg.matches("Blocklist for [^ ]* has changed.*"))) { + if (++nbAttempt == 1) { + LOG.warn("Lease should have recovered. This is not expected. Will retry", e); } + if (reporter != null && !reporter.progress()) { + throw new InterruptedIOException("Operation is cancelled"); + } + if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { + LOG.error("Can't open after " + nbAttempt + " attempts and " + + (EnvironmentEdgeManager.currentTime() - startWaiting) + + "ms " + " for " + path); + } else { + try { + Thread.sleep(nbAttempt < 3 ? 500 : 1000); + continue; // retry + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + throw new LeaseNotRecoveredException(e); + } else { + throw e; } - throw new LeaseNotRecoveredException(e); - } else { - throw e; } + + // Rethrow the original exception if we are not retrying due to HDFS-isms. + throw e; } } } catch (IOException ie) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java new file mode 100644 index 00000000000..2346ef0df42 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +/** + * Create a non-abstract "proxy" for FileSystem because FileSystem is an + * abstract class and not an interface. Only interfaces can be used with the + * Java Proxy class to override functionality via an InvocationHandler. + * + */ +public class FileSystemProxy extends FileSystem { + private final FileSystem real; + + public FileSystemProxy(FileSystem real) { + this.real = real; + } + + @Override + public FSDataInputStream open(Path p) throws IOException { + return real.open(p); + } + + @Override + public URI getUri() { + return real.getUri(); + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return real.open(f, bufferSize); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, + int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) + throws IOException { + return real.append(f, bufferSize, progress); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return real.rename(src, dst); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return real.delete(f, recursive); + } + + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + return real.listStatus(f); + } + + @Override + public void setWorkingDirectory(Path new_dir) { + real.setWorkingDirectory(new_dir); + } + + @Override + public Path getWorkingDirectory() { + return real.getWorkingDirectory(); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return real.mkdirs(f, permission); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return real.getFileStatus(f); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 31717b61394..0b4e80bedb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -27,9 +27,14 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Method; import java.net.BindException; +import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,13 +53,16 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader; import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -714,6 +722,135 @@ public class TestWALFactory { } } + @Test + public void testReaderClosedOnBadCodec() throws IOException { + // Create our own Configuration and WALFactory to avoid breaking other test methods + Configuration confWithCodec = new Configuration(conf); + confWithCodec.setClass( + WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class); + WALFactory customFactory = new WALFactory(confWithCodec, null, currentTest.getMethodName()); + + // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by + // the FileSystem and know if close() was called on those InputStreams. + final List openedReaders = new ArrayList<>(); + FileSystemProxy proxyFs = new FileSystemProxy(fs) { + @Override + public FSDataInputStream open(Path p) throws IOException { + InputStreamProxy is = new InputStreamProxy(super.open(p)); + openedReaders.add(is); + return is; + } + + @Override + public FSDataInputStream open(Path p, int blockSize) throws IOException { + InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize)); + openedReaders.add(is); + return is; + } + }; + + final HTableDescriptor htd = + new HTableDescriptor(TableName.valueOf(currentTest.getMethodName())); + htd.addFamily(new HColumnDescriptor(Bytes.toBytes("column"))); + + HRegionInfo hri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + + NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + for (HColumnDescriptor colDesc : htd.getColumnFamilies()) { + scopes.put(colDesc.getName(), 0); + } + byte[] row = Bytes.toBytes("row"); + WAL.Reader reader = null; + final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); + try { + // Write one column in one edit. + WALEdit cols = new WALEdit(); + cols.add(new KeyValue(row, Bytes.toBytes("column"), + Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 })); + final WAL log = customFactory.getWAL( + hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); + final long txid = log.append(htd, hri, + new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), + mvcc), + cols, true); + // Sync the edit to the WAL + log.sync(txid); + log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys()); + log.completeCacheFlush(hri.getEncodedNameAsBytes()); + log.shutdown(); + + // Inject our failure, object is constructed via reflection. + BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true); + + // Now open a reader on the log which will throw an exception when + // we try to instantiate the custom Codec. + Path filename = DefaultWALProvider.getCurrentFileName(log); + try { + reader = customFactory.createReader(proxyFs, filename); + fail("Expected to see an exception when creating WAL reader"); + } catch (Exception e) { + // Expected that we get an exception + } + // We should have exactly one reader + assertEquals(1, openedReaders.size()); + // And that reader should be closed. + int numNotClosed = 0; + for (InputStreamProxy openedReader : openedReaders) { + if (!openedReader.isClosed.get()) { + numNotClosed++; + } + } + assertEquals("Should not find any open readers", 0, numNotClosed); + } finally { + if (reader != null) { + reader.close(); + } + } + } + + /** + * A proxy around FSDataInputStream which can report if close() was called. + */ + private static class InputStreamProxy extends FSDataInputStream { + private final InputStream real; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + public InputStreamProxy(InputStream real) { + super(real); + this.real = real; + } + + @Override + public void close() throws IOException { + isClosed.set(true); + real.close(); + } + } + + /** + * A custom WALCellCodec in which we can inject failure. + */ + public static class BrokenWALCellCodec extends WALCellCodec { + static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false); + + static void maybeInjectFailure() { + if (THROW_FAILURE_ON_INIT.get()) { + throw new RuntimeException("Injected instantiation exception"); + } + } + + public BrokenWALCellCodec() { + super(); + maybeInjectFailure(); + } + + public BrokenWALCellCodec(Configuration conf, CompressionContext compression) { + super(conf, compression); + maybeInjectFailure(); + } + } + static class DumbWALActionsListener extends WALActionsListener.Base { int increments = 0;