diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index b5286628df1..b84f1be1c4a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -329,7 +329,9 @@ public class WALFactory {
           reader = lrClass.getDeclaredConstructor().newInstance();
           reader.init(fs, path, conf, null);
           return reader;
-        } catch (IOException e) {
+        } catch (Exception e) {
+          // catch Exception so that we close reader for all exceptions. If we don't
+          // close the reader, we leak a socket.
           if (reader != null) {
             try {
               reader.close();
@@ -339,34 +341,38 @@
             }
           }
 
-          String msg = e.getMessage();
-          if (msg != null
-              && (msg.contains("Cannot obtain block length")
-                  || msg.contains("Could not obtain the last block") || msg
-                  .matches("Blocklist for [^ ]* has changed.*"))) {
-            if (++nbAttempt == 1) {
-              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
-            }
-            if (reporter != null && !reporter.progress()) {
-              throw new InterruptedIOException("Operation is cancelled");
-            }
-            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
-              LOG.error("Can't open after " + nbAttempt + " attempts and "
-                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
-            } else {
-              try {
-                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
-                continue; // retry
-              } catch (InterruptedException ie) {
-                InterruptedIOException iioe = new InterruptedIOException();
-                iioe.initCause(ie);
-                throw iioe;
-              }
-            }
-            throw new LeaseNotRecoveredException(e);
-          } else {
-            throw e;
-          }
+          // Only inspect the Exception to consider retry when it's an IOException
+          if (e instanceof IOException) {
+            String msg = e.getMessage();
+            if (msg != null
+                && (msg.contains("Cannot obtain block length")
+                    || msg.contains("Could not obtain the last block") || msg
+                    .matches("Blocklist for [^ ]* has changed.*"))) {
+              if (++nbAttempt == 1) {
+                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
+              }
+              if (reporter != null && !reporter.progress()) {
+                throw new InterruptedIOException("Operation is cancelled");
+              }
+              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
+                LOG.error("Can't open after " + nbAttempt + " attempts and "
+                    + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
+              } else {
+                try {
+                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
+                  continue; // retry
+                } catch (InterruptedException ie) {
+                  InterruptedIOException iioe = new InterruptedIOException();
+                  iioe.initCause(ie);
+                  throw iioe;
+                }
+              }
+              throw new LeaseNotRecoveredException(e);
+            }
+          }
+
+          // Rethrow the original exception if we are not retrying due to HDFS-isms.
+          throw e;
         }
       }
     } catch (IOException ie) {
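The change above enforces one invariant: whatever reader.init() throws, a half-constructed reader must be closed before the exception propagates, otherwise the socket behind its input stream leaks. Retries stay reserved for the known HDFS lease-recovery IOExceptions. A minimal, dependency-free sketch of that close-then-rethrow shape, using hypothetical ResourceFactory/openSafely names that are not part of this patch:

import java.io.Closeable;
import java.io.IOException;

public class CloseOnFailure {
  /** Factory split into construct and init steps, mirroring newInstance() + init() above. */
  interface ResourceFactory<T extends Closeable> {
    T create() throws IOException;
    void init(T resource) throws IOException;
  }

  /**
   * Opens a resource and guarantees it is closed if any step after construction
   * fails, checked or unchecked. Catching only IOException would leak the
   * resource on a RuntimeException thrown from init(), e.g. by a codec that
   * blows up while being loaded reflectively.
   */
  static <T extends Closeable> T openSafely(ResourceFactory<T> factory) throws IOException {
    T resource = null;
    try {
      resource = factory.create();
      factory.init(resource);
      return resource;
    } catch (Exception e) {
      if (resource != null) {
        try {
          resource.close();
        } catch (IOException suppressed) {
          // Keep the close() failure attached instead of masking the root cause.
          e.addSuppressed(suppressed);
        }
      }
      // Rethrow with the original type preserved; only IOException is checked here.
      if (e instanceof IOException) {
        throw (IOException) e;
      }
      throw (RuntimeException) e;
    }
  }
}
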
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
new file mode 100644
index 00000000000..fb729f55bef
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Create a non-abstract "proxy" for FileSystem because FileSystem is an
+ * abstract class and not an interface. Only interfaces can be used with the
+ * Java Proxy class to override functionality via an InvocationHandler.
+ *
+ */
+public class FileSystemProxy extends FileSystem {
+  private final FileSystem real;
+
+  public FileSystemProxy(FileSystem real) {
+    this.real = real;
+  }
+
+  @Override
+  public FSDataInputStream open(Path p) throws IOException {
+    return real.open(p);
+  }
+
+  @Override
+  public URI getUri() {
+    return real.getUri();
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return real.open(f, bufferSize);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+      short replication, long blockSize, Progressable progress) throws IOException {
+    return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    return real.append(f, bufferSize, progress);
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    return real.rename(src, dst);
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    return real.delete(f, recursive);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+    return real.listStatus(f);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+    real.setWorkingDirectory(new_dir);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return real.getWorkingDirectory();
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return real.mkdirs(f, permission);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return real.getFileStatus(f);
+  }
+}
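For anyone wondering why this delegating class is written by hand rather than generated: java.lang.reflect.Proxy can only synthesize implementations of interfaces, as the class comment notes, and FileSystem is an abstract class. A small sketch demonstrating the restriction (the class name WhyNotDynamicProxy is illustrative only):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

import org.apache.hadoop.fs.FileSystem;

public class WhyNotDynamicProxy {
  public static void main(String[] args) {
    InvocationHandler handler = (proxy, method, methodArgs) -> {
      throw new UnsupportedOperationException(method.getName());
    };
    try {
      // Every Class object passed to newProxyInstance must be an interface;
      // FileSystem is an abstract class, so this call throws.
      Proxy.newProxyInstance(FileSystem.class.getClassLoader(),
          new Class<?>[] { FileSystem.class }, handler);
    } catch (IllegalArgumentException expected) {
      System.out.println("Dynamic proxy rejected: " + expected.getMessage());
    }
  }
}

Tests can instead extend FileSystemProxy anonymously and override only the open() overloads they need to observe, as the new test below does.
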
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 74ab840fd95..f1ac464ec25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -26,11 +26,16 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.net.BindException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -51,10 +56,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -749,4 +757,125 @@
     WALProvider metaWALProvider = walFactory.getMetaProvider();
     assertEquals(IOTestProvider.class, metaWALProvider.getClass());
   }
+
+  @Test
+  public void testReaderClosedOnBadCodec() throws IOException {
+    // Create our own Configuration and WALFactory to avoid breaking other test methods
+    Configuration confWithCodec = new Configuration(conf);
+    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class);
+    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());
+
+    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
+    // the FileSystem and know if close() was called on those InputStreams.
+    List<InputStreamProxy> openedReaders = new ArrayList<>();
+    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
+      @Override
+      public FSDataInputStream open(Path p) throws IOException {
+        InputStreamProxy is = new InputStreamProxy(super.open(p));
+        openedReaders.add(is);
+        return is;
+      }
+
+      @Override
+      public FSDataInputStream open(Path p, int blockSize) throws IOException {
+        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
+        openedReaders.add(is);
+        return is;
+      }
+    };
+
+    final TableDescriptor htd =
+        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
+            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
+    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htd.getColumnFamilyNames()) {
+      scopes.put(fam, 0);
+    }
+    byte[] row = Bytes.toBytes("row");
+    WAL.Reader reader = null;
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
+    try {
+      // Write one column in one edit.
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, Bytes.toBytes("column"),
+          Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 }));
+      final WAL log = customFactory.getWAL(hri);
+      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
+          htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
+      // Sync the edit to the WAL
+      log.sync(txid);
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+      log.shutdown();
+
+      // Inject our failure, object is constructed via reflection.
+      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
+
+      // Now open a reader on the log which will throw an exception when
+      // we try to instantiate the custom Codec.
+      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+      try {
+        reader = customFactory.createReader(proxyFs, filename);
+        fail("Expected to see an exception when creating WAL reader");
+      } catch (Exception e) {
+        // Expected that we get an exception
+      }
+      // We should have exactly one reader
+      assertEquals(1, openedReaders.size());
+      // And that reader should be closed.
+      long unclosedReaders = openedReaders.stream()
+          .filter((r) -> !r.isClosed.get())
+          .collect(Collectors.counting());
+      assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * A proxy around FSDataInputStream which can report if close() was called.
+   */
+  private static class InputStreamProxy extends FSDataInputStream {
+    private final InputStream real;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    public InputStreamProxy(InputStream real) {
+      super(real);
+      this.real = real;
+    }
+
+    @Override
+    public void close() throws IOException {
+      isClosed.set(true);
+      real.close();
+    }
+  }
+
+  /**
+   * A custom WALCellCodec in which we can inject failure.
+   */
+  @SuppressWarnings("unused")
+  private static class BrokenWALCellCodec extends WALCellCodec {
+    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
+
+    static void maybeInjectFailure() {
+      if (THROW_FAILURE_ON_INIT.get()) {
+        throw new RuntimeException("Injected instantiation exception");
+      }
+    }
+
+    public BrokenWALCellCodec() {
+      super();
+      maybeInjectFailure();
+    }
+
+    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
+      super(conf, compression);
+      maybeInjectFailure();
+    }
+  }
 }
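The leak check in testReaderClosedOnBadCodec reduces to a reusable idiom: hand out wrapper streams that you remember, then assert that every wrapper saw close(). The same idiom in a dependency-free sketch, where CloseTrackingInputStream is an illustrative stand-in for the InputStreamProxy added above:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseTrackingDemo {
  /** Wraps any InputStream and records whether close() was ever invoked. */
  static class CloseTrackingInputStream extends InputStream {
    final InputStream real;
    final AtomicBoolean closed = new AtomicBoolean(false);

    CloseTrackingInputStream(InputStream real) {
      this.real = real;
    }

    @Override
    public int read() throws IOException {
      return real.read();
    }

    @Override
    public void close() throws IOException {
      closed.set(true);
      real.close();
    }
  }

  public static void main(String[] args) throws IOException {
    CloseTrackingInputStream in =
        new CloseTrackingInputStream(new ByteArrayInputStream(new byte[] { 1, 2, 3 }));
    try (InputStream is = in) {
      is.read();
    }
    // A test would assert this for every stream the proxied FileSystem handed out.
    System.out.println("closed? " + in.closed.get());
  }
}

The assertion in the test runs after createReader() has failed, which is exactly the path that leaked before the WALFactory change above.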