HBASE-25692 Always try to close the WAL reader when we catch any exception (#3090)

There are code paths in which we throw non-IOExceptions when
initializing a WAL reader. However, we only close the InputStream to the
WAL filesystem when the exception is an IOException. In all cases, close
the stream if it is open; otherwise we leak a socket.

Co-authored-by: Josh Elser <jelser@cloudera.com>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Josh Elser, 2021-03-29 15:15:58 -04:00 (committed by stack)
parent c1bda19694, commit 92f7be5da7
3 changed files with 266 additions and 26 deletions
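To make the failure mode concrete, here is a minimal sketch of the pattern the fix adopts (names follow the WALFactory hunk below; the suppressed-exception handling is illustrative): catch Exception rather than IOException, so that the reader, and the InputStream it holds, is closed no matter what init() throws.

    AbstractFSWALProvider.Reader reader = null;
    try {
      reader = lrClass.getDeclaredConstructor().newInstance();
      reader.init(fs, path, conf, null);  // may throw unchecked exceptions too
      return reader;
    } catch (Exception e) {               // was: catch (IOException e)
      if (reader != null) {
        try {
          reader.close();                 // releases the underlying socket
        } catch (IOException suppressed) {
          // best effort: prefer surfacing the original exception
        }
      }
      throw e;
    }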

File: WALFactory.java

@@ -329,7 +329,9 @@ public class WALFactory {
           reader = lrClass.getDeclaredConstructor().newInstance();
           reader.init(fs, path, conf, null);
           return reader;
-        } catch (IOException e) {
+        } catch (Exception e) {
+          // catch Exception so that we close reader for all exceptions. If we don't
+          // close the reader, we leak a socket.
           if (reader != null) {
             try {
               reader.close();
@@ -339,34 +341,38 @@ public class WALFactory {
             }
           }
-          String msg = e.getMessage();
-          if (msg != null
-              && (msg.contains("Cannot obtain block length")
-                  || msg.contains("Could not obtain the last block") || msg
-                    .matches("Blocklist for [^ ]* has changed.*"))) {
-            if (++nbAttempt == 1) {
-              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
-            }
-            if (reporter != null && !reporter.progress()) {
-              throw new InterruptedIOException("Operation is cancelled");
-            }
-            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
-              LOG.error("Can't open after " + nbAttempt + " attempts and "
-                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
-            } else {
-              try {
-                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
-                continue; // retry
-              } catch (InterruptedException ie) {
-                InterruptedIOException iioe = new InterruptedIOException();
-                iioe.initCause(ie);
-                throw iioe;
-              }
-            }
-            throw new LeaseNotRecoveredException(e);
-          } else {
-            throw e;
-          }
+          // Only inspect the Exception to consider retry when it's an IOException
+          if (e instanceof IOException) {
+            String msg = e.getMessage();
+            if (msg != null
+                && (msg.contains("Cannot obtain block length")
+                    || msg.contains("Could not obtain the last block") || msg
+                      .matches("Blocklist for [^ ]* has changed.*"))) {
+              if (++nbAttempt == 1) {
+                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
+              }
+              if (reporter != null && !reporter.progress()) {
+                throw new InterruptedIOException("Operation is cancelled");
+              }
+              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
+                LOG.error("Can't open after " + nbAttempt + " attempts and "
+                    + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
+              } else {
+                try {
+                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
+                  continue; // retry
+                } catch (InterruptedException ie) {
+                  InterruptedIOException iioe = new InterruptedIOException();
+                  iioe.initCause(ie);
+                  throw iioe;
+                }
+              }
+              throw new LeaseNotRecoveredException(e);
+            }
+          }
+          // Rethrow the original exception if we are not retrying due to HDFS-isms.
+          throw e;
         }
       }
     } catch (IOException ie) {
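For orientation, a caller-side sketch of the API this retry loop serves. createReader(fs, path, reporter) and CancelableProgressable are real HBase types referenced in the hunk above; walFactory, walPath, serverIsStopping(), and process() are hypothetical stand-ins.

    // Returning false from progress() cancels the retry loop with an
    // InterruptedIOException, per the hunk above.
    CancelableProgressable reporter = () -> !serverIsStopping();
    try (WAL.Reader walReader = walFactory.createReader(fs, walPath, reporter)) {
      for (WAL.Entry entry = walReader.next(); entry != null; entry = walReader.next()) {
        process(entry);  // hypothetical per-entry handling
      }
    }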

File: FileSystemProxy.java (new)

@@ -0,0 +1,105 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

/**
 * Create a non-abstract "proxy" for FileSystem because FileSystem is an
 * abstract class and not an interface. Only interfaces can be used with the
 * Java Proxy class to override functionality via an InvocationHandler.
 */
public class FileSystemProxy extends FileSystem {
  private final FileSystem real;

  public FileSystemProxy(FileSystem real) {
    this.real = real;
  }

  @Override
  public FSDataInputStream open(Path p) throws IOException {
    return real.open(p);
  }

  @Override
  public URI getUri() {
    return real.getUri();
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    return real.open(f, bufferSize);
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress)
      throws IOException {
    return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {
    return real.append(f, bufferSize, progress);
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    return real.rename(src, dst);
  }

  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    return real.delete(f, recursive);
  }

  @Override
  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
    return real.listStatus(f);
  }

  @Override
  public void setWorkingDirectory(Path new_dir) {
    real.setWorkingDirectory(new_dir);
  }

  @Override
  public Path getWorkingDirectory() {
    return real.getWorkingDirectory();
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return real.mkdirs(f, permission);
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    return real.getFileStatus(f);
  }
}
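Because FileSystemProxy is a concrete class, a test can extend it anonymously to intercept individual calls, which java.lang.reflect.Proxy cannot do for the abstract FileSystem class. A hypothetical usage sketch (realFs and the println are illustrative; the test below uses this same pattern to record opened streams):

    FileSystem trackingFs = new FileSystemProxy(realFs) {
      @Override
      public FSDataInputStream open(Path p) throws IOException {
        System.out.println("open: " + p);  // observation hook
        return super.open(p);
      }
    };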

File: TestWALFactory.java

@@ -26,11 +26,16 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -51,10 +56,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -749,4 +757,125 @@ public class TestWALFactory {
    WALProvider metaWALProvider = walFactory.getMetaProvider();
    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
  }

  @Test
  public void testReaderClosedOnBadCodec() throws IOException {
    // Create our own Configuration and WALFactory to avoid breaking other test methods
    Configuration confWithCodec = new Configuration(conf);
    confWithCodec.setClass(
        WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class);
    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());

    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
    // the FileSystem and know if close() was called on those InputStreams.
    List<InputStreamProxy> openedReaders = new ArrayList<>();
    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
      @Override
      public FSDataInputStream open(Path p) throws IOException {
        InputStreamProxy is = new InputStreamProxy(super.open(p));
        openedReaders.add(is);
        return is;
      }

      @Override
      public FSDataInputStream open(Path p, int blockSize) throws IOException {
        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
        openedReaders.add(is);
        return is;
      }
    };

    final TableDescriptor htd =
        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }

    byte[] row = Bytes.toBytes("row");
    WAL.Reader reader = null;
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    try {
      // Write one column in one edit.
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 }));
      final WAL log = customFactory.getWAL(hri);
      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
          htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);

      // Sync the edit to the WAL
      log.sync(txid);
      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
      log.shutdown();

      // Inject our failure, object is constructed via reflection.
      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);

      // Now open a reader on the log which will throw an exception when
      // we try to instantiate the custom Codec.
      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
      try {
        reader = customFactory.createReader(proxyFs, filename);
        fail("Expected to see an exception when creating WAL reader");
      } catch (Exception e) {
        // Expected that we get an exception
      }

      // We should have exactly one reader
      assertEquals(1, openedReaders.size());
      // And that reader should be closed.
      long unclosedReaders = openedReaders.stream()
          .filter((r) -> !r.isClosed.get())
          .collect(Collectors.counting());
      assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * A proxy around FSDataInputStream which can report if close() was called.
   */
  private static class InputStreamProxy extends FSDataInputStream {
    private final InputStream real;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    public InputStreamProxy(InputStream real) {
      super(real);
      this.real = real;
    }

    @Override
    public void close() throws IOException {
      isClosed.set(true);
      real.close();
    }
  }

  /**
   * A custom WALCellCodec in which we can inject failure.
   */
  @SuppressWarnings("unused")
  private static class BrokenWALCellCodec extends WALCellCodec {
    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);

    static void maybeInjectFailure() {
      if (THROW_FAILURE_ON_INIT.get()) {
        throw new RuntimeException("Injected instantiation exception");
      }
    }

    public BrokenWALCellCodec() {
      super();
      maybeInjectFailure();
    }

    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
      super(conf, compression);
      maybeInjectFailure();
    }
  }
}
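For context on why the injected RuntimeException reaches WALFactory.createReader as a non-IOException: the WAL reader resolves the codec class named by WAL_CELL_CODEC_CLASS_KEY and instantiates it reflectively. A simplified sketch of that lookup, assuming reflection along the lines of the real WALCellCodec.create (this paraphrase is not the actual implementation):

    static WALCellCodec loadCodec(Configuration conf, CompressionContext compression)
        throws Exception {  // sketch only; real code wraps reflection errors
      String codecClsName =
          conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
      return (WALCellCodec) Class.forName(codecClsName)
          .getDeclaredConstructor(Configuration.class, CompressionContext.class)
          // BrokenWALCellCodec throws here; the failure surfaces wrapped in an
          // InvocationTargetException, i.e. a non-IOException, during reader init
          .newInstance(conf, compression);
    }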