HDFS-5364. Merging change r1539834 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1539891 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cbe493200b
commit
5f82524fe3
|
@ -52,6 +52,7 @@ abstract public class MountdBase {
|
|||
private void startUDPServer() {
|
||||
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
|
||||
rpcProgram, 1);
|
||||
rpcProgram.startDaemons();
|
||||
udpServer.run();
|
||||
}
|
||||
|
||||
|
@ -59,6 +60,7 @@ abstract public class MountdBase {
|
|||
private void startTCPServer() {
|
||||
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
|
||||
rpcProgram, 1);
|
||||
rpcProgram.startDaemons();
|
||||
tcpServer.run();
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.nfs.nfs3;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mount.MountdBase;
|
||||
import org.apache.hadoop.oncrpc.RpcProgram;
|
||||
import org.apache.hadoop.oncrpc.SimpleTcpServer;
|
||||
import org.apache.hadoop.portmap.PortmapMapping;
|
||||
|
@ -32,34 +31,27 @@ import org.apache.hadoop.portmap.PortmapMapping;
|
|||
*/
|
||||
public abstract class Nfs3Base {
|
||||
public static final Log LOG = LogFactory.getLog(Nfs3Base.class);
|
||||
private final MountdBase mountd;
|
||||
private final RpcProgram rpcProgram;
|
||||
private final int nfsPort;
|
||||
|
||||
public MountdBase getMountBase() {
|
||||
return mountd;
|
||||
}
|
||||
|
||||
|
||||
public RpcProgram getRpcProgram() {
|
||||
return rpcProgram;
|
||||
}
|
||||
|
||||
protected Nfs3Base(MountdBase mountd, RpcProgram program, Configuration conf) {
|
||||
this.mountd = mountd;
|
||||
this.rpcProgram = program;
|
||||
protected Nfs3Base(RpcProgram rpcProgram, Configuration conf) {
|
||||
this.rpcProgram = rpcProgram;
|
||||
this.nfsPort = conf.getInt("nfs3.server.port", Nfs3Constant.PORT);
|
||||
LOG.info("NFS server port set to: "+nfsPort);
|
||||
LOG.info("NFS server port set to: " + nfsPort);
|
||||
}
|
||||
|
||||
protected Nfs3Base(MountdBase mountd, RpcProgram program) {
|
||||
this.mountd = mountd;
|
||||
this.rpcProgram = program;
|
||||
protected Nfs3Base(RpcProgram rpcProgram) {
|
||||
this.rpcProgram = rpcProgram;
|
||||
this.nfsPort = Nfs3Constant.PORT;
|
||||
}
|
||||
|
||||
public void start(boolean register) {
|
||||
mountd.start(register); // Start mountd
|
||||
startTCPServer(); // Start TCP server
|
||||
|
||||
if (register) {
|
||||
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
|
||||
}
|
||||
|
@ -68,6 +60,7 @@ public abstract class Nfs3Base {
|
|||
private void startTCPServer() {
|
||||
SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
|
||||
rpcProgram, 0);
|
||||
rpcProgram.startDaemons();
|
||||
tcpServer.run();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -205,6 +205,11 @@ public class Nfs3Constant {
|
|||
public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs";
|
||||
public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump";
|
||||
public static final boolean ENABLE_FILE_DUMP_DEFAULT = true;
|
||||
public static final String MAX_OPEN_FILES = "dfs.nfs3.max.open.files";
|
||||
public static final int MAX_OPEN_FILES_DEFAULT = 256;
|
||||
public static final String OUTPUT_STREAM_TIMEOUT = "dfs.nfs3.stream.timeout";
|
||||
public static final long OUTPUT_STREAM_TIMEOUT_DEFAULT = 10 * 60 * 1000; // 10 minutes
|
||||
public static final long OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT = 10 * 1000; //10 seconds
|
||||
|
||||
public final static String UNKNOWN_USER = "nobody";
|
||||
public final static String UNKNOWN_GROUP = "nobody";
|
||||
|
|
|
@ -83,4 +83,10 @@ public class WRITE3Request extends RequestWithHandle {
|
|||
xdr.writeInt(count);
|
||||
xdr.writeFixedOpaque(data.array(), count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("fileId: %d offset: %d count: %d stableHow: %s",
|
||||
handle.getFileId(), offset, count, stableHow.name());
|
||||
}
|
||||
}
|
|
@ -100,6 +100,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
|||
}
|
||||
}
|
||||
|
||||
// Start extra daemons
|
||||
public void startDaemons() {}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||
throws Exception {
|
||||
|
|
|
@ -23,33 +23,47 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
|
||||
import org.apache.hadoop.mount.MountdBase;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Base;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
|
||||
* Currently Mountd program is also started inside this class.
|
||||
* Only TCP server is supported and UDP is not supported.
|
||||
*/
|
||||
public class Nfs3 extends Nfs3Base {
|
||||
private Mountd mountd;
|
||||
|
||||
static {
|
||||
Configuration.addDefaultResource("hdfs-default.xml");
|
||||
Configuration.addDefaultResource("hdfs-site.xml");
|
||||
}
|
||||
|
||||
public Nfs3(List<String> exports) throws IOException {
|
||||
super(new Mountd(exports), new RpcProgramNfs3());
|
||||
super(new RpcProgramNfs3());
|
||||
mountd = new Mountd(exports);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Nfs3(List<String> exports, Configuration config) throws IOException {
|
||||
super(new Mountd(exports, config), new RpcProgramNfs3(config), config);
|
||||
super(new RpcProgramNfs3(config), config);
|
||||
mountd = new Mountd(exports, config);
|
||||
}
|
||||
|
||||
public Mountd getMountd() {
|
||||
return mountd;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
|
||||
List<String> exports = new ArrayList<String>();
|
||||
exports.add("/");
|
||||
|
||||
final Nfs3 nfsServer = new Nfs3(exports);
|
||||
nfsServer.mountd.start(true); // Start mountd
|
||||
nfsServer.start(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.io.IOException;
|
|||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.security.InvalidParameterException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
|
@ -96,7 +95,7 @@ class OpenFileCtx {
|
|||
|
||||
// It's updated after each sync to HDFS
|
||||
private Nfs3FileAttributes latestAttr;
|
||||
|
||||
|
||||
private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
|
||||
|
||||
private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
|
||||
|
@ -165,10 +164,22 @@ class OpenFileCtx {
|
|||
return System.currentTimeMillis() - lastAccessTime > streamTimeout;
|
||||
}
|
||||
|
||||
long getLastAccessTime() {
|
||||
return lastAccessTime;
|
||||
}
|
||||
|
||||
public long getNextOffset() {
|
||||
return nextOffset.get();
|
||||
}
|
||||
|
||||
boolean getActiveState() {
|
||||
return this.activeState;
|
||||
}
|
||||
|
||||
boolean hasPendingWork() {
|
||||
return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
|
||||
}
|
||||
|
||||
// Increase or decrease the memory occupation of non-sequential writes
|
||||
private long updateNonSequentialWriteInMemory(long count) {
|
||||
long newValue = nonSequentialWriteInMemory.addAndGet(count);
|
||||
|
@ -800,19 +811,18 @@ class OpenFileCtx {
|
|||
* @return true, remove stream; false, keep stream
|
||||
*/
|
||||
public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
|
||||
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
|
||||
throw new InvalidParameterException("StreamTimeout" + streamTimeout
|
||||
+ "ms is less than MINIMIUM_STREAM_TIMEOUT "
|
||||
+ WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
|
||||
Preconditions
|
||||
.checkState(streamTimeout >= Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
|
||||
if (!activeState) {
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean flag = false;
|
||||
// Check the stream timeout
|
||||
if (checkStreamTimeout(streamTimeout)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("closing stream for fileId:" + fileId);
|
||||
LOG.debug("stream can be closed for fileId:" + fileId);
|
||||
}
|
||||
cleanup();
|
||||
flag = true;
|
||||
}
|
||||
return flag;
|
||||
|
@ -985,7 +995,7 @@ class OpenFileCtx {
|
|||
FileHandle handle = writeCtx.getHandle();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
|
||||
+ offset + " length:" + count + " stableHow:" + stableHow.getValue());
|
||||
+ offset + " length:" + count + " stableHow:" + stableHow.name());
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -1066,7 +1076,7 @@ class OpenFileCtx {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized void cleanup() {
|
||||
synchronized void cleanup() {
|
||||
if (!activeState) {
|
||||
LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
|
||||
return;
|
||||
|
@ -1074,7 +1084,7 @@ class OpenFileCtx {
|
|||
activeState = false;
|
||||
|
||||
// stop the dump thread
|
||||
if (dumpThread != null) {
|
||||
if (dumpThread != null && dumpThread.isAlive()) {
|
||||
dumpThread.interrupt();
|
||||
try {
|
||||
dumpThread.join(3000);
|
||||
|
@ -1156,4 +1166,10 @@ class OpenFileCtx {
|
|||
void setActiveStatusForTest(boolean activeState) {
|
||||
this.activeState = activeState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("activeState: %b asyncStatus: %b nextOffset: %d",
|
||||
activeState, asyncStatus, nextOffset.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,270 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* A cache saves OpenFileCtx objects for different users. Each cache entry is
|
||||
* used to maintain the writing context for a single file.
|
||||
*/
|
||||
class OpenFileCtxCache {
|
||||
private static final Log LOG = LogFactory.getLog(OpenFileCtxCache.class);
|
||||
// Insert and delete with openFileMap are synced
|
||||
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
|
||||
.newConcurrentMap();
|
||||
|
||||
private final int maxStreams;
|
||||
private final long streamTimeout;
|
||||
private final StreamMonitor streamMonitor;
|
||||
|
||||
OpenFileCtxCache(Configuration config, long streamTimeout) {
|
||||
maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
|
||||
Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
|
||||
LOG.info("Maximum open streams is " + maxStreams);
|
||||
this.streamTimeout = streamTimeout;
|
||||
streamMonitor = new StreamMonitor();
|
||||
}
|
||||
|
||||
/**
|
||||
* The entry to be evicted is based on the following rules:<br>
|
||||
* 1. if the OpenFileCtx has any pending task, it will not be chosen.<br>
|
||||
* 2. if there is inactive OpenFileCtx, the first found one is to evict. <br>
|
||||
* 3. For OpenFileCtx entries don't belong to group 1 or 2, the idlest one
|
||||
* is select. If it's idle longer than OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT, it
|
||||
* will be evicted. Otherwise, the whole eviction request is failed.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
Entry<FileHandle, OpenFileCtx> getEntryToEvict() {
|
||||
Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
|
||||
.iterator();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("openFileMap size:" + openFileMap.size());
|
||||
}
|
||||
|
||||
Entry<FileHandle, OpenFileCtx> idlest = null;
|
||||
|
||||
while (it.hasNext()) {
|
||||
Entry<FileHandle, OpenFileCtx> pairs = it.next();
|
||||
OpenFileCtx ctx = pairs.getValue();
|
||||
if (!ctx.getActiveState()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Got one inactive stream: " + ctx);
|
||||
}
|
||||
return pairs;
|
||||
}
|
||||
if (ctx.hasPendingWork()) {
|
||||
// Always skip files with pending work.
|
||||
continue;
|
||||
}
|
||||
if (idlest == null) {
|
||||
idlest = pairs;
|
||||
} else {
|
||||
if (ctx.getLastAccessTime() < idlest.getValue().getLastAccessTime()) {
|
||||
idlest = pairs;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (idlest == null) {
|
||||
LOG.warn("No eviction candidate. All streams have pending work.");
|
||||
return null;
|
||||
} else {
|
||||
long idleTime = System.currentTimeMillis()
|
||||
- idlest.getValue().getLastAccessTime();
|
||||
if (idleTime < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("idlest stream's idle time:" + idleTime);
|
||||
}
|
||||
LOG.warn("All opened streams are busy, can't remove any from cache.");
|
||||
return null;
|
||||
} else {
|
||||
return idlest;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean put(FileHandle h, OpenFileCtx context) {
|
||||
OpenFileCtx toEvict = null;
|
||||
synchronized (this) {
|
||||
Preconditions.checkState(openFileMap.size() <= this.maxStreams,
|
||||
"stream cache size " + openFileMap.size()
|
||||
+ " is larger than maximum" + this.maxStreams);
|
||||
if (openFileMap.size() == this.maxStreams) {
|
||||
Entry<FileHandle, OpenFileCtx> pairs = getEntryToEvict();
|
||||
if (pairs ==null) {
|
||||
return false;
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Evict stream ctx: " + pairs.getValue());
|
||||
}
|
||||
toEvict = openFileMap.remove(pairs.getKey());
|
||||
Preconditions.checkState(toEvict == pairs.getValue(),
|
||||
"The deleted entry is not the same as odlest found.");
|
||||
}
|
||||
}
|
||||
openFileMap.put(h, context);
|
||||
}
|
||||
|
||||
// Cleanup the old stream outside the lock
|
||||
if (toEvict != null) {
|
||||
toEvict.cleanup();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void scan(long streamTimeout) {
|
||||
ArrayList<OpenFileCtx> ctxToRemove = new ArrayList<OpenFileCtx>();
|
||||
Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
|
||||
.iterator();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("openFileMap size:" + openFileMap.size());
|
||||
}
|
||||
|
||||
while (it.hasNext()) {
|
||||
Entry<FileHandle, OpenFileCtx> pairs = it.next();
|
||||
FileHandle handle = pairs.getKey();
|
||||
OpenFileCtx ctx = pairs.getValue();
|
||||
if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check it again inside lock before removing
|
||||
synchronized (this) {
|
||||
OpenFileCtx ctx2 = openFileMap.get(handle);
|
||||
if (ctx2 != null) {
|
||||
if (ctx2.streamCleanup(handle.getFileId(), streamTimeout)) {
|
||||
openFileMap.remove(handle);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After remove stream " + handle.getFileId()
|
||||
+ ", the stream number:" + openFileMap.size());
|
||||
}
|
||||
ctxToRemove.add(ctx2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Invoke the cleanup outside the lock
|
||||
for (OpenFileCtx ofc : ctxToRemove) {
|
||||
ofc.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
OpenFileCtx get(FileHandle key) {
|
||||
return openFileMap.get(key);
|
||||
}
|
||||
|
||||
int size() {
|
||||
return openFileMap.size();
|
||||
}
|
||||
|
||||
void start() {
|
||||
streamMonitor.start();
|
||||
}
|
||||
|
||||
// Evict all entries
|
||||
void cleanAll() {
|
||||
ArrayList<OpenFileCtx> cleanedContext = new ArrayList<OpenFileCtx>();
|
||||
synchronized (this) {
|
||||
Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
|
||||
.iterator();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("openFileMap size:" + openFileMap.size());
|
||||
}
|
||||
|
||||
while (it.hasNext()) {
|
||||
Entry<FileHandle, OpenFileCtx> pairs = it.next();
|
||||
OpenFileCtx ctx = pairs.getValue();
|
||||
it.remove();
|
||||
cleanedContext.add(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
// Invoke the cleanup outside the lock
|
||||
for (OpenFileCtx ofc : cleanedContext) {
|
||||
ofc.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
// stop the dump thread
|
||||
if (streamMonitor != null && streamMonitor.isAlive()) {
|
||||
streamMonitor.shouldRun(false);
|
||||
streamMonitor.interrupt();
|
||||
try {
|
||||
streamMonitor.join(3000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
cleanAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* StreamMonitor wakes up periodically to find and closes idle streams.
|
||||
*/
|
||||
class StreamMonitor extends Daemon {
|
||||
private final static int rotation = 5 * 1000; // 5 seconds
|
||||
private long lastWakeupTime = 0;
|
||||
private boolean shouldRun = true;
|
||||
|
||||
void shouldRun(boolean shouldRun) {
|
||||
this.shouldRun = shouldRun;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (shouldRun) {
|
||||
scan(streamTimeout);
|
||||
|
||||
// Check if it can sleep
|
||||
try {
|
||||
long workedTime = System.currentTimeMillis() - lastWakeupTime;
|
||||
if (workedTime < rotation) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("StreamMonitor can still have a sleep:"
|
||||
+ ((rotation - workedTime) / 1000));
|
||||
}
|
||||
Thread.sleep(rotation - workedTime);
|
||||
}
|
||||
lastWakeupTime = System.currentTimeMillis();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("StreamMonitor got interrupted");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -214,6 +214,11 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startDaemons() {
|
||||
writeManager.startAsyncDataSerivce();
|
||||
}
|
||||
|
||||
/******************************************************
|
||||
* RPC call handlers
|
||||
******************************************************/
|
||||
|
@ -778,7 +783,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
|||
|
||||
int createMode = request.getMode();
|
||||
if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE)
|
||||
&& request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) {
|
||||
&& request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)
|
||||
&& request.getObjAttr().getSize() != 0) {
|
||||
LOG.error("Setting file size is not supported when creating file: "
|
||||
+ fileName + " dir fileId:" + dirHandle.getFileId());
|
||||
return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL);
|
||||
|
@ -831,6 +837,23 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
|||
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
|
||||
dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
|
||||
dfsClient, dirFileIdPath, iug);
|
||||
|
||||
// Add open stream
|
||||
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
|
||||
writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
|
||||
fileHandle = new FileHandle(postOpObjAttr.getFileId());
|
||||
if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
|
||||
LOG.warn("Can't add more stream, close it."
|
||||
+ " Future write will become append");
|
||||
fos.close();
|
||||
fos = null;
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Opened stream for file:" + fileName + ", fileId:"
|
||||
+ fileHandle.getFileId());
|
||||
}
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception", e);
|
||||
if (fos != null) {
|
||||
|
@ -859,16 +882,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
|||
}
|
||||
}
|
||||
|
||||
// Add open stream
|
||||
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
|
||||
+ "/" + postOpObjAttr.getFileId(), dfsClient, iug);
|
||||
fileHandle = new FileHandle(postOpObjAttr.getFileId());
|
||||
writeManager.addOpenFileStream(fileHandle, openFileCtx);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("open stream for file:" + fileName + ", fileId:"
|
||||
+ fileHandle.getFileId());
|
||||
}
|
||||
|
||||
return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
|
||||
dirWcc);
|
||||
}
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -29,11 +27,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.nfs.NfsFileType;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
||||
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
||||
|
@ -56,69 +55,70 @@ public class WriteManager {
|
|||
|
||||
private final Configuration config;
|
||||
private final IdUserGroup iug;
|
||||
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
|
||||
.newConcurrentMap();
|
||||
|
||||
|
||||
private AsyncDataService asyncDataService;
|
||||
private boolean asyncDataServiceStarted = false;
|
||||
|
||||
private final StreamMonitor streamMonitor;
|
||||
|
||||
private final int maxStreams;
|
||||
|
||||
/**
|
||||
* The time limit to wait for accumulate reordered sequential writes to the
|
||||
* same file before the write is considered done.
|
||||
*/
|
||||
private long streamTimeout;
|
||||
|
||||
public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
|
||||
public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
|
||||
|
||||
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
|
||||
openFileMap.put(h, ctx);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After add the new stream " + h.getFileId()
|
||||
+ ", the stream number:" + openFileMap.size());
|
||||
|
||||
private final OpenFileCtxCache fileContextCache;
|
||||
|
||||
static public class MultipleCachedStreamException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public MultipleCachedStreamException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
|
||||
boolean addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
|
||||
return fileContextCache.put(h, ctx);
|
||||
}
|
||||
|
||||
WriteManager(IdUserGroup iug, final Configuration config) {
|
||||
this.iug = iug;
|
||||
this.config = config;
|
||||
|
||||
streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
|
||||
DEFAULT_STREAM_TIMEOUT);
|
||||
streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
|
||||
Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
|
||||
LOG.info("Stream timeout is " + streamTimeout + "ms.");
|
||||
if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) {
|
||||
if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
|
||||
LOG.info("Reset stream timeout to minimum value "
|
||||
+ MINIMIUM_STREAM_TIMEOUT + "ms.");
|
||||
streamTimeout = MINIMIUM_STREAM_TIMEOUT;
|
||||
+ Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
|
||||
streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
|
||||
}
|
||||
|
||||
this.streamMonitor = new StreamMonitor();
|
||||
maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
|
||||
Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
|
||||
LOG.info("Maximum open streams is "+ maxStreams);
|
||||
this.fileContextCache = new OpenFileCtxCache(config, streamTimeout);
|
||||
}
|
||||
|
||||
private void startAsyncDataSerivce() {
|
||||
streamMonitor.start();
|
||||
void startAsyncDataSerivce() {
|
||||
if (asyncDataServiceStarted) {
|
||||
return;
|
||||
}
|
||||
fileContextCache.start();
|
||||
this.asyncDataService = new AsyncDataService();
|
||||
asyncDataServiceStarted = true;
|
||||
}
|
||||
|
||||
private void shutdownAsyncDataService() {
|
||||
asyncDataService.shutdown();
|
||||
void shutdownAsyncDataService() {
|
||||
if (!asyncDataServiceStarted) {
|
||||
return;
|
||||
}
|
||||
asyncDataServiceStarted = false;
|
||||
streamMonitor.interrupt();
|
||||
asyncDataService.shutdown();
|
||||
fileContextCache.shutdown();
|
||||
}
|
||||
|
||||
void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
|
||||
int xid, Nfs3FileAttributes preOpAttr) throws IOException {
|
||||
// First write request starts the async data service
|
||||
if (!asyncDataServiceStarted) {
|
||||
startAsyncDataSerivce();
|
||||
}
|
||||
|
||||
long offset = request.getOffset();
|
||||
int count = request.getCount();
|
||||
WriteStableHow stableHow = request.getStableHow();
|
||||
byte[] data = request.getData().array();
|
||||
if (data.length < count) {
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
|
||||
|
@ -129,13 +129,12 @@ public class WriteManager {
|
|||
|
||||
FileHandle handle = request.getHandle();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: "
|
||||
+ offset + " length:" + count + " stableHow:" + stableHow.getValue());
|
||||
LOG.debug("handleWrite " + request);
|
||||
}
|
||||
|
||||
// Check if there is a stream to write
|
||||
FileHandle fileHandle = request.getHandle();
|
||||
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
||||
OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
|
||||
if (openFileCtx == null) {
|
||||
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
|
||||
|
||||
|
@ -150,6 +149,15 @@ public class WriteManager {
|
|||
fos = dfsClient.append(fileIdPath, bufferSize, null, null);
|
||||
|
||||
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
|
||||
} catch (RemoteException e) {
|
||||
IOException io = e.unwrapRemoteException();
|
||||
if (io instanceof AlreadyBeingCreatedException) {
|
||||
LOG.warn("Can't append file:" + fileIdPath
|
||||
+ ". Possibly the file is being closed. Drop the request:"
|
||||
+ request + ", wait for the client to retry...");
|
||||
return;
|
||||
}
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
|
||||
if (fos != null) {
|
||||
|
@ -170,9 +178,26 @@ public class WriteManager {
|
|||
Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
|
||||
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
|
||||
+ fileHandle.getFileId(), dfsClient, iug);
|
||||
addOpenFileStream(fileHandle, openFileCtx);
|
||||
|
||||
if (!addOpenFileStream(fileHandle, openFileCtx)) {
|
||||
LOG.info("Can't add new stream. Close it. Tell client to retry.");
|
||||
try {
|
||||
fos.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't close stream for fileId:" + handle.getFileId());
|
||||
}
|
||||
// Notify client to retry
|
||||
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX,
|
||||
fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel,
|
||||
response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
|
||||
xid);
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("opened stream for file:" + fileHandle.getFileId());
|
||||
LOG.debug("Opened stream for appending file:" + fileHandle.getFileId());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,7 +210,7 @@ public class WriteManager {
|
|||
void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
|
||||
long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
|
||||
int status;
|
||||
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
||||
OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
|
||||
|
||||
if (openFileCtx == null) {
|
||||
LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
|
||||
|
@ -238,7 +263,7 @@ public class WriteManager {
|
|||
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
|
||||
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
|
||||
if (attr != null) {
|
||||
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
||||
OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
|
||||
if (openFileCtx != null) {
|
||||
attr.setSize(openFileCtx.getNextOffset());
|
||||
attr.setUsed(openFileCtx.getNextOffset());
|
||||
|
@ -253,8 +278,8 @@ public class WriteManager {
|
|||
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
|
||||
|
||||
if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
|
||||
OpenFileCtx openFileCtx = openFileMap
|
||||
.get(new FileHandle(attr.getFileId()));
|
||||
OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr
|
||||
.getFileId()));
|
||||
|
||||
if (openFileCtx != null) {
|
||||
attr.setSize(openFileCtx.getNextOffset());
|
||||
|
@ -263,56 +288,9 @@ public class WriteManager {
|
|||
}
|
||||
return attr;
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
|
||||
return this.openFileMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* StreamMonitor wakes up periodically to find and closes idle streams.
|
||||
*/
|
||||
class StreamMonitor extends Daemon {
|
||||
private int rotation = 5 * 1000; // 5 seconds
|
||||
private long lastWakeupTime = 0;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
|
||||
.iterator();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("openFileMap size:" + openFileMap.size());
|
||||
}
|
||||
while (it.hasNext()) {
|
||||
Entry<FileHandle, OpenFileCtx> pairs = it.next();
|
||||
OpenFileCtx ctx = pairs.getValue();
|
||||
if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
|
||||
it.remove();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After remove stream " + pairs.getKey().getFileId()
|
||||
+ ", the stream number:" + openFileMap.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if it can sleep
|
||||
try {
|
||||
long workedTime = System.currentTimeMillis() - lastWakeupTime;
|
||||
if (workedTime < rotation) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("StreamMonitor can still have a sleep:"
|
||||
+ ((rotation - workedTime) / 1000));
|
||||
}
|
||||
Thread.sleep(rotation - workedTime);
|
||||
}
|
||||
lastWakeupTime = System.currentTimeMillis();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("StreamMonitor got interrupted");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
OpenFileCtxCache getOpenFileCtxCache() {
|
||||
return this.fileContextCache;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ public class TestMountd {
|
|||
Nfs3 nfs3 = new Nfs3(exports, config);
|
||||
nfs3.start(false);
|
||||
|
||||
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase()
|
||||
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
|
||||
.getRpcProgram();
|
||||
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
|
||||
|
||||
|
|
|
@ -135,6 +135,7 @@ public class TestOutOfOrderWrite {
|
|||
@Override
|
||||
protected ChannelPipelineFactory setPipelineFactory() {
|
||||
this.pipelineFactory = new ChannelPipelineFactory() {
|
||||
@Override
|
||||
public ChannelPipeline getPipeline() {
|
||||
return Channels.pipeline(
|
||||
RpcUtil.constructRpcFrameDecoder(),
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestOpenFileCtxCache {
|
||||
static boolean cleaned = false;
|
||||
|
||||
@Test
|
||||
public void testEviction() throws IOException, InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
// Only two entries will be in the cache
|
||||
conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
|
||||
|
||||
DFSClient dfsClient = Mockito.mock(DFSClient.class);
|
||||
Nfs3FileAttributes attr = new Nfs3FileAttributes();
|
||||
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
||||
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
||||
|
||||
OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
|
||||
OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
|
||||
|
||||
boolean ret = cache.put(new FileHandle(1), context1);
|
||||
assertTrue(ret);
|
||||
Thread.sleep(1000);
|
||||
ret = cache.put(new FileHandle(2), context2);
|
||||
assertTrue(ret);
|
||||
ret = cache.put(new FileHandle(3), context3);
|
||||
assertFalse(ret);
|
||||
assertTrue(cache.size() == 2);
|
||||
|
||||
// Wait for the oldest stream to be evict-able, insert again
|
||||
Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
|
||||
assertTrue(cache.size() == 2);
|
||||
|
||||
ret = cache.put(new FileHandle(3), context3);
|
||||
assertTrue(ret);
|
||||
assertTrue(cache.size() == 2);
|
||||
assertTrue(cache.get(new FileHandle(1)) == null);
|
||||
|
||||
// Test inactive entry is evicted immediately
|
||||
context3.setActiveStatusForTest(false);
|
||||
ret = cache.put(new FileHandle(4), context4);
|
||||
assertTrue(ret);
|
||||
|
||||
// Now the cache has context2 and context4
|
||||
// Test eviction failure if all entries have pending work.
|
||||
context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
|
||||
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||
context4.getPendingCommitsForTest().put(new Long(100),
|
||||
new CommitCtx(0, null, 0, attr));
|
||||
Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
|
||||
ret = cache.put(new FileHandle(5), context5);
|
||||
assertFalse(ret);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScan() throws IOException, InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
// Only two entries will be in the cache
|
||||
conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
|
||||
|
||||
DFSClient dfsClient = Mockito.mock(DFSClient.class);
|
||||
Nfs3FileAttributes attr = new Nfs3FileAttributes();
|
||||
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
||||
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
||||
|
||||
OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
|
||||
dfsClient, new IdUserGroup());
|
||||
|
||||
OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
|
||||
|
||||
// Test cleaning expired entry
|
||||
boolean ret = cache.put(new FileHandle(1), context1);
|
||||
assertTrue(ret);
|
||||
ret = cache.put(new FileHandle(2), context2);
|
||||
assertTrue(ret);
|
||||
Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + 1);
|
||||
cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
|
||||
assertTrue(cache.size() == 0);
|
||||
|
||||
// Test cleaning inactive entry
|
||||
ret = cache.put(new FileHandle(3), context3);
|
||||
assertTrue(ret);
|
||||
ret = cache.put(new FileHandle(4), context4);
|
||||
assertTrue(ret);
|
||||
context3.setActiveStatusForTest(false);
|
||||
cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
|
||||
assertTrue(cache.size() == 1);
|
||||
assertTrue(cache.get(new FileHandle(3)) == null);
|
||||
assertTrue(cache.get(new FileHandle(4)) != null);
|
||||
}
|
||||
}
|
|
@ -186,9 +186,8 @@ public class TestWrites {
|
|||
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
|
||||
throws InterruptedException {
|
||||
int waitedTime = 0;
|
||||
ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
|
||||
.getOpenFileMap();
|
||||
OpenFileCtx ctx = openFileMap.get(handle);
|
||||
OpenFileCtx ctx = nfsd.getWriteManager()
|
||||
.getOpenFileCtxCache().get(handle);
|
||||
assertTrue(ctx != null);
|
||||
do {
|
||||
Thread.sleep(3000);
|
||||
|
|
|
@ -235,6 +235,8 @@ Release 2.2.1 - UNRELEASED
|
|||
|
||||
HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
|
||||
|
||||
HDFS-5364. Add OpenFileCtx cache. (brandonli)
|
||||
|
||||
Release 2.2.0 - 2013-10-13
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
Loading…
Reference in New Issue