HDFS-5171. Merging change r1535586 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1535588 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
81d6bead1e
commit
c86c303a62
|
@ -20,15 +20,19 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Objects;
|
||||||
import com.google.common.cache.CacheBuilder;
|
import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.cache.CacheLoader;
|
import com.google.common.cache.CacheLoader;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
|
@ -41,15 +45,52 @@ import com.google.common.cache.RemovalNotification;
|
||||||
class DFSClientCache {
|
class DFSClientCache {
|
||||||
private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
|
private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
|
||||||
/**
|
/**
|
||||||
* Cache that maps User id to corresponding DFSClient.
|
* Cache that maps User id to the corresponding DFSClient.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
final LoadingCache<String, DFSClient> clientCache;
|
final LoadingCache<String, DFSClient> clientCache;
|
||||||
|
|
||||||
final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
|
final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache that maps <DFSClient, inode path> to the corresponding
|
||||||
|
* FSDataInputStream.
|
||||||
|
*/
|
||||||
|
final LoadingCache<DFSInputStreamCaheKey, FSDataInputStream> inputstreamCache;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time to live for a DFSClient (in seconds)
|
||||||
|
*/
|
||||||
|
final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024;
|
||||||
|
final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60;
|
||||||
|
|
||||||
private final Configuration config;
|
private final Configuration config;
|
||||||
|
|
||||||
|
private static class DFSInputStreamCaheKey {
|
||||||
|
final String userId;
|
||||||
|
final String inodePath;
|
||||||
|
|
||||||
|
private DFSInputStreamCaheKey(String userId, String inodePath) {
|
||||||
|
super();
|
||||||
|
this.userId = userId;
|
||||||
|
this.inodePath = inodePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj instanceof DFSInputStreamCaheKey) {
|
||||||
|
DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj;
|
||||||
|
return userId.equals(k.userId) && inodePath.equals(k.inodePath);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hashCode(userId, inodePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
DFSClientCache(Configuration config) {
|
DFSClientCache(Configuration config) {
|
||||||
this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
|
this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
|
||||||
}
|
}
|
||||||
|
@ -60,6 +101,12 @@ class DFSClientCache {
|
||||||
.maximumSize(clientCache)
|
.maximumSize(clientCache)
|
||||||
.removalListener(clientRemovealListener())
|
.removalListener(clientRemovealListener())
|
||||||
.build(clientLoader());
|
.build(clientLoader());
|
||||||
|
|
||||||
|
this.inputstreamCache = CacheBuilder.newBuilder()
|
||||||
|
.maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE)
|
||||||
|
.expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
|
||||||
|
.removalListener(inputStreamRemovalListener())
|
||||||
|
.build(inputStreamLoader());
|
||||||
}
|
}
|
||||||
|
|
||||||
private CacheLoader<String, DFSClient> clientLoader() {
|
private CacheLoader<String, DFSClient> clientLoader() {
|
||||||
|
@ -95,7 +142,33 @@ class DFSClientCache {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient get(String userName) {
|
private RemovalListener<DFSInputStreamCaheKey, FSDataInputStream> inputStreamRemovalListener() {
|
||||||
|
return new RemovalListener<DFSClientCache.DFSInputStreamCaheKey, FSDataInputStream>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onRemoval(
|
||||||
|
RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) {
|
||||||
|
try {
|
||||||
|
notification.getValue().close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
|
||||||
|
return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
|
||||||
|
DFSClient client = getDfsClient(key.userId);
|
||||||
|
DFSInputStream dis = client.open(key.inodePath);
|
||||||
|
return new FSDataInputStream(dis);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
DFSClient getDfsClient(String userName) {
|
||||||
DFSClient client = null;
|
DFSClient client = null;
|
||||||
try {
|
try {
|
||||||
client = clientCache.get(userName);
|
client = clientCache.get(userName);
|
||||||
|
@ -105,4 +178,21 @@ class DFSClientCache {
|
||||||
}
|
}
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FSDataInputStream getDfsInputStream(String userName, String inodePath) {
|
||||||
|
DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
|
||||||
|
FSDataInputStream s = null;
|
||||||
|
try {
|
||||||
|
s = inputstreamCache.get(k);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
LOG.warn("Failed to create DFSInputStream for user:" + userName
|
||||||
|
+ " Cause:" + e);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void invalidateDfsInputStream(String userName, String inodePath) {
|
||||||
|
DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
|
||||||
|
inputstreamCache.invalidate(k);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
@ -235,7 +234,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -310,7 +309,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
|
public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
|
||||||
InetAddress client) {
|
InetAddress client) {
|
||||||
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
|
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -392,7 +391,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -454,7 +453,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -502,7 +501,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -563,13 +562,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public READ3Response read(XDR xdr, SecurityHandler securityHandler,
|
public READ3Response read(XDR xdr, SecurityHandler securityHandler,
|
||||||
InetAddress client) {
|
InetAddress client) {
|
||||||
READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
|
READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
|
||||||
|
final String userName = securityHandler.getUser();
|
||||||
|
|
||||||
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
|
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
|
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(userName);
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -628,11 +628,28 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count);
|
int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count);
|
||||||
byte[] readbuffer = new byte[buffSize];
|
byte[] readbuffer = new byte[buffSize];
|
||||||
|
|
||||||
DFSInputStream is = dfsClient.open(Nfs3Utils.getFileIdPath(handle));
|
int readCount = 0;
|
||||||
FSDataInputStream fis = new FSDataInputStream(is);
|
/**
|
||||||
|
* Retry exactly once because the DFSInputStream can be stale.
|
||||||
int readCount = fis.read(offset, readbuffer, 0, count);
|
*/
|
||||||
fis.close();
|
for (int i = 0; i < 1; ++i) {
|
||||||
|
FSDataInputStream fis = clientCache.getDfsInputStream(userName,
|
||||||
|
Nfs3Utils.getFileIdPath(handle));
|
||||||
|
|
||||||
|
try {
|
||||||
|
readCount = fis.read(offset, readbuffer, 0, count);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// TODO: A cleaner way is to throw a new type of exception
|
||||||
|
// which requires incompatible changes.
|
||||||
|
if (e.getMessage() == "Stream closed") {
|
||||||
|
clientCache.invalidateDfsInputStream(userName,
|
||||||
|
Nfs3Utils.getFileIdPath(handle));
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
attrs = Nfs3Utils.getFileAttr(dfsClient, Nfs3Utils.getFileIdPath(handle),
|
attrs = Nfs3Utils.getFileAttr(dfsClient, Nfs3Utils.getFileIdPath(handle),
|
||||||
iug);
|
iug);
|
||||||
|
@ -660,7 +677,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
SecurityHandler securityHandler, InetAddress client) {
|
SecurityHandler securityHandler, InetAddress client) {
|
||||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
|
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -735,7 +752,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
|
public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
|
||||||
InetAddress client) {
|
InetAddress client) {
|
||||||
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
|
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -858,7 +875,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
|
public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
|
||||||
InetAddress client) {
|
InetAddress client) {
|
||||||
MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
|
MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -954,7 +971,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public REMOVE3Response remove(XDR xdr,
|
public REMOVE3Response remove(XDR xdr,
|
||||||
SecurityHandler securityHandler, InetAddress client) {
|
SecurityHandler securityHandler, InetAddress client) {
|
||||||
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
|
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1029,7 +1046,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
|
public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
|
||||||
InetAddress client) {
|
InetAddress client) {
|
||||||
RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
|
RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1111,7 +1128,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
|
public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
|
||||||
InetAddress client) {
|
InetAddress client) {
|
||||||
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
|
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1205,7 +1222,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1293,7 +1310,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1430,7 +1447,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
|
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
|
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
}
|
}
|
||||||
|
@ -1587,7 +1604,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1645,7 +1662,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1697,7 +1714,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
@ -1738,7 +1755,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
|
public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
|
||||||
SecurityHandler securityHandler, InetAddress client) {
|
SecurityHandler securityHandler, InetAddress client) {
|
||||||
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
|
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
|
||||||
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
|
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||||
if (dfsClient == null) {
|
if (dfsClient == null) {
|
||||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||||
return response;
|
return response;
|
||||||
|
|
|
@ -39,12 +39,12 @@ public class TestDFSClientCache {
|
||||||
|
|
||||||
DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
|
DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
|
||||||
|
|
||||||
DFSClient c1 = cache.get("test1");
|
DFSClient c1 = cache.getDfsClient("test1");
|
||||||
assertTrue(cache.get("test1").toString().contains("ugi=test1"));
|
assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1"));
|
||||||
assertEquals(c1, cache.get("test1"));
|
assertEquals(c1, cache.getDfsClient("test1"));
|
||||||
assertFalse(isDfsClientClose(c1));
|
assertFalse(isDfsClientClose(c1));
|
||||||
|
|
||||||
cache.get("test2");
|
cache.getDfsClient("test2");
|
||||||
assertTrue(isDfsClientClose(c1));
|
assertTrue(isDfsClientClose(c1));
|
||||||
assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
|
assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
|
||||||
}
|
}
|
||||||
|
|
|
@ -196,6 +196,9 @@ Release 2.2.1 - UNRELEASED
|
||||||
HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers
|
HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers
|
||||||
post HDFS-5306. (atm)
|
post HDFS-5306. (atm)
|
||||||
|
|
||||||
|
HDFS-5171. NFS should create input stream for a file and try to share it
|
||||||
|
with multiple read requests. (Haohui Mai via brandonli)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
Loading…
Reference in New Issue