HBASE-9393 HBase does not close the stream's socket, leaving many connections in CLOSE_WAIT state

Signed-off-by: stack <stack@apache.org>

parent 454e45f50d
commit c8d133186b
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -41,6 +41,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -474,6 +475,11 @@ public class HFile {
 
     @VisibleForTesting
     boolean prefetchComplete();
+
+    /**
+     * To close only the stream's socket. HBASE-9393
+     */
+    void unbufferStream();
   }
 
   /**
@@ -490,8 +496,8 @@ public class HFile {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
     justification="Intentional")
-  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
-      long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
+  private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size,
+      CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
     FixedFileTrailer trailer = null;
     try {
       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
@@ -513,6 +519,22 @@ public class HFile {
         LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
       }
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
+    } finally {
+      unbufferStream(fsdis);
     }
   }
 
+  static void unbufferStream(FSDataInputStreamWrapper fsdis) {
+    boolean useHBaseChecksum = fsdis.shouldUseHBaseChecksum();
+    final FSDataInputStream stream = fsdis.getStream(useHBaseChecksum);
+    if (stream != null && stream.getWrappedStream() instanceof CanUnbuffer) {
+      // Enclosing unbuffer() in try-catch just to be on the defensive side.
+      try {
+        stream.unbuffer();
+      } catch (Throwable e) {
+        LOG.error("Failed to unbuffer the stream so possibly there may be a TCP socket connection "
+            + "left open in CLOSE_WAIT state.", e);
+      }
+    }
+  }
+
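For reference, the static unbufferStream() helper added above leans on Hadoop's CanUnbuffer contract: calling unbuffer() on an FSDataInputStream releases the client's cached connection (the socket that otherwise lingers in CLOSE_WAIT) while leaving the stream open for later reads. A minimal standalone sketch of that pattern, assuming a default Configuration; the class name UnbufferSketch and the path /tmp/example are illustrative, not from the patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnbufferSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical path, for illustration only.
    try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
      byte[] buf = new byte[128];
      in.read(buf); // reading opens a connection to the backing store (a DataNode on HDFS)
      // Release that connection but keep the stream open; only streams whose wrapped
      // stream supports CanUnbuffer can do this, hence the instanceof guard.
      if (in.getWrappedStream() instanceof CanUnbuffer) {
        in.unbuffer();
      }
      in.read(buf); // still valid: the client re-opens a connection on demand
    }
  }
}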
@@ -541,7 +563,7 @@ public class HFile {
     } else {
       hfs = (HFileSystem)fs;
     }
-    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
+    return openReader(path, fsdis, size, cacheConf, hfs, conf);
   }
 
   /**
@@ -556,8 +578,8 @@ public class HFile {
       FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
-    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
-        cacheConf, stream.getHfs(), conf);
+    return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf, stream.getHfs(),
+        conf);
   }
 
   /**
@@ -567,7 +589,7 @@ public class HFile {
       FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
       throws IOException {
     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
-    return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
+    return openReader(path, wrapper, size, cacheConf, null, conf);
   }
 
   /**
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -33,10 +33,10 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
-import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@@ -1311,6 +1311,11 @@ public class HFileBlock implements Cacheable {
 
     void setIncludesMemstoreTS(boolean includesMemstoreTS);
     void setDataBlockEncoder(HFileDataBlockEncoder encoder);
+
+    /**
+     * To close only the stream's socket. HBASE-9393
+     */
+    void unbufferStream();
   }
 
   /**
@@ -1758,6 +1763,19 @@ public class HFileBlock implements Cacheable {
     public String toString() {
       return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
     }
+
+    @Override
+    public void unbufferStream() {
+      // To handle concurrent reads, ensure that no other client is accessing the stream
+      // while we unbuffer it.
+      if (streamLock.tryLock()) {
+        try {
+          HFile.unbufferStream(this.streamWrapper);
+        } finally {
+          streamLock.unlock();
+        }
+      }
+    }
   }
 
   @Override
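The tryLock() in the unbufferStream() implementation above makes the cleanup best-effort: if a concurrent reader currently holds streamLock, the unbuffer is skipped rather than blocking the closing thread. A self-contained sketch of that pattern; BestEffortCleanup and runIfIdle are illustrative names, not part of the patch:

import java.util.concurrent.locks.ReentrantLock;

public class BestEffortCleanup {
  private final ReentrantLock streamLock = new ReentrantLock();

  // Runs the cleanup only if no other thread currently holds the lock;
  // otherwise it returns immediately instead of blocking the caller.
  public boolean runIfIdle(Runnable cleanup) {
    if (streamLock.tryLock()) { // non-blocking: returns false when the lock is busy
      try {
        cleanup.run();
        return true;
      } finally {
        streamLock.unlock();
      }
    }
    return false; // an active reader is using the stream; retry on a later close
  }

  public static void main(String[] args) {
    BestEffortCleanup c = new BestEffortCleanup();
    System.out.println(c.runIfIdle(() -> System.out.println("unbuffering")));
  }
}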
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -575,6 +575,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
     @Override
     public void close() {
+      if (!pread) {
+        // For seek + read (pread == false) the stream's socket should be closed when the
+        // scanner is closed. HBASE-9393
+        reader.unbufferStream();
+      }
       this.returnBlocks(true);
     }
 
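The !pread guard above reflects the two HDFS read paths: a positional read ("pread") fetches at an absolute offset and releases its block reader per call, while seek + read keeps stateful read machinery, including a connection to a DataNode, attached to the stream until it is unbuffered or closed. A small sketch contrasting the two calls, again assuming the hypothetical path /tmp/example:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreadVsStreamRead {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) { // hypothetical path
      byte[] buf = new byte[64];

      // Positional read ("pread"): reads at an absolute offset without moving the
      // file pointer, so no long-lived per-stream read state is retained.
      in.read(0L, buf, 0, buf.length);

      // Streaming read: seek + read keeps per-stream state (and, on HDFS, a
      // DataNode connection) alive, which is why the patch unbuffers only the
      // non-pread scanners on close.
      in.seek(0L);
      in.read(buf);
    }
  }
}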
@@ -1898,4 +1902,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   public int getMajorVersion() {
     return 3;
   }
+
+  @Override
+  public void unbufferStream() {
+    fsBlockReader.unbufferStream();
+  }
 }