diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java index aad20e0bec3..79072f40494 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java @@ -228,7 +228,7 @@ class DFSClientCache { RemovalNotification notification) { try { notification.getValue().close(); - } catch (IOException e) { + } catch (IOException ignored) { } } }; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 5e5818786b8..b48a0153f89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -175,7 +175,10 @@ class OpenFileCtx { private volatile boolean enabledDump; private FileOutputStream dumpOut; + + /** Tracks the data buffered in memory related to non sequential writes */ private AtomicLong nonSequentialWriteInMemory; + private RandomAccessFile raf; private final String dumpFilePath; private Daemon dumpThread; @@ -205,7 +208,7 @@ class OpenFileCtx { return (pendingWrites.size() != 0 || pendingCommits.size() != 0); } - // Increase or decrease the memory occupation of non-sequential writes + /** Increase or decrease the memory occupation of non-sequential writes */ private long updateNonSequentialWriteInMemory(long count) { long newValue = nonSequentialWriteInMemory.addAndGet(count); if (LOG.isDebugEnabled()) { @@ -214,8 +217,8 @@ class OpenFileCtx { } Preconditions.checkState(newValue >= 0, - "nonSequentialWriteInMemory is negative after update with count " - + count); + "nonSequentialWriteInMemory is negative " + newValue + + " after update with count " + count); return newValue; } @@ -248,7 +251,7 @@ class OpenFileCtx { nonSequentialWriteInMemory = new AtomicLong(0); this.dumpFilePath = dumpFilePath; - enabledDump = dumpFilePath == null ? false: true; + enabledDump = dumpFilePath != null; nextOffset = new AtomicLong(); nextOffset.set(latestAttr.getSize()); try { @@ -271,7 +274,7 @@ class OpenFileCtx { } // Check if need to dump the new writes - private void checkDump() { + private void waitForDump() { if (!enabledDump) { if (LOG.isDebugEnabled()) { LOG.debug("Do nothing, dump is disabled."); @@ -296,6 +299,14 @@ class OpenFileCtx { this.notifyAll(); } } + + while (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) { + try { + this.wait(); + } catch (InterruptedException ignored) { + } + } + } } @@ -382,6 +393,7 @@ class OpenFileCtx { } synchronized (OpenFileCtx.this) { if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) { + OpenFileCtx.this.notifyAll(); try { OpenFileCtx.this.wait(); if (LOG.isDebugEnabled()) { @@ -398,8 +410,13 @@ class OpenFileCtx { + " enabledDump: " + enabledDump); } } catch (Throwable t) { + // unblock threads with new request + synchronized (OpenFileCtx.this) { + OpenFileCtx.this.notifyAll(); + } LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: " + OpenFileCtx.this.dumpFilePath, t); + activeState = false; } } } @@ -563,10 +580,15 @@ class OpenFileCtx { // check if there is a WriteCtx with the same range in pendingWrites WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid); if (oldWriteCtx == null) { - addWrite(writeCtx); + pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx); + if (LOG.isDebugEnabled()) { + LOG.debug("New write buffered with xid " + xid + " nextOffset " + + cachedOffset + " req offset=" + offset + " mapsize=" + + pendingWrites.size()); + } } else { - LOG.warn("Got a repeated request, same range, with xid:" - + writeCtx.getXid()); + LOG.warn("Got a repeated request, same range, with xid:" + xid + + " nextOffset " + +cachedOffset + " req offset=" + offset); } return writeCtx; } @@ -648,7 +670,7 @@ class OpenFileCtx { boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx); if (!startWriting) { // offset > nextOffset. check if we need to dump data - checkDump(); + waitForDump(); // In test, noticed some Linux client sends a batch (e.g., 1MB) // of reordered writes and won't send more writes until it gets @@ -683,7 +705,7 @@ class OpenFileCtx { private WRITE3Response processPerfectOverWrite(DFSClient dfsClient, long offset, int count, WriteStableHow stableHow, byte[] data, String path, WccData wccData, IdUserGroup iug) { - WRITE3Response response = null; + WRITE3Response response; // Read the content back byte[] readbuffer = new byte[count]; @@ -890,13 +912,6 @@ class OpenFileCtx { return COMMIT_STATUS.COMMIT_WAIT; } - private void addWrite(WriteCtx writeCtx) { - long offset = writeCtx.getOffset(); - int count = writeCtx.getCount(); - // For the offset range (min, max), min is inclusive, and max is exclusive - pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx); - } - /** * Check stream status to decide if it should be closed * @return true, remove stream; false, keep stream @@ -1191,7 +1206,7 @@ class OpenFileCtx { dumpThread.interrupt(); try { dumpThread.join(3000); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java index 01c519c3d18..7bf93ada901 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java @@ -220,12 +220,12 @@ class OpenFileCtxCache { void shutdown() { // stop the dump thread - if (streamMonitor != null && streamMonitor.isAlive()) { + if (streamMonitor.isAlive()) { streamMonitor.shouldRun(false); streamMonitor.interrupt(); try { streamMonitor.join(3000); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index b1076f28942..0f89b83bf8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -282,7 +282,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - GETATTR3Request request = null; + GETATTR3Request request; try { request = GETATTR3Request.deserialize(xdr); } catch (IOException e) { @@ -374,7 +374,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - SETATTR3Request request = null; + SETATTR3Request request; try { request = SETATTR3Request.deserialize(xdr); } catch (IOException e) { @@ -459,7 +459,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - LOOKUP3Request request = null; + LOOKUP3Request request; try { request = LOOKUP3Request.deserialize(xdr); } catch (IOException e) { @@ -527,7 +527,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - ACCESS3Request request = null; + ACCESS3Request request; try { request = ACCESS3Request.deserialize(xdr); } catch (IOException e) { @@ -594,7 +594,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - READLINK3Request request = null; + READLINK3Request request; try { request = READLINK3Request.deserialize(xdr); @@ -668,7 +668,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - READ3Request request = null; + READ3Request request; try { request = READ3Request.deserialize(xdr); @@ -710,7 +710,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { securityHandler.getUid(), securityHandler.getGid(), securityHandler.getAuxGids(), attrs); if ((access & Nfs3Constant.ACCESS3_READ) != 0) { - eof = offset < attrs.getSize() ? false : true; + eof = offset >= attrs.getSize(); return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof, ByteBuffer.wrap(new byte[0])); } else { @@ -749,7 +749,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } catch (IOException e) { // TODO: A cleaner way is to throw a new type of exception // which requires incompatible changes. - if (e.getMessage() == "Stream closed") { + if (e.getMessage().equals("Stream closed")) { clientCache.invalidateDfsInputStream(userName, Nfs3Utils.getFileIdPath(handle)); continue; @@ -769,7 +769,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { if (readCount < 0) { readCount = 0; } - eof = (offset + readCount) < attrs.getSize() ? false : true; + eof = (offset + readCount) >= attrs.getSize(); return new READ3Response(Nfs3Status.NFS3_OK, attrs, readCount, eof, ByteBuffer.wrap(readbuffer)); @@ -801,7 +801,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - WRITE3Request request = null; + WRITE3Request request; try { request = WRITE3Request.deserialize(xdr); @@ -883,7 +883,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - CREATE3Request request = null; + CREATE3Request request; try { request = CREATE3Request.deserialize(xdr); @@ -1017,7 +1017,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - MKDIR3Request request = null; + MKDIR3Request request; try { request = MKDIR3Request.deserialize(xdr); @@ -1114,7 +1114,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - REMOVE3Request request = null; + REMOVE3Request request; try { request = REMOVE3Request.deserialize(xdr); } catch (IOException e) { @@ -1194,7 +1194,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - RMDIR3Request request = null; + RMDIR3Request request; try { request = RMDIR3Request.deserialize(xdr); } catch (IOException e) { @@ -1375,7 +1375,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - SYMLINK3Request request = null; + SYMLINK3Request request; try { request = SYMLINK3Request.deserialize(xdr); } catch (IOException e) { @@ -1431,7 +1431,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { */ private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath, byte[] startAfter) throws IOException { - DirectoryListing dlisting = null; + DirectoryListing dlisting; try { dlisting = dfsClient.listPaths(dirFileIdPath, startAfter); } catch (RemoteException e) { @@ -1468,7 +1468,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - READDIR3Request request = null; + READDIR3Request request; try { request = READDIR3Request.deserialize(xdr); } catch (IOException e) { @@ -1492,9 +1492,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { + cookie + " count: " + count); } - HdfsFileStatus dirStatus = null; - DirectoryListing dlisting = null; - Nfs3FileAttributes postOpAttr = null; + HdfsFileStatus dirStatus; + DirectoryListing dlisting; + Nfs3FileAttributes postOpAttr; long dotdotFileId = 0; try { String dirFileIdPath = Nfs3Utils.getFileIdPath(handle); @@ -1657,8 +1657,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } HdfsFileStatus dirStatus; - DirectoryListing dlisting = null; - Nfs3FileAttributes postOpDirAttr = null; + DirectoryListing dlisting; + Nfs3FileAttributes postOpDirAttr; long dotdotFileId = 0; HdfsFileStatus dotdotStatus = null; try { @@ -1803,7 +1803,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - FSSTAT3Request request = null; + FSSTAT3Request request; try { request = FSSTAT3Request.deserialize(xdr); } catch (IOException e) { @@ -1877,7 +1877,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - FSINFO3Request request = null; + FSINFO3Request request; try { request = FSINFO3Request.deserialize(xdr); } catch (IOException e) { @@ -1941,7 +1941,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - PATHCONF3Request request = null; + PATHCONF3Request request; try { request = PATHCONF3Request.deserialize(xdr); } catch (IOException e) { @@ -1992,7 +1992,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - COMMIT3Request request = null; + COMMIT3Request request; try { request = COMMIT3Request.deserialize(xdr); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java index 3b5885ed897..758fd3998b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java @@ -47,14 +47,20 @@ class WriteCtx { public static enum DataState { ALLOW_DUMP, NO_DUMP, - DUMPED; + DUMPED } private final FileHandle handle; private final long offset; private final int count; - //Only needed for overlapped write, referring OpenFileCtx.addWritesToCache() + /** + * Some clients can send a write that includes previously written data along + * with new data. In such case the write request is changed to write from only + * the new data. {@code originalCount} tracks the number of bytes sent in the + * request before it was modified to write only the new data. + * @see OpenFileCtx#addWritesToCache for more details + */ private final int originalCount; public static final int INVALID_ORIGINAL_COUNT = -1; @@ -173,7 +179,7 @@ class WriteCtx { public void writeData(HdfsDataOutputStream fos) throws IOException { Preconditions.checkState(fos != null); - ByteBuffer dataBuffer = null; + ByteBuffer dataBuffer; try { dataBuffer = getData(); } catch (Exception e1) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 149c6a101a5..b6ff34e390e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -646,6 +646,8 @@ Release 2.6.0 - UNRELEASED HDFS-7215.Add JvmPauseMonitor to NFS gateway (brandonli) + HDFS-7180. NFSv3 gateway frequently gets stuck due to GC (brandonli) + BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)