diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java index 8ef39c4766e..3878cbc74a2 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java @@ -52,6 +52,7 @@ abstract public class MountdBase { private void startUDPServer() { SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(), rpcProgram, 1); + rpcProgram.startDaemons(); udpServer.run(); } @@ -59,6 +60,7 @@ abstract public class MountdBase { private void startTCPServer() { SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), rpcProgram, 1); + rpcProgram.startDaemons(); tcpServer.run(); } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java index a519ddd8416..0089bb0f3e9 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java @@ -20,7 +20,6 @@ package org.apache.hadoop.nfs.nfs3; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mount.MountdBase; import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.SimpleTcpServer; import org.apache.hadoop.portmap.PortmapMapping; @@ -32,34 +31,27 @@ import org.apache.hadoop.portmap.PortmapMapping; */ public abstract class Nfs3Base { public static final Log LOG = LogFactory.getLog(Nfs3Base.class); - private final MountdBase mountd; private final RpcProgram rpcProgram; private final int nfsPort; - - public MountdBase getMountBase() { - return mountd; - } - + public RpcProgram getRpcProgram() { return rpcProgram; } - protected Nfs3Base(MountdBase mountd, RpcProgram program, Configuration conf) { - this.mountd = mountd; - this.rpcProgram = program; + protected Nfs3Base(RpcProgram rpcProgram, Configuration conf) { + this.rpcProgram = rpcProgram; this.nfsPort = conf.getInt("nfs3.server.port", Nfs3Constant.PORT); - LOG.info("NFS server port set to: "+nfsPort); + LOG.info("NFS server port set to: " + nfsPort); } - protected Nfs3Base(MountdBase mountd, RpcProgram program) { - this.mountd = mountd; - this.rpcProgram = program; + protected Nfs3Base(RpcProgram rpcProgram) { + this.rpcProgram = rpcProgram; this.nfsPort = Nfs3Constant.PORT; } public void start(boolean register) { - mountd.start(register); // Start mountd startTCPServer(); // Start TCP server + if (register) { rpcProgram.register(PortmapMapping.TRANSPORT_TCP); } @@ -68,6 +60,7 @@ public abstract class Nfs3Base { private void startTCPServer() { SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort, rpcProgram, 0); + rpcProgram.startDaemons(); tcpServer.run(); } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java index bedb58f0c4a..d7a0b037355 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java @@ -205,6 +205,11 @@ public class Nfs3Constant { public static final 
String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs"; public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump"; public static final boolean ENABLE_FILE_DUMP_DEFAULT = true; + public static final String MAX_OPEN_FILES = "dfs.nfs3.max.open.files"; + public static final int MAX_OPEN_FILES_DEFAULT = 256; + public static final String OUTPUT_STREAM_TIMEOUT = "dfs.nfs3.stream.timeout"; + public static final long OUTPUT_STREAM_TIMEOUT_DEFAULT = 10 * 60 * 1000; // 10 minutes + public static final long OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT = 10 * 1000; //10 seconds public final static String UNKNOWN_USER = "nobody"; public final static String UNKNOWN_GROUP = "nobody"; diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java index b04e7fca74d..8a1ff8a1d5a 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java @@ -83,4 +83,10 @@ public class WRITE3Request extends RequestWithHandle { xdr.writeInt(count); xdr.writeFixedOpaque(data.array(), count); } + + @Override + public String toString() { + return String.format("fileId: %d offset: %d count: %d stableHow: %s", + handle.getFileId(), offset, count, stableHow.name()); + } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java index 36348980056..a8024172988 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java @@ -100,6 +100,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { } } + // Start extra daemons + public void startDaemons() {} + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java index 06b9890bf9a..b81504bddc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java @@ -23,33 +23,47 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.nfs.mount.Mountd; +import org.apache.hadoop.mount.MountdBase; import org.apache.hadoop.nfs.nfs3.Nfs3Base; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; + /** * Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}. * Currently Mountd program is also started inside this class. * Only TCP server is supported and UDP is not supported. 
*/ public class Nfs3 extends Nfs3Base { + private Mountd mountd; + static { Configuration.addDefaultResource("hdfs-default.xml"); Configuration.addDefaultResource("hdfs-site.xml"); } public Nfs3(List exports) throws IOException { - super(new Mountd(exports), new RpcProgramNfs3()); + super(new RpcProgramNfs3()); + mountd = new Mountd(exports); } + @VisibleForTesting public Nfs3(List exports, Configuration config) throws IOException { - super(new Mountd(exports, config), new RpcProgramNfs3(config), config); + super(new RpcProgramNfs3(config), config); + mountd = new Mountd(exports, config); } + public Mountd getMountd() { + return mountd; + } + public static void main(String[] args) throws IOException { StringUtils.startupShutdownMessage(Nfs3.class, args, LOG); List exports = new ArrayList(); exports.add("/"); + final Nfs3 nfsServer = new Nfs3(exports); + nfsServer.mountd.start(true); // Start mountd nfsServer.start(true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index a6fb97bac5f..e999615d0e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.security.InvalidParameterException; import java.util.EnumSet; import java.util.Iterator; import java.util.Map.Entry; @@ -96,7 +95,7 @@ class OpenFileCtx { // It's updated after each sync to HDFS private Nfs3FileAttributes latestAttr; - + private final ConcurrentNavigableMap pendingWrites; private final ConcurrentNavigableMap pendingCommits; @@ -165,10 +164,22 @@ class OpenFileCtx { return System.currentTimeMillis() - lastAccessTime > streamTimeout; } + long getLastAccessTime() { + return lastAccessTime; + } + public long getNextOffset() { return nextOffset.get(); } + boolean getActiveState() { + return this.activeState; + } + + boolean hasPendingWork() { + return (pendingWrites.size() != 0 || pendingCommits.size() != 0); + } + // Increase or decrease the memory occupation of non-sequential writes private long updateNonSequentialWriteInMemory(long count) { long newValue = nonSequentialWriteInMemory.addAndGet(count); @@ -792,19 +803,18 @@ class OpenFileCtx { * @return true, remove stream; false, keep stream */ public synchronized boolean streamCleanup(long fileId, long streamTimeout) { - if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) { - throw new InvalidParameterException("StreamTimeout" + streamTimeout - + "ms is less than MINIMIUM_STREAM_TIMEOUT " - + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms"); + Preconditions + .checkState(streamTimeout >= Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + if (!activeState) { + return true; } boolean flag = false; // Check the stream timeout if (checkStreamTimeout(streamTimeout)) { if (LOG.isDebugEnabled()) { - LOG.debug("closing stream for fileId:" + fileId); + LOG.debug("stream can be closed for fileId:" + fileId); } - cleanup(); flag = true; } return flag; @@ -975,7 +985,7 @@ class OpenFileCtx { FileHandle handle = writeCtx.getHandle(); if (LOG.isDebugEnabled()) { LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " - + offset + " length:" + count + " stableHow:" + stableHow.getValue()); 
+ + offset + " length:" + count + " stableHow:" + stableHow.name()); } try { @@ -1056,7 +1066,7 @@ class OpenFileCtx { } } - private synchronized void cleanup() { + synchronized void cleanup() { if (!activeState) { LOG.info("Current OpenFileCtx is already inactive, no need to cleanup."); return; @@ -1064,7 +1074,7 @@ class OpenFileCtx { activeState = false; // stop the dump thread - if (dumpThread != null) { + if (dumpThread != null && dumpThread.isAlive()) { dumpThread.interrupt(); try { dumpThread.join(3000); @@ -1146,4 +1156,10 @@ class OpenFileCtx { void setActiveStatusForTest(boolean activeState) { this.activeState = activeState; } + + @Override + public String toString() { + return String.format("activeState: %b asyncStatus: %b nextOffset: %d", + activeState, asyncStatus, nextOffset.get()); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java new file mode 100644 index 00000000000..d255e3d9b99 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.nfs.nfs3; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.nfs.nfs3.FileHandle; +import org.apache.hadoop.nfs.nfs3.Nfs3Constant; +import org.apache.hadoop.util.Daemon; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * A cache saves OpenFileCtx objects for different users. Each cache entry is + * used to maintain the writing context for a single file. + */ +class OpenFileCtxCache { + private static final Log LOG = LogFactory.getLog(OpenFileCtxCache.class); + // Insert and delete with openFileMap are synced + private final ConcurrentMap openFileMap = Maps + .newConcurrentMap(); + + private final int maxStreams; + private final long streamTimeout; + private final StreamMonitor streamMonitor; + + OpenFileCtxCache(Configuration config, long streamTimeout) { + maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES, + Nfs3Constant.MAX_OPEN_FILES_DEFAULT); + LOG.info("Maximum open streams is " + maxStreams); + this.streamTimeout = streamTimeout; + streamMonitor = new StreamMonitor(); + } + + /** + * The entry to be evicted is based on the following rules:
+ * 1. If the OpenFileCtx has any pending task, it will not be chosen.
+ * 2. If there is an inactive OpenFileCtx, the first one found is evicted.
+ * 3. For OpenFileCtx entries that don't belong to group 1 or 2, the idlest one + * is selected. If it's idle longer than OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT, it + * will be evicted. Otherwise, the whole eviction request fails. + */ + @VisibleForTesting + Entry<FileHandle, OpenFileCtx> getEntryToEvict() { + Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() + .iterator(); + if (LOG.isTraceEnabled()) { + LOG.trace("openFileMap size:" + openFileMap.size()); + } + + Entry<FileHandle, OpenFileCtx> idlest = null; + + while (it.hasNext()) { + Entry<FileHandle, OpenFileCtx> pairs = it.next(); + OpenFileCtx ctx = pairs.getValue(); + if (!ctx.getActiveState()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got one inactive stream: " + ctx); + } + return pairs; + } + if (ctx.hasPendingWork()) { + // Always skip files with pending work. + continue; + } + if (idlest == null) { + idlest = pairs; + } else { + if (ctx.getLastAccessTime() < idlest.getValue().getLastAccessTime()) { + idlest = pairs; + } + } + } + + if (idlest == null) { + LOG.warn("No eviction candidate. All streams have pending work."); + return null; + } else { + long idleTime = System.currentTimeMillis() + - idlest.getValue().getLastAccessTime(); + if (idleTime < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) { + if (LOG.isDebugEnabled()) { + LOG.debug("idlest stream's idle time:" + idleTime); + } + LOG.warn("All opened streams are busy, can't remove any from cache."); + return null; + } else { + return idlest; + } + } + } + + boolean put(FileHandle h, OpenFileCtx context) { + OpenFileCtx toEvict = null; + synchronized (this) { + Preconditions.checkState(openFileMap.size() <= this.maxStreams, + "stream cache size " + openFileMap.size() + + " is larger than maximum " + this.maxStreams); + if (openFileMap.size() == this.maxStreams) { + Entry<FileHandle, OpenFileCtx> pairs = getEntryToEvict(); + if (pairs == null) { + return false; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Evict stream ctx: " + pairs.getValue()); + } + toEvict = openFileMap.remove(pairs.getKey()); + Preconditions.checkState(toEvict == pairs.getValue(), + "The deleted entry is not the same as the oldest found."); + } + } + openFileMap.put(h, context); + } + + // Cleanup the old stream outside the lock + if (toEvict != null) { + toEvict.cleanup(); + } + return true; + } + + @VisibleForTesting + void scan(long streamTimeout) { + ArrayList<OpenFileCtx> ctxToRemove = new ArrayList<OpenFileCtx>(); + Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() + .iterator(); + if (LOG.isTraceEnabled()) { + LOG.trace("openFileMap size:" + openFileMap.size()); + } + + while (it.hasNext()) { + Entry<FileHandle, OpenFileCtx> pairs = it.next(); + FileHandle handle = pairs.getKey(); + OpenFileCtx ctx = pairs.getValue(); + if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) { + continue; + } + + // Check it again inside lock before removing + synchronized (this) { + OpenFileCtx ctx2 = openFileMap.get(handle); + if (ctx2 != null) { + if (ctx2.streamCleanup(handle.getFileId(), streamTimeout)) { + openFileMap.remove(handle); + if (LOG.isDebugEnabled()) { + LOG.debug("After remove stream " + handle.getFileId() + + ", the stream number:" + openFileMap.size()); + } + ctxToRemove.add(ctx2); + } + } + } + } + + // Invoke the cleanup outside the lock + for (OpenFileCtx ofc : ctxToRemove) { + ofc.cleanup(); + } + } + + OpenFileCtx get(FileHandle key) { + return openFileMap.get(key); + } + + int size() { + return openFileMap.size(); + } + + void start() { + streamMonitor.start(); + } + + // Evict all entries + void cleanAll() { + ArrayList<OpenFileCtx> cleanedContext = new ArrayList<OpenFileCtx>(); + synchronized (this) { + Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() + .iterator(); + if
(LOG.isTraceEnabled()) { + LOG.trace("openFileMap size:" + openFileMap.size()); + } + + while (it.hasNext()) { + Entry pairs = it.next(); + OpenFileCtx ctx = pairs.getValue(); + it.remove(); + cleanedContext.add(ctx); + } + } + + // Invoke the cleanup outside the lock + for (OpenFileCtx ofc : cleanedContext) { + ofc.cleanup(); + } + } + + void shutdown() { + // stop the dump thread + if (streamMonitor != null && streamMonitor.isAlive()) { + streamMonitor.shouldRun(false); + streamMonitor.interrupt(); + try { + streamMonitor.join(3000); + } catch (InterruptedException e) { + } + } + + cleanAll(); + } + + /** + * StreamMonitor wakes up periodically to find and closes idle streams. + */ + class StreamMonitor extends Daemon { + private final static int rotation = 5 * 1000; // 5 seconds + private long lastWakeupTime = 0; + private boolean shouldRun = true; + + void shouldRun(boolean shouldRun) { + this.shouldRun = shouldRun; + } + + @Override + public void run() { + while (shouldRun) { + scan(streamTimeout); + + // Check if it can sleep + try { + long workedTime = System.currentTimeMillis() - lastWakeupTime; + if (workedTime < rotation) { + if (LOG.isTraceEnabled()) { + LOG.trace("StreamMonitor can still have a sleep:" + + ((rotation - workedTime) / 1000)); + } + Thread.sleep(rotation - workedTime); + } + lastWakeupTime = System.currentTimeMillis(); + + } catch (InterruptedException e) { + LOG.info("StreamMonitor got interrupted"); + return; + } + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 7ee9f067661..59412382573 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -214,6 +214,11 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } } + @Override + public void startDaemons() { + writeManager.startAsyncDataSerivce(); + } + /****************************************************** * RPC call handlers ******************************************************/ @@ -778,7 +783,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { int createMode = request.getMode(); if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE) - && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) { + && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE) + && request.getObjAttr().getSize() != 0) { LOG.error("Setting file size is not supported when creating file: " + fileName + " dir fileId:" + dirHandle.getFileId()); return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL); @@ -831,6 +837,23 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug); + + // Add open stream + OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, + writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug); + fileHandle = new FileHandle(postOpObjAttr.getFileId()); + if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) { + LOG.warn("Can't add more stream, close it." 
+ + " Future write will become append"); + fos.close(); + fos = null; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Opened stream for file:" + fileName + ", fileId:" + + fileHandle.getFileId()); + } + } + } catch (IOException e) { LOG.error("Exception", e); if (fos != null) { @@ -859,16 +882,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } } - // Add open stream - OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir - + "/" + postOpObjAttr.getFileId(), dfsClient, iug); - fileHandle = new FileHandle(postOpObjAttr.getFileId()); - writeManager.addOpenFileStream(fileHandle, openFileCtx); - if (LOG.isDebugEnabled()) { - LOG.debug("open stream for file:" + fileName + ", fileId:" - + fileHandle.getFileId()); - } - return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr, dirWcc); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index cb8bf865693..aa6a8a3650b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.IOException; -import java.util.Iterator; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -29,11 +27,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.IdUserGroup; import org.apache.hadoop.nfs.nfs3.Nfs3Constant; -import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; @@ -56,69 +55,70 @@ public class WriteManager { private final Configuration config; private final IdUserGroup iug; - private final ConcurrentMap openFileMap = Maps - .newConcurrentMap(); - + private AsyncDataService asyncDataService; private boolean asyncDataServiceStarted = false; - private final StreamMonitor streamMonitor; - + private final int maxStreams; + /** * The time limit to wait for accumulate reordered sequential writes to the * same file before the write is considered done. 
*/ private long streamTimeout; - - public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes - public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds - - void addOpenFileStream(FileHandle h, OpenFileCtx ctx) { - openFileMap.put(h, ctx); - if (LOG.isDebugEnabled()) { - LOG.debug("After add the new stream " + h.getFileId() - + ", the stream number:" + openFileMap.size()); + + private final OpenFileCtxCache fileContextCache; + + static public class MultipleCachedStreamException extends IOException { + private static final long serialVersionUID = 1L; + + public MultipleCachedStreamException(String msg) { + super(msg); } } + boolean addOpenFileStream(FileHandle h, OpenFileCtx ctx) { + return fileContextCache.put(h, ctx); + } + WriteManager(IdUserGroup iug, final Configuration config) { this.iug = iug; this.config = config; - - streamTimeout = config.getLong("dfs.nfs3.stream.timeout", - DEFAULT_STREAM_TIMEOUT); + streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT, + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT); LOG.info("Stream timeout is " + streamTimeout + "ms."); - if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) { + if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) { LOG.info("Reset stream timeout to minimum value " - + MINIMIUM_STREAM_TIMEOUT + "ms."); - streamTimeout = MINIMIUM_STREAM_TIMEOUT; + + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms."); + streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT; } - - this.streamMonitor = new StreamMonitor(); + maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES, + Nfs3Constant.MAX_OPEN_FILES_DEFAULT); + LOG.info("Maximum open streams is "+ maxStreams); + this.fileContextCache = new OpenFileCtxCache(config, streamTimeout); } - private void startAsyncDataSerivce() { - streamMonitor.start(); + void startAsyncDataSerivce() { + if (asyncDataServiceStarted) { + return; + } + fileContextCache.start(); this.asyncDataService = new AsyncDataService(); asyncDataServiceStarted = true; } - private void shutdownAsyncDataService() { - asyncDataService.shutdown(); + void shutdownAsyncDataService() { + if (!asyncDataServiceStarted) { + return; + } asyncDataServiceStarted = false; - streamMonitor.interrupt(); + asyncDataService.shutdown(); + fileContextCache.shutdown(); } void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, Nfs3FileAttributes preOpAttr) throws IOException { - // First write request starts the async data service - if (!asyncDataServiceStarted) { - startAsyncDataSerivce(); - } - - long offset = request.getOffset(); int count = request.getCount(); - WriteStableHow stableHow = request.getStableHow(); byte[] data = request.getData().array(); if (data.length < count) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL); @@ -129,13 +129,12 @@ public class WriteManager { FileHandle handle = request.getHandle(); if (LOG.isDebugEnabled()) { - LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: " - + offset + " length:" + count + " stableHow:" + stableHow.getValue()); + LOG.debug("handleWrite " + request); } // Check if there is a stream to write FileHandle fileHandle = request.getHandle(); - OpenFileCtx openFileCtx = openFileMap.get(fileHandle); + OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { LOG.info("No opened stream for fileId:" + fileHandle.getFileId()); @@ -150,6 +149,15 @@ public class WriteManager { fos = dfsClient.append(fileIdPath, bufferSize, null, null); 
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); + } catch (RemoteException e) { + IOException io = e.unwrapRemoteException(); + if (io instanceof AlreadyBeingCreatedException) { + LOG.warn("Can't append file:" + fileIdPath + + ". Possibly the file is being closed. Drop the request:" + + request + ", wait for the client to retry..."); + return; + } + throw e; } catch (IOException e) { LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e); if (fos != null) { @@ -170,9 +178,26 @@ public class WriteManager { Nfs3Constant.FILE_DUMP_DIR_DEFAULT); openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/" + fileHandle.getFileId(), dfsClient, iug); - addOpenFileStream(fileHandle, openFileCtx); + + if (!addOpenFileStream(fileHandle, openFileCtx)) { + LOG.info("Can't add new stream. Close it. Tell client to retry."); + try { + fos.close(); + } catch (IOException e) { + LOG.error("Can't close stream for fileId:" + handle.getFileId()); + } + // Notify client to retry + WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); + WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX, + fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils.writeChannel(channel, + response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()), + xid); + return; + } + if (LOG.isDebugEnabled()) { - LOG.debug("opened stream for file:" + fileHandle.getFileId()); + LOG.debug("Opened stream for appending file:" + fileHandle.getFileId()); } } @@ -185,7 +210,7 @@ public class WriteManager { void handleCommit(DFSClient dfsClient, FileHandle fileHandle, long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) { int status; - OpenFileCtx openFileCtx = openFileMap.get(fileHandle); + OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { LOG.info("No opened stream for fileId:" + fileHandle.getFileId() @@ -238,7 +263,7 @@ public class WriteManager { String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle); Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); if (attr != null) { - OpenFileCtx openFileCtx = openFileMap.get(fileHandle); + OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx != null) { attr.setSize(openFileCtx.getNextOffset()); attr.setUsed(openFileCtx.getNextOffset()); @@ -253,8 +278,8 @@ public class WriteManager { Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) { - OpenFileCtx openFileCtx = openFileMap - .get(new FileHandle(attr.getFileId())); + OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr + .getFileId())); if (openFileCtx != null) { attr.setSize(openFileCtx.getNextOffset()); @@ -263,56 +288,9 @@ public class WriteManager { } return attr; } - + @VisibleForTesting - ConcurrentMap getOpenFileMap() { - return this.openFileMap; - } - - /** - * StreamMonitor wakes up periodically to find and closes idle streams. 
- */ - class StreamMonitor extends Daemon { - private int rotation = 5 * 1000; // 5 seconds - private long lastWakeupTime = 0; - - @Override - public void run() { - while (true) { - Iterator> it = openFileMap.entrySet() - .iterator(); - if (LOG.isTraceEnabled()) { - LOG.trace("openFileMap size:" + openFileMap.size()); - } - while (it.hasNext()) { - Entry pairs = it.next(); - OpenFileCtx ctx = pairs.getValue(); - if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) { - it.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("After remove stream " + pairs.getKey().getFileId() - + ", the stream number:" + openFileMap.size()); - } - } - } - - // Check if it can sleep - try { - long workedTime = System.currentTimeMillis() - lastWakeupTime; - if (workedTime < rotation) { - if (LOG.isTraceEnabled()) { - LOG.trace("StreamMonitor can still have a sleep:" - + ((rotation - workedTime) / 1000)); - } - Thread.sleep(rotation - workedTime); - } - lastWakeupTime = System.currentTimeMillis(); - - } catch (InterruptedException e) { - LOG.info("StreamMonitor got interrupted"); - return; - } - } - } + OpenFileCtxCache getOpenFileCtxCache() { + return this.fileContextCache; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java index 7965a140d7f..977902a6845 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java @@ -51,7 +51,7 @@ public class TestMountd { Nfs3 nfs3 = new Nfs3(exports, config); nfs3.start(false); - RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase() + RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd() .getRpcProgram(); mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost")); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java index 2e8869e5a93..1f0ba43f39b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java @@ -135,6 +135,7 @@ public class TestOutOfOrderWrite { @Override protected ChannelPipelineFactory setPipelineFactory() { this.pipelineFactory = new ChannelPipelineFactory() { + @Override public ChannelPipeline getPipeline() { return Channels.pipeline( RpcUtil.constructRpcFrameDecoder(), diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java new file mode 100644 index 00000000000..0d36fad937d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.nfs.nfs3; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx; +import org.apache.hadoop.nfs.nfs3.FileHandle; +import org.apache.hadoop.nfs.nfs3.IdUserGroup; +import org.apache.hadoop.nfs.nfs3.Nfs3Constant; +import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestOpenFileCtxCache { + static boolean cleaned = false; + + @Test + public void testEviction() throws IOException, InterruptedException { + Configuration conf = new Configuration(); + + // Only two entries will be in the cache + conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2); + + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + + OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + + OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); + + boolean ret = cache.put(new FileHandle(1), context1); + assertTrue(ret); + Thread.sleep(1000); + ret = cache.put(new FileHandle(2), context2); + assertTrue(ret); + ret = cache.put(new FileHandle(3), context3); + assertFalse(ret); + assertTrue(cache.size() == 2); + + // Wait for the oldest stream to be evict-able, insert again + Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + assertTrue(cache.size() == 2); + + ret = cache.put(new FileHandle(3), context3); + assertTrue(ret); + assertTrue(cache.size() == 2); + assertTrue(cache.get(new FileHandle(1)) == null); + + // Test inactive entry is evicted immediately + context3.setActiveStatusForTest(false); + ret = cache.put(new FileHandle(4), context4); + assertTrue(ret); + + // Now the cache has context2 and context4 + // Test eviction failure if all entries have pending work. 
+ context2.getPendingWritesForTest().put(new OffsetRange(0, 100), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + context4.getPendingCommitsForTest().put(new Long(100), + new CommitCtx(0, null, 0, attr)); + Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + ret = cache.put(new FileHandle(5), context5); + assertFalse(ret); + } + + @Test + public void testScan() throws IOException, InterruptedException { + Configuration conf = new Configuration(); + + // Only two entries will be in the cache + conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2); + + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + + OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + + OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); + + // Test cleaning expired entry + boolean ret = cache.put(new FileHandle(1), context1); + assertTrue(ret); + ret = cache.put(new FileHandle(2), context2); + assertTrue(ret); + Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + 1); + cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + assertTrue(cache.size() == 0); + + // Test cleaning inactive entry + ret = cache.put(new FileHandle(3), context3); + assertTrue(ret); + ret = cache.put(new FileHandle(4), context4); + assertTrue(ret); + context3.setActiveStatusForTest(false); + cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT); + assertTrue(cache.size() == 1); + assertTrue(cache.get(new FileHandle(3)) == null); + assertTrue(cache.get(new FileHandle(4)) != null); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index 30b93959591..e721db69c16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -186,9 +186,8 @@ public class TestWrites { private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime) throws InterruptedException { int waitedTime = 0; - ConcurrentMap openFileMap = nfsd.getWriteManager() - .getOpenFileMap(); - OpenFileCtx ctx = openFileMap.get(handle); + OpenFileCtx ctx = nfsd.getWriteManager() + .getOpenFileCtxCache().get(handle); assertTrue(ctx != null); do { Thread.sleep(3000); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 96c3b333db8..821e31e8b41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -596,6 +596,8 @@ Release 2.2.1 - UNRELEASED HDFS-5252. Stable write is not handled correctly in someplace. (brandonli) + HDFS-5364. Add OpenFileCtx cache. (brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES
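
Note (not part of the patch): the sketch below is a hedged illustration of how the two knobs introduced above, dfs.nfs3.max.open.files (Nfs3Constant.MAX_OPEN_FILES, default 256) and dfs.nfs3.stream.timeout (Nfs3Constant.OUTPUT_STREAM_TIMEOUT, default 10 minutes), might be tuned when embedding the NFS3 gateway. The class name and the chosen values are illustrative assumptions; the two-argument Nfs3 constructor is marked @VisibleForTesting in this patch, and the start sequence mirrors Nfs3.main() as changed above.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;

public class Nfs3CacheTuningExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Allow more cached OpenFileCtx entries than the default of 256 (value assumed).
    conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 512);
    // Close idle write streams after 5 minutes instead of the 10-minute default.
    conf.setLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT, 5 * 60 * 1000L);

    List<String> exports = new ArrayList<String>();
    exports.add("/");

    // Same start sequence as Nfs3.main() in this patch: mountd first, then the NFS3 server.
    Nfs3 nfsServer = new Nfs3(exports, conf);
    nfsServer.getMountd().start(true);
    nfsServer.start(true);
  }
}

With these settings, OpenFileCtxCache.put() would begin evicting the idlest stream once 512 contexts are cached, and the StreamMonitor scan would close streams idle longer than 5 minutes.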