HBASE-9393 Hbase does not closing a closed socket resulting in many CLOSE_WAIT

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Ashish Singhi 2017-06-06 12:50:59 +05:30 committed by Andrew Purtell
parent ee0f148c73
commit 1950acc67a
4 changed files with 121 additions and 6 deletions

View File

@ -19,8 +19,13 @@ package org.apache.hadoop.hbase.io;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -36,6 +41,8 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
public class FSDataInputStreamWrapper implements Closeable {
private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
private final HFileSystem hfs;
private final Path path;
private final FileLink link;
@ -80,6 +87,11 @@ public class FSDataInputStreamWrapper implements Closeable {
// reads without hbase checksum verification.
private volatile int hbaseChecksumOffCount = -1;
private Boolean instanceOfCanUnbuffer = null;
// Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation
// errors against Hadoop pre 2.6.4 and 2.7.1 versions.
private Method unbuffer = null;
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, path, false, -1L);
}
@ -232,4 +244,61 @@ public class FSDataInputStreamWrapper implements Closeable {
public HFileSystem getHfs() {
return this.hfs;
}
}
/**
* This will free sockets and file descriptors held by the stream only when the stream implements
* org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
* using this stream to read the blocks have finished reading. If by chance the stream is
* unbuffered and there are clients still holding this stream for read then on next client read
* request a new socket will be opened by Datanode without client knowing about it and will serve
* its read request. Note: If this socket is idle for some time then the DataNode will close the
* socket and the socket will move into CLOSE_WAIT state and on the next client request on this
* stream, the current socket will be closed and a new socket will be opened to serve the
* requests.
*/
@SuppressWarnings({ "rawtypes" })
public void unbuffer() {
FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
if (stream != null) {
InputStream wrappedStream = stream.getWrappedStream();
// CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
// 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
// CanUnbuffer interface or not and based on that call the unbuffer api.
final Class<? extends InputStream> streamClass = wrappedStream.getClass();
if (this.instanceOfCanUnbuffer == null) {
// To ensure we compute whether the stream is instance of CanUnbuffer only once.
this.instanceOfCanUnbuffer = false;
Class<?>[] streamInterfaces = streamClass.getInterfaces();
for (Class c : streamInterfaces) {
if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
try {
this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
} catch (NoSuchMethodException | SecurityException e) {
LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+ " . So there may be a TCP socket connection "
+ "left open in CLOSE_WAIT state.",
e);
return;
}
this.instanceOfCanUnbuffer = true;
break;
}
}
}
if (this.instanceOfCanUnbuffer) {
try {
this.unbuffer.invoke(wrappedStream);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
LOG.warn("Failed to invoke 'unbuffer' method in class " + streamClass
+ " . So there may be a TCP socket connection left open in CLOSE_WAIT state.",
e);
}
} else {
LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+ " . So there may be a TCP socket connection "
+ "left open in CLOSE_WAIT state. For more details check "
+ "https://issues.apache.org/jira/browse/HBASE-9393");
}
}
}
}

View File

@ -499,6 +499,12 @@ public class HFile {
@VisibleForTesting
boolean prefetchComplete();
/**
* To close the stream's socket. Note: This can be concurrently called from multiple threads and
* implementation should take care of thread safety.
*/
void unbufferStream();
}
/**
@ -516,7 +522,7 @@ public class HFile {
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
justification="Intentional")
private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, long size,
private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size,
CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, Configuration conf)
throws IOException {
FixedFileTrailer trailer = null;
@ -537,10 +543,15 @@ public class HFile {
} catch (Throwable t) {
IOUtils.closeQuietly(fsdis);
throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
} finally {
fsdis.unbuffer();
}
}
/**
* The sockets and the file descriptors held by the method parameter
* {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
* that no other threads have access to the same passed reference.
* @param fs A file system
* @param path Path to HFile
* @param fsdis a stream of path's file
@ -565,7 +576,7 @@ public class HFile {
} else {
hfs = (HFileSystem) fs;
}
return pickReaderVersion(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf);
return openReader(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf);
}
/**
@ -597,18 +608,21 @@ public class HFile {
boolean primaryReplicaReader, Configuration conf) throws IOException {
Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
stream.getHfs(), primaryReplicaReader, conf);
}
/**
* This factory method is used only by unit tests
* This factory method is used only by unit tests. <br/>
* The sockets and the file descriptors held by the method parameter
* {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
* that no other threads have access to the same passed reference.
*/
@VisibleForTesting
static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size,
CacheConfig cacheConf, Configuration conf) throws IOException {
FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
return pickReaderVersion(path, wrapper, size, cacheConf, null, true, conf);
return openReader(path, wrapper, size, cacheConf, null, true, conf);
}
/**

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -1391,6 +1393,12 @@ public class HFileBlock implements Cacheable {
void setIncludesMemstoreTS(boolean includesMemstoreTS);
void setDataBlockEncoder(HFileDataBlockEncoder encoder);
/**
* To close the stream's socket. Note: This can be concurrently called from multiple threads and
* implementation should take care of thread safety.
*/
void unbufferStream();
}
/**
@ -1449,6 +1457,8 @@ public class HFileBlock implements Cacheable {
// Cache the fileName
private String pathName;
private final Lock streamLock = new ReentrantLock();
FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
HFileContext fileContext) throws IOException {
this.fileSize = fileSize;
@ -1847,6 +1857,19 @@ public class HFileBlock implements Cacheable {
streamWrapper.close();
}
@Override
public void unbufferStream() {
// To handle concurrent reads, ensure that no other client is accessing the streams while we
// unbuffer it.
if (streamLock.tryLock()) {
try {
this.streamWrapper.unbuffer();
} finally {
streamLock.unlock();
}
}
}
@Override
public String toString() {
return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;

View File

@ -588,6 +588,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
@Override
public void close() {
if (!pread) {
// For seek + pread stream socket should be closed when the scanner is closed. HBASE-9393
reader.unbufferStream();
}
this.returnBlocks(true);
}
@ -1858,4 +1862,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
public int getMajorVersion() {
return 3;
}
@Override
public void unbufferStream() {
fsBlockReader.unbufferStream();
}
}