HDFS-7180. NFSv3 gateway frequently gets stuck due to GC. Contributed by Brandon Li

(cherry picked from commit d71d40a63d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Brandon Li 2014-10-22 21:27:01 -07:00
parent 36515435ff
commit d27cd5ad77
6 changed files with 73 additions and 50 deletions
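The heart of this change is backpressure: the non-blocking checkDump() in OpenFileCtx becomes waitForDump(), which parks writer threads while more than DUMP_WRITE_WATER_MARK bytes of non-sequential writes sit in memory, and the dump thread wakes them once it has spilled enough to disk (and always wakes them before dying, so a dumper failure cannot strand writers). A minimal, self-contained sketch of that two-way wait/notify idiom — the class, watermark value, and chunk size here are illustrative, not the patch's exact code:

import java.util.concurrent.atomic.AtomicLong;

/** Illustrative sketch of the OpenFileCtx backpressure added by this patch. */
public class DumpBackpressure {
  private static final long DUMP_WRITE_WATER_MARK = 1024 * 1024; // value assumed
  private final AtomicLong buffered = new AtomicLong(0);
  private volatile boolean active = true;

  /** Writer side, mirroring waitForDump(): park until the dumper drains. */
  public synchronized void bufferWrite(long count) {
    buffered.addAndGet(count);
    notifyAll(); // wake the dumper: there is data to spill
    while (buffered.get() >= DUMP_WRITE_WATER_MARK) {
      try {
        wait(); // woken by the dumper; the loop guards against spurious wakeups
      } catch (InterruptedException ignored) {
      }
    }
  }

  /** Dumper thread body: spill to disk, waking writers as the level drops. */
  public void dumpLoop() {
    try {
      while (active) {
        synchronized (this) {
          while (active && buffered.get() < DUMP_WRITE_WATER_MARK) {
            notifyAll(); // below the watermark: unblock parked writers
            wait();      // sleep until a writer buffers more data
          }
        }
        // Pretend we dumped a chunk; the real code writes to the dump file.
        buffered.addAndGet(-Math.min(buffered.get(), 64 * 1024));
      }
    } catch (Throwable t) {
      synchronized (this) {
        notifyAll(); // never leave writers blocked after a dumper failure
      }
      active = false;
    }
  }
}

The hunks below show the real version: the wait loop added to waitForDump(), the notifyAll() the dumper issues before waiting for more data, and the notifyAll() plus activeState = false in its catch block.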

DFSClientCache.java

@@ -228,7 +228,7 @@ class DFSClientCache {
       RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) {
     try {
       notification.getValue().close();
-    } catch (IOException e) {
+    } catch (IOException ignored) {
     }
   }
 };

OpenFileCtx.java

@@ -175,7 +175,10 @@ class OpenFileCtx {
   private volatile boolean enabledDump;
   private FileOutputStream dumpOut;
+
+  /** Tracks the data buffered in memory related to non sequential writes */
   private AtomicLong nonSequentialWriteInMemory;
+
   private RandomAccessFile raf;
   private final String dumpFilePath;
   private Daemon dumpThread;
@@ -205,7 +208,7 @@ class OpenFileCtx {
     return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
   }
 
-  // Increase or decrease the memory occupation of non-sequential writes
+  /** Increase or decrease the memory occupation of non-sequential writes */
   private long updateNonSequentialWriteInMemory(long count) {
     long newValue = nonSequentialWriteInMemory.addAndGet(count);
     if (LOG.isDebugEnabled()) {
@@ -214,8 +217,8 @@ class OpenFileCtx {
     }
     Preconditions.checkState(newValue >= 0,
-        "nonSequentialWriteInMemory is negative after update with count "
-            + count);
+        "nonSequentialWriteInMemory is negative " + newValue
+            + " after update with count " + count);
     return newValue;
   }
@@ -248,7 +251,7 @@ class OpenFileCtx {
     nonSequentialWriteInMemory = new AtomicLong(0);
     this.dumpFilePath = dumpFilePath;
-    enabledDump = dumpFilePath == null ? false: true;
+    enabledDump = dumpFilePath != null;
     nextOffset = new AtomicLong();
     nextOffset.set(latestAttr.getSize());
     try {
@@ -271,7 +274,7 @@ class OpenFileCtx {
   }
 
   // Check if need to dump the new writes
-  private void checkDump() {
+  private void waitForDump() {
     if (!enabledDump) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Do nothing, dump is disabled.");
@@ -296,6 +299,14 @@ class OpenFileCtx {
         this.notifyAll();
       }
     }
+
+    while (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+      try {
+        this.wait();
+      } catch (InterruptedException ignored) {
+      }
+    }
   }
 }
@@ -382,6 +393,7 @@ class OpenFileCtx {
         }
         synchronized (OpenFileCtx.this) {
           if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+            OpenFileCtx.this.notifyAll();
             try {
               OpenFileCtx.this.wait();
               if (LOG.isDebugEnabled()) {
@@ -398,8 +410,13 @@ class OpenFileCtx {
                   + " enabledDump: " + enabledDump);
             }
           } catch (Throwable t) {
+            // unblock threads with new request
+            synchronized (OpenFileCtx.this) {
+              OpenFileCtx.this.notifyAll();
+            }
             LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
                 + OpenFileCtx.this.dumpFilePath, t);
+            activeState = false;
           }
         }
       }
@@ -563,10 +580,15 @@ class OpenFileCtx {
     // check if there is a WriteCtx with the same range in pendingWrites
     WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
     if (oldWriteCtx == null) {
-      addWrite(writeCtx);
+      pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New write buffered with xid " + xid + " nextOffset "
+            + cachedOffset + " req offset=" + offset + " mapsize="
+            + pendingWrites.size());
+      }
     } else {
-      LOG.warn("Got a repeated request, same range, with xid:"
-          + writeCtx.getXid());
+      LOG.warn("Got a repeated request, same range, with xid:" + xid
+          + " nextOffset " + +cachedOffset + " req offset=" + offset);
     }
     return writeCtx;
   }
@@ -648,7 +670,7 @@ class OpenFileCtx {
     boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
     if (!startWriting) {
       // offset > nextOffset. check if we need to dump data
-      checkDump();
+      waitForDump();
 
       // In test, noticed some Linux client sends a batch (e.g., 1MB)
       // of reordered writes and won't send more writes until it gets
@@ -683,7 +705,7 @@ class OpenFileCtx {
   private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
       long offset, int count, WriteStableHow stableHow, byte[] data,
       String path, WccData wccData, IdUserGroup iug) {
-    WRITE3Response response = null;
+    WRITE3Response response;
 
     // Read the content back
     byte[] readbuffer = new byte[count];
@@ -890,13 +912,6 @@ class OpenFileCtx {
     return COMMIT_STATUS.COMMIT_WAIT;
   }
 
-  private void addWrite(WriteCtx writeCtx) {
-    long offset = writeCtx.getOffset();
-    int count = writeCtx.getCount();
-    // For the offset range (min, max), min is inclusive, and max is exclusive
-    pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
-  }
-
   /**
    * Check stream status to decide if it should be closed
    * @return true, remove stream; false, keep stream
@@ -1191,7 +1206,7 @@ class OpenFileCtx {
     dumpThread.interrupt();
     try {
       dumpThread.join(3000);
-    } catch (InterruptedException e) {
+    } catch (InterruptedException ignored) {
     }
   }

OpenFileCtxCache.java

@@ -220,12 +220,12 @@ class OpenFileCtxCache {
   void shutdown() {
     // stop the dump thread
-    if (streamMonitor != null && streamMonitor.isAlive()) {
+    if (streamMonitor.isAlive()) {
       streamMonitor.shouldRun(false);
       streamMonitor.interrupt();
       try {
         streamMonitor.join(3000);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignored) {
       }
     }
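Both shutdown paths touched by this commit stop their daemon the same way: clear the run flag, interrupt, then join with a bound so shutdown cannot hang. A generic sketch of the idiom (the class and sleep interval are illustrative, not Hadoop code):

/** Illustrative stop idiom matching the dumpThread/streamMonitor shutdowns. */
public class StoppableDaemon extends Thread {
  private volatile boolean shouldRun = true;

  public void shouldRun(boolean run) { shouldRun = run; }

  @Override
  public void run() {
    while (shouldRun) {
      try {
        Thread.sleep(1000); // stand-in for the monitor's real periodic work
      } catch (InterruptedException e) {
        // interrupt() from shutdown() lands here; the loop re-checks shouldRun
      }
    }
  }

  public void shutdown() {
    shouldRun(false);
    interrupt();    // break out of sleep/wait promptly
    try {
      join(3000);   // wait at most three seconds for a clean exit
    } catch (InterruptedException ignored) {
    }
  }
}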

RpcProgramNfs3.java

@@ -282,7 +282,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    GETATTR3Request request = null;
+    GETATTR3Request request;
     try {
       request = GETATTR3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -374,7 +374,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    SETATTR3Request request = null;
+    SETATTR3Request request;
    try {
      request = SETATTR3Request.deserialize(xdr);
    } catch (IOException e) {
@@ -459,7 +459,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    LOOKUP3Request request = null;
+    LOOKUP3Request request;
     try {
       request = LOOKUP3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -527,7 +527,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    ACCESS3Request request = null;
+    ACCESS3Request request;
     try {
       request = ACCESS3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -594,7 +594,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    READLINK3Request request = null;
+    READLINK3Request request;
     try {
       request = READLINK3Request.deserialize(xdr);
@@ -668,7 +668,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    READ3Request request = null;
+    READ3Request request;
     try {
       request = READ3Request.deserialize(xdr);
@@ -710,7 +710,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           securityHandler.getUid(), securityHandler.getGid(),
           securityHandler.getAuxGids(), attrs);
       if ((access & Nfs3Constant.ACCESS3_READ) != 0) {
-        eof = offset < attrs.getSize() ? false : true;
+        eof = offset >= attrs.getSize();
         return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof,
             ByteBuffer.wrap(new byte[0]));
       } else {
@@ -749,7 +749,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       } catch (IOException e) {
         // TODO: A cleaner way is to throw a new type of exception
         // which requires incompatible changes.
-        if (e.getMessage() == "Stream closed") {
+        if (e.getMessage().equals("Stream closed")) {
           clientCache.invalidateDfsInputStream(userName,
               Nfs3Utils.getFileIdPath(handle));
           continue;
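The == fix above is a genuine bug fix, not a cleanup: == compares object references, and an exception message built at runtime is generally not the interned string literal, so the invalidate-and-retry branch could never fire. A standalone illustration (nothing Hadoop-specific assumed):

public class StringEqualityDemo {
  public static void main(String[] args) {
    String msg = new String("Stream closed"); // simulates a runtime-built message
    System.out.println(msg == "Stream closed");      // false: different objects
    System.out.println(msg.equals("Stream closed")); // true: same characters
  }
}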
@@ -769,7 +769,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       if (readCount < 0) {
         readCount = 0;
       }
-      eof = (offset + readCount) < attrs.getSize() ? false : true;
+      eof = (offset + readCount) >= attrs.getSize();
       return new READ3Response(Nfs3Status.NFS3_OK, attrs, readCount, eof,
           ByteBuffer.wrap(readbuffer));
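Both eof rewrites in this file apply the same identity: cond ? false : true is just !cond, so x < size ? false : true becomes x >= size. A quick check with illustrative values:

public class EofDemo {
  public static void main(String[] args) {
    long size = 100;
    for (long pos : new long[] {0, 99, 100, 101}) {
      boolean oldForm = pos < size ? false : true;
      boolean newForm = pos >= size;
      System.out.println(pos + " -> " + oldForm + " / " + newForm); // always equal
    }
  }
}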
@@ -801,7 +801,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    WRITE3Request request = null;
+    WRITE3Request request;
     try {
       request = WRITE3Request.deserialize(xdr);
@@ -883,7 +883,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    CREATE3Request request = null;
+    CREATE3Request request;
     try {
       request = CREATE3Request.deserialize(xdr);
@@ -1017,7 +1017,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    MKDIR3Request request = null;
+    MKDIR3Request request;
     try {
       request = MKDIR3Request.deserialize(xdr);
@@ -1114,7 +1114,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    REMOVE3Request request = null;
+    REMOVE3Request request;
     try {
       request = REMOVE3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1194,7 +1194,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    RMDIR3Request request = null;
+    RMDIR3Request request;
     try {
       request = RMDIR3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1375,7 +1375,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    SYMLINK3Request request = null;
+    SYMLINK3Request request;
     try {
       request = SYMLINK3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1431,7 +1431,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
    */
   private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath,
       byte[] startAfter) throws IOException {
-    DirectoryListing dlisting = null;
+    DirectoryListing dlisting;
     try {
       dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
     } catch (RemoteException e) {
@@ -1468,7 +1468,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    READDIR3Request request = null;
+    READDIR3Request request;
     try {
       request = READDIR3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1492,9 +1492,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           + cookie + " count: " + count);
     }
 
-    HdfsFileStatus dirStatus = null;
-    DirectoryListing dlisting = null;
-    Nfs3FileAttributes postOpAttr = null;
+    HdfsFileStatus dirStatus;
+    DirectoryListing dlisting;
+    Nfs3FileAttributes postOpAttr;
     long dotdotFileId = 0;
     try {
       String dirFileIdPath = Nfs3Utils.getFileIdPath(handle);
@@ -1657,8 +1657,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
 
     HdfsFileStatus dirStatus;
-    DirectoryListing dlisting = null;
-    Nfs3FileAttributes postOpDirAttr = null;
+    DirectoryListing dlisting;
+    Nfs3FileAttributes postOpDirAttr;
     long dotdotFileId = 0;
     HdfsFileStatus dotdotStatus = null;
     try {
@@ -1803,7 +1803,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    FSSTAT3Request request = null;
+    FSSTAT3Request request;
     try {
       request = FSSTAT3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1877,7 +1877,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    FSINFO3Request request = null;
+    FSINFO3Request request;
     try {
       request = FSINFO3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1941,7 +1941,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    PATHCONF3Request request = null;
+    PATHCONF3Request request;
     try {
       request = PATHCONF3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1992,7 +1992,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    COMMIT3Request request = null;
+    COMMIT3Request request;
     try {
       request = COMMIT3Request.deserialize(xdr);
     } catch (IOException e) {

WriteCtx.java

@@ -47,14 +47,20 @@ class WriteCtx {
   public static enum DataState {
     ALLOW_DUMP,
     NO_DUMP,
-    DUMPED;
+    DUMPED
   }
 
   private final FileHandle handle;
   private final long offset;
   private final int count;
 
-  //Only needed for overlapped write, referring OpenFileCtx.addWritesToCache()
+  /**
+   * Some clients can send a write that includes previously written data along
+   * with new data. In such case the write request is changed to write from only
+   * the new data. {@code originalCount} tracks the number of bytes sent in the
+   * request before it was modified to write only the new data.
+   * @see OpenFileCtx#addWritesToCache for more details
+   */
   private final int originalCount;
   public static final int INVALID_ORIGINAL_COUNT = -1;
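The new javadoc describes overlap trimming: when a client resends bytes that are already past nextOffset along with new ones, the request is narrowed to the new tail, and originalCount keeps the pre-trim size (INVALID_ORIGINAL_COUNT when nothing was trimmed). A hypothetical sketch of that arithmetic — not the actual addWritesToCache() code:

/** Illustrative overlap trim; values and names are hypothetical. */
public class OverlapTrimDemo {
  public static void main(String[] args) {
    long nextOffset = 4096; // everything below this offset is already written
    long reqOffset = 3072;  // the request resends 1 KB of old data...
    int reqCount = 2048;    // ...inside a 2 KB write

    int originalCount = reqCount;                 // remembered in the WriteCtx
    int overlap = (int) (nextOffset - reqOffset); // 1024 bytes already on disk
    long offset = reqOffset + overlap;            // trimmed write starts at 4096
    int count = reqCount - overlap;               // and carries 1024 new bytes

    System.out.println("write " + count + " bytes at offset " + offset
        + " (request originally carried " + originalCount + ")");
  }
}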
@@ -173,7 +179,7 @@ class WriteCtx {
   public void writeData(HdfsDataOutputStream fos) throws IOException {
     Preconditions.checkState(fos != null);
 
-    ByteBuffer dataBuffer = null;
+    ByteBuffer dataBuffer;
     try {
       dataBuffer = getData();
     } catch (Exception e1) {

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -646,6 +646,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-7215. Add JvmPauseMonitor to NFS gateway (brandonli)
 
+    HDFS-7180. NFSv3 gateway frequently gets stuck due to GC (brandonli)
+
   BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
 
     HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)