HBASE-25712 Backport of HBASE-25692 to branch-1

HBASE-25692 ensures that we do not leak any InputStream (Socket)
which would otherwise remain in the CLOSE_WAIT state until the java
process exits. These orphaned sockets would eventually exhaust Linux
network and file-descriptor limits.

Closes #3104

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
Josh Elser 2021-03-29 20:28:25 -04:00
parent 616c5bbec5
commit 7851df7927
3 changed files with 274 additions and 25 deletions

View File

@ -311,7 +311,7 @@ public class WALFactory {
reader.init(fs, path, conf, stream);
return reader;
}
} catch (IOException e) {
} catch (Exception e) {
if (stream != null) {
try {
stream.close();
@ -328,33 +328,39 @@ public class WALFactory {
LOG.debug("exception details", exception);
}
}
String msg = e.getMessage();
if (msg != null && (msg.contains("Cannot obtain block length")
|| msg.contains("Could not obtain the last block")
|| msg.matches("Blocklist for [^ ]* has changed.*"))) {
if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
}
if (reporter != null && !reporter.progress()) {
throw new InterruptedIOException("Operation is cancelled");
}
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
} else {
try {
Thread.sleep(nbAttempt < 3 ? 500 : 1000);
continue; // retry
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
if (e instanceof IOException) {
String msg = e.getMessage();
if (msg != null && (msg.contains("Cannot obtain block length")
|| msg.contains("Could not obtain the last block")
|| msg.matches("Blocklist for [^ ]* has changed.*"))) {
if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
}
if (reporter != null && !reporter.progress()) {
throw new InterruptedIOException("Operation is cancelled");
}
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTime() - startWaiting)
+ "ms " + " for " + path);
} else {
try {
Thread.sleep(nbAttempt < 3 ? 500 : 1000);
continue; // retry
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
throw new LeaseNotRecoveredException(e);
} else {
throw e;
}
throw new LeaseNotRecoveredException(e);
} else {
throw e;
}
// Rethrow the original exception if we are not retrying due to HDFS-isms.
throw e;
}
}
} catch (IOException ie) {

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
/**
 * Create a non-abstract "proxy" for FileSystem because FileSystem is an
 * abstract class and not an interface. Only interfaces can be used with the
 * Java Proxy class to override functionality via an InvocationHandler.
 *
 * <p>Every overridden method delegates directly to the wrapped FileSystem;
 * tests subclass this and intercept the methods they care about (e.g. the
 * {@code open} variants) to observe or alter behavior.
 */
public class FileSystemProxy extends FileSystem {
  /** The underlying FileSystem all calls are forwarded to. */
  private final FileSystem real;

  /**
   * @param real the FileSystem to delegate every call to
   */
  public FileSystemProxy(FileSystem real) {
    this.real = real;
  }

  // Overridden explicitly (FileSystem's one-arg open would otherwise route
  // through the abstract two-arg variant) so subclasses can intercept both
  // entry points independently.
  @Override
  public FSDataInputStream open(Path p) throws IOException {
    return real.open(p);
  }

  @Override
  public URI getUri() {
    return real.getUri();
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    return real.open(f, bufferSize);
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
    return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {
    return real.append(f, bufferSize, progress);
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    return real.rename(src, dst);
  }

  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    return real.delete(f, recursive);
  }

  @Override
  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
    return real.listStatus(f);
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    real.setWorkingDirectory(newDir);
  }

  @Override
  public Path getWorkingDirectory() {
    return real.getWorkingDirectory();
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return real.mkdirs(f, permission);
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    return real.getFileStatus(f);
  }
}

View File

@ -27,9 +27,14 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -48,13 +53,16 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -714,6 +722,135 @@ public class TestWALFactory {
}
}
/**
 * Regression test for HBASE-25692: when WALFactory.createReader() fails
 * because the configured WALCellCodec cannot be instantiated, the underlying
 * FSDataInputStream (and thus the DFS socket) must still be closed, rather
 * than being leaked in CLOSE_WAIT.
 */
@Test
public void testReaderClosedOnBadCodec() throws IOException {
  // Create our own Configuration and WALFactory to avoid breaking other test methods
  Configuration confWithCodec = new Configuration(conf);
  confWithCodec.setClass(
      WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class);
  WALFactory customFactory = new WALFactory(confWithCodec, null, currentTest.getMethodName());
  // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
  // the FileSystem and know if close() was called on those InputStreams.
  final List<InputStreamProxy> openedReaders = new ArrayList<>();
  FileSystemProxy proxyFs = new FileSystemProxy(fs) {
    @Override
    public FSDataInputStream open(Path p) throws IOException {
      InputStreamProxy is = new InputStreamProxy(super.open(p));
      openedReaders.add(is);
      return is;
    }
    @Override
    public FSDataInputStream open(Path p, int blockSize) throws IOException {
      InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
      openedReaders.add(is);
      return is;
    }
  };
  final HTableDescriptor htd =
      new HTableDescriptor(TableName.valueOf(currentTest.getMethodName()));
  htd.addFamily(new HColumnDescriptor(Bytes.toBytes("column")));
  HRegionInfo hri = new HRegionInfo(htd.getTableName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
  NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
  for (HColumnDescriptor colDesc : htd.getColumnFamilies()) {
    scopes.put(colDesc.getName(), 0);
  }
  byte[] row = Bytes.toBytes("row");
  WAL.Reader reader = null;
  final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
  try {
    // Write one column in one edit.
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, Bytes.toBytes("column"),
        Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 }));
    final WAL log = customFactory.getWAL(
        hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
    final long txid = log.append(htd, hri,
        new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
            mvcc),
        cols, true);
    // Sync the edit to the WAL
    log.sync(txid);
    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
    log.completeCacheFlush(hri.getEncodedNameAsBytes());
    // Shut the WAL down only after the edit is durable, so the reader below
    // has a complete file to open.
    log.shutdown();
    // Inject our failure, object is constructed via reflection.
    BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
    // Now open a reader on the log which will throw an exception when
    // we try to instantiate the custom Codec.
    Path filename = DefaultWALProvider.getCurrentFileName(log);
    try {
      reader = customFactory.createReader(proxyFs, filename);
      fail("Expected to see an exception when creating WAL reader");
    } catch (Exception e) {
      // Expected that we get an exception
    }
    // We should have exactly one reader
    assertEquals(1, openedReaders.size());
    // And that reader should be closed.
    int numNotClosed = 0;
    for (InputStreamProxy openedReader : openedReaders) {
      if (!openedReader.isClosed.get()) {
        numNotClosed++;
      }
    }
    assertEquals("Should not find any open readers", 0, numNotClosed);
  } finally {
    // Defensive cleanup: createReader should have failed, but if it somehow
    // succeeded, release the reader so it does not leak into other tests.
    if (reader != null) {
      reader.close();
    }
  }
}
/**
 * FSDataInputStream wrapper that remembers whether close() has been invoked,
 * so tests can verify the stream was actually released.
 */
private static class InputStreamProxy extends FSDataInputStream {
  // Read directly by the test to assert the stream was closed.
  private final AtomicBoolean isClosed = new AtomicBoolean(false);
  private final InputStream wrapped;

  public InputStreamProxy(InputStream wrapped) {
    super(wrapped);
    this.wrapped = wrapped;
  }

  @Override
  public void close() throws IOException {
    // Flip the flag before delegating so it is recorded even if the
    // underlying close() throws.
    isClosed.set(true);
    wrapped.close();
  }
}
/**
 * A WALCellCodec whose constructors can be made to fail on demand,
 * simulating a codec class that cannot be instantiated via reflection.
 */
public static class BrokenWALCellCodec extends WALCellCodec {
  // When set to true, every subsequent construction attempt throws.
  static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);

  static void maybeInjectFailure() {
    if (!THROW_FAILURE_ON_INIT.get()) {
      return;
    }
    throw new RuntimeException("Injected instantiation exception");
  }

  public BrokenWALCellCodec() {
    super();
    maybeInjectFailure();
  }

  public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
    super(conf, compression);
    maybeInjectFailure();
  }
}
static class DumbWALActionsListener extends WALActionsListener.Base {
int increments = 0;