HDFS-7259. Unresponseive NFS mount point due to deferred COMMIT response. Contributed by Brandon Li
This commit is contained in:
parent
171f2376d2
commit
b6f9d5538c
|
@ -372,7 +372,7 @@ public class IdUserGroup {
|
||||||
uid = getUid(user);
|
uid = getUid(user);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
uid = user.hashCode();
|
uid = user.hashCode();
|
||||||
LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid, e);
|
LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid);
|
||||||
}
|
}
|
||||||
return uid;
|
return uid;
|
||||||
}
|
}
|
||||||
|
@ -385,7 +385,7 @@ public class IdUserGroup {
|
||||||
gid = getGid(group);
|
gid = getGid(group);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
gid = group.hashCode();
|
gid = group.hashCode();
|
||||||
LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid, e);
|
LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid);
|
||||||
}
|
}
|
||||||
return gid;
|
return gid;
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,4 +57,7 @@ public class NfsConfigKeys {
|
||||||
|
|
||||||
public static final String AIX_COMPAT_MODE_KEY = "nfs.aix.compatibility.mode.enabled";
|
public static final String AIX_COMPAT_MODE_KEY = "nfs.aix.compatibility.mode.enabled";
|
||||||
public static final boolean AIX_COMPAT_MODE_DEFAULT = false;
|
public static final boolean AIX_COMPAT_MODE_DEFAULT = false;
|
||||||
|
|
||||||
|
public final static String LARGE_FILE_UPLOAD = "nfs.large.file.upload";
|
||||||
|
public final static boolean LARGE_FILE_UPLOAD_DEFAULT = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState;
|
import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState;
|
||||||
import org.apache.hadoop.io.BytesWritable.Comparator;
|
import org.apache.hadoop.io.BytesWritable.Comparator;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -77,7 +78,26 @@ class OpenFileCtx {
|
||||||
COMMIT_INACTIVE_CTX,
|
COMMIT_INACTIVE_CTX,
|
||||||
COMMIT_INACTIVE_WITH_PENDING_WRITE,
|
COMMIT_INACTIVE_WITH_PENDING_WRITE,
|
||||||
COMMIT_ERROR,
|
COMMIT_ERROR,
|
||||||
COMMIT_DO_SYNC;
|
COMMIT_DO_SYNC,
|
||||||
|
/**
|
||||||
|
* Deferred COMMIT response could fail file uploading. The following two
|
||||||
|
* status are introduced as a solution. 1. if client asks to commit
|
||||||
|
* non-sequential trunk of data, NFS gateway return success with the hope
|
||||||
|
* that client will send the prerequisite writes. 2. if client asks to
|
||||||
|
* commit a sequential trunk(means it can be flushed to HDFS), NFS gateway
|
||||||
|
* return a special error NFS3ERR_JUKEBOX indicating the client needs to
|
||||||
|
* retry. Meanwhile, NFS gateway keeps flush data to HDFS and do sync
|
||||||
|
* eventually.
|
||||||
|
*
|
||||||
|
* The reason to let client wait is that, we want the client to wait for the
|
||||||
|
* last commit. Otherwise, client thinks file upload finished (e.g., cp
|
||||||
|
* command returns success) but NFS could be still flushing staged data to
|
||||||
|
* HDFS. However, we don't know which one is the last commit. We make the
|
||||||
|
* assumption that a commit after sequential writes may be the last.
|
||||||
|
* Referring HDFS-7259 for more details.
|
||||||
|
* */
|
||||||
|
COMMIT_SPECIAL_WAIT, // scoped pending writes is sequential
|
||||||
|
COMMIT_SPECIAL_SUCCESS;// scoped pending writes is not sequential
|
||||||
}
|
}
|
||||||
|
|
||||||
private final DFSClient client;
|
private final DFSClient client;
|
||||||
|
@ -159,6 +179,7 @@ class OpenFileCtx {
|
||||||
private RandomAccessFile raf;
|
private RandomAccessFile raf;
|
||||||
private final String dumpFilePath;
|
private final String dumpFilePath;
|
||||||
private Daemon dumpThread;
|
private Daemon dumpThread;
|
||||||
|
private final boolean uploadLargeFile;
|
||||||
|
|
||||||
private void updateLastAccessTime() {
|
private void updateLastAccessTime() {
|
||||||
lastAccessTime = Time.monotonicNow();
|
lastAccessTime = Time.monotonicNow();
|
||||||
|
@ -200,12 +221,13 @@ class OpenFileCtx {
|
||||||
|
|
||||||
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
|
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
|
||||||
String dumpFilePath, DFSClient client, IdUserGroup iug) {
|
String dumpFilePath, DFSClient client, IdUserGroup iug) {
|
||||||
this(fos, latestAttr, dumpFilePath, client, iug, false);
|
this(fos, latestAttr, dumpFilePath, client, iug, false,
|
||||||
|
new NfsConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
|
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
|
||||||
String dumpFilePath, DFSClient client, IdUserGroup iug,
|
String dumpFilePath, DFSClient client, IdUserGroup iug,
|
||||||
boolean aixCompatMode) {
|
boolean aixCompatMode, NfsConfiguration config) {
|
||||||
this.fos = fos;
|
this.fos = fos;
|
||||||
this.latestAttr = latestAttr;
|
this.latestAttr = latestAttr;
|
||||||
this.aixCompatMode = aixCompatMode;
|
this.aixCompatMode = aixCompatMode;
|
||||||
|
@ -233,6 +255,8 @@ class OpenFileCtx {
|
||||||
dumpThread = null;
|
dumpThread = null;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.iug = iug;
|
this.iug = iug;
|
||||||
|
this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD,
|
||||||
|
NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Nfs3FileAttributes getLatestAttr() {
|
public Nfs3FileAttributes getLatestAttr() {
|
||||||
|
@ -781,12 +805,43 @@ class OpenFileCtx {
|
||||||
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
|
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (pendingWrites.isEmpty()) {
|
||||||
|
// Note that, there is no guarantee data is synced. Caller should still
|
||||||
|
// do a sync here though the output stream might be closed.
|
||||||
|
return COMMIT_STATUS.COMMIT_FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
long flushed = getFlushedOffset();
|
long flushed = getFlushedOffset();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
|
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle large file upload
|
||||||
|
if (uploadLargeFile && !aixCompatMode) {
|
||||||
|
long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
|
||||||
|
.getKey().getMax() - 1;
|
||||||
|
|
||||||
|
if (co <= flushed) {
|
||||||
|
return COMMIT_STATUS.COMMIT_DO_SYNC;
|
||||||
|
} else if (co < nextOffset.get()) {
|
||||||
|
if (!fromRead) {
|
||||||
|
// let client retry the same request, add pending commit to sync later
|
||||||
|
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
|
||||||
|
preOpAttr);
|
||||||
|
pendingCommits.put(commitOffset, commitCtx);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("return COMMIT_SPECIAL_WAIT");
|
||||||
|
}
|
||||||
|
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
|
||||||
|
}
|
||||||
|
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (commitOffset > 0) {
|
if (commitOffset > 0) {
|
||||||
if (aixCompatMode) {
|
if (aixCompatMode) {
|
||||||
// The AIX NFS client misinterprets RFC-1813 and will always send 4096
|
// The AIX NFS client misinterprets RFC-1813 and will always send 4096
|
||||||
|
@ -817,11 +872,6 @@ class OpenFileCtx {
|
||||||
Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
|
Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
|
||||||
|
|
||||||
// Commit whole file, commitOffset == 0
|
// Commit whole file, commitOffset == 0
|
||||||
if (pendingWrites.isEmpty()) {
|
|
||||||
// Note that, there is no guarantee data is synced. TODO: We could still
|
|
||||||
// do a sync here though the output stream might be closed.
|
|
||||||
return COMMIT_STATUS.COMMIT_FINISHED;
|
|
||||||
} else {
|
|
||||||
if (!fromRead) {
|
if (!fromRead) {
|
||||||
// Insert commit
|
// Insert commit
|
||||||
long maxOffset = key.getKey().getMax() - 1;
|
long maxOffset = key.getKey().getMax() - 1;
|
||||||
|
@ -831,7 +881,6 @@ class OpenFileCtx {
|
||||||
}
|
}
|
||||||
return COMMIT_STATUS.COMMIT_WAIT;
|
return COMMIT_STATUS.COMMIT_WAIT;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private void addWrite(WriteCtx writeCtx) {
|
private void addWrite(WriteCtx writeCtx) {
|
||||||
long offset = writeCtx.getOffset();
|
long offset = writeCtx.getOffset();
|
||||||
|
|
|
@ -944,7 +944,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
// Add open stream
|
// Add open stream
|
||||||
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
|
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
|
||||||
writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
|
writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
|
||||||
aixCompatMode);
|
aixCompatMode, config);
|
||||||
fileHandle = new FileHandle(postOpObjAttr.getFileId());
|
fileHandle = new FileHandle(postOpObjAttr.getFileId());
|
||||||
if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
|
if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
|
||||||
LOG.warn("Can't add more stream, close it."
|
LOG.warn("Can't add more stream, close it."
|
||||||
|
|
|
@ -41,7 +41,7 @@ class WriteCtx {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In memory write data has 3 states. ALLOW_DUMP: not sequential write, still
|
* In memory write data has 3 states. ALLOW_DUMP: not sequential write, still
|
||||||
* wait for prerequisit writes. NO_DUMP: sequential write, no need to dump
|
* wait for prerequisite writes. NO_DUMP: sequential write, no need to dump
|
||||||
* since it will be written to HDFS soon. DUMPED: already dumped to a file.
|
* since it will be written to HDFS soon. DUMPED: already dumped to a file.
|
||||||
*/
|
*/
|
||||||
public static enum DataState {
|
public static enum DataState {
|
||||||
|
|
|
@ -178,7 +178,7 @@ public class WriteManager {
|
||||||
String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
|
String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
|
||||||
NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
|
NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
|
||||||
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
|
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
|
||||||
+ fileHandle.getFileId(), dfsClient, iug, aixCompatMode);
|
+ fileHandle.getFileId(), dfsClient, iug, aixCompatMode, config);
|
||||||
|
|
||||||
if (!addOpenFileStream(fileHandle, openFileCtx)) {
|
if (!addOpenFileStream(fileHandle, openFileCtx)) {
|
||||||
LOG.info("Can't add new stream. Close it. Tell client to retry.");
|
LOG.info("Can't add new stream. Close it. Tell client to retry.");
|
||||||
|
@ -236,6 +236,7 @@ public class WriteManager {
|
||||||
status = Nfs3Status.NFS3ERR_IO;
|
status = Nfs3Status.NFS3ERR_IO;
|
||||||
break;
|
break;
|
||||||
case COMMIT_WAIT:
|
case COMMIT_WAIT:
|
||||||
|
case COMMIT_SPECIAL_WAIT:
|
||||||
/**
|
/**
|
||||||
* This should happen rarely in some possible cases, such as read
|
* This should happen rarely in some possible cases, such as read
|
||||||
* request arrives before DFSClient is able to quickly flush data to DN,
|
* request arrives before DFSClient is able to quickly flush data to DN,
|
||||||
|
@ -244,6 +245,10 @@ public class WriteManager {
|
||||||
*/
|
*/
|
||||||
status = Nfs3Status.NFS3ERR_JUKEBOX;
|
status = Nfs3Status.NFS3ERR_JUKEBOX;
|
||||||
break;
|
break;
|
||||||
|
case COMMIT_SPECIAL_SUCCESS:
|
||||||
|
// Read beyond eof could result in partial read
|
||||||
|
status = Nfs3Status.NFS3_OK;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.error("Should not get commit return code:" + ret.name());
|
LOG.error("Should not get commit return code:" + ret.name());
|
||||||
throw new RuntimeException("Should not get commit return code:"
|
throw new RuntimeException("Should not get commit return code:"
|
||||||
|
@ -278,6 +283,12 @@ public class WriteManager {
|
||||||
case COMMIT_WAIT:
|
case COMMIT_WAIT:
|
||||||
// Do nothing. Commit is async now.
|
// Do nothing. Commit is async now.
|
||||||
return;
|
return;
|
||||||
|
case COMMIT_SPECIAL_WAIT:
|
||||||
|
status = Nfs3Status.NFS3ERR_JUKEBOX;
|
||||||
|
break;
|
||||||
|
case COMMIT_SPECIAL_SUCCESS:
|
||||||
|
status = Nfs3Status.NFS3_OK;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.error("Should not get commit return code:" + ret.name());
|
LOG.error("Should not get commit return code:" + ret.name());
|
||||||
throw new RuntimeException("Should not get commit return code:"
|
throw new RuntimeException("Should not get commit return code:"
|
||||||
|
@ -288,8 +299,7 @@ public class WriteManager {
|
||||||
// Send out the response
|
// Send out the response
|
||||||
Nfs3FileAttributes postOpAttr = null;
|
Nfs3FileAttributes postOpAttr = null;
|
||||||
try {
|
try {
|
||||||
String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileId());
|
postOpAttr = getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId()), iug);
|
||||||
postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
|
|
||||||
} catch (IOException e1) {
|
} catch (IOException e1) {
|
||||||
LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1);
|
LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,8 +138,10 @@ public class TestWrites {
|
||||||
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
||||||
|
|
||||||
|
NfsConfiguration conf = new NfsConfiguration();
|
||||||
|
conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
|
||||||
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
||||||
new IdUserGroup(new NfsConfiguration()));
|
new IdUserGroup(conf), false, conf);
|
||||||
|
|
||||||
COMMIT_STATUS ret;
|
COMMIT_STATUS ret;
|
||||||
|
|
||||||
|
@ -157,6 +159,7 @@ public class TestWrites {
|
||||||
// Test request with non zero commit offset
|
// Test request with non zero commit offset
|
||||||
ctx.setActiveStatusForTest(true);
|
ctx.setActiveStatusForTest(true);
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
||||||
|
ctx.setNextOffsetForTest(10);
|
||||||
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
||||||
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
// Do_SYNC state will be updated to FINISHED after data sync
|
// Do_SYNC state will be updated to FINISHED after data sync
|
||||||
|
@ -192,15 +195,85 @@ public class TestWrites {
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
|
||||||
|
// large file upload option.
|
||||||
|
public void testCheckCommitLargeFileUpload() throws IOException {
|
||||||
|
DFSClient dfsClient = Mockito.mock(DFSClient.class);
|
||||||
|
Nfs3FileAttributes attr = new Nfs3FileAttributes();
|
||||||
|
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
||||||
|
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
||||||
|
|
||||||
|
NfsConfiguration conf = new NfsConfiguration();
|
||||||
|
conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
|
||||||
|
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
||||||
|
new IdUserGroup(conf), false, conf);
|
||||||
|
|
||||||
|
COMMIT_STATUS ret;
|
||||||
|
|
||||||
|
// Test inactive open file context
|
||||||
|
ctx.setActiveStatusForTest(false);
|
||||||
|
Channel ch = Mockito.mock(Channel.class);
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
|
||||||
|
|
||||||
|
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
|
||||||
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
|
||||||
|
|
||||||
|
// Test request with non zero commit offset
|
||||||
|
ctx.setActiveStatusForTest(true);
|
||||||
|
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
||||||
|
ctx.setNextOffsetForTest(10);
|
||||||
|
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
||||||
|
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
|
// Do_SYNC state will be updated to FINISHED after data sync
|
||||||
|
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
||||||
|
|
||||||
|
status = ctx.checkCommitInternal(10, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
|
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
||||||
|
|
||||||
|
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
|
||||||
|
.getPendingCommitsForTest();
|
||||||
|
Assert.assertTrue(commits.size() == 0);
|
||||||
|
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
|
||||||
|
Assert.assertTrue(commits.size() == 0);
|
||||||
|
|
||||||
|
// Test request with zero commit offset
|
||||||
|
commits.remove(new Long(11));
|
||||||
|
// There is one pending write [5,10]
|
||||||
|
ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
|
|
||||||
|
Mockito.when(fos.getPos()).thenReturn((long) 6);
|
||||||
|
ret = ctx.checkCommitInternal(8, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
||||||
|
Assert.assertTrue(commits.size() == 1);
|
||||||
|
long key = commits.firstKey();
|
||||||
|
Assert.assertTrue(key == 8);
|
||||||
|
|
||||||
|
// Empty pending writes
|
||||||
|
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckCommitAixCompatMode() throws IOException {
|
public void testCheckCommitAixCompatMode() throws IOException {
|
||||||
DFSClient dfsClient = Mockito.mock(DFSClient.class);
|
DFSClient dfsClient = Mockito.mock(DFSClient.class);
|
||||||
Nfs3FileAttributes attr = new Nfs3FileAttributes();
|
Nfs3FileAttributes attr = new Nfs3FileAttributes();
|
||||||
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
||||||
|
|
||||||
// Last argument "true" here to enable AIX compatibility mode.
|
NfsConfiguration conf = new NfsConfiguration();
|
||||||
|
conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
|
||||||
|
// Enable AIX compatibility mode.
|
||||||
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
||||||
new IdUserGroup(new NfsConfiguration()), true);
|
new IdUserGroup(new NfsConfiguration()), true, conf);
|
||||||
|
|
||||||
// Test fall-through to pendingWrites check in the event that commitOffset
|
// Test fall-through to pendingWrites check in the event that commitOffset
|
||||||
// is greater than the number of bytes we've so far flushed.
|
// is greater than the number of bytes we've so far flushed.
|
||||||
|
@ -210,6 +283,8 @@ public class TestWrites {
|
||||||
|
|
||||||
// Test the case when we actually have received more bytes than we're trying
|
// Test the case when we actually have received more bytes than we're trying
|
||||||
// to commit.
|
// to commit.
|
||||||
|
ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
|
||||||
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
||||||
status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
||||||
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
|
@ -226,8 +301,9 @@ public class TestWrites {
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
||||||
NfsConfiguration config = new NfsConfiguration();
|
NfsConfiguration config = new NfsConfiguration();
|
||||||
|
|
||||||
|
config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
|
||||||
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
||||||
new IdUserGroup(config));
|
new IdUserGroup(config), false, config);
|
||||||
|
|
||||||
FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
|
FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
|
||||||
COMMIT_STATUS ret;
|
COMMIT_STATUS ret;
|
||||||
|
@ -285,6 +361,75 @@ public class TestWrites {
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
|
||||||
|
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
|
||||||
|
DFSClient dfsClient = Mockito.mock(DFSClient.class);
|
||||||
|
Nfs3FileAttributes attr = new Nfs3FileAttributes();
|
||||||
|
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
||||||
|
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
||||||
|
NfsConfiguration config = new NfsConfiguration();
|
||||||
|
|
||||||
|
config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
|
||||||
|
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
||||||
|
new IdUserGroup(config), false, config);
|
||||||
|
|
||||||
|
FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
|
||||||
|
COMMIT_STATUS ret;
|
||||||
|
WriteManager wm = new WriteManager(new IdUserGroup(config), config, false);
|
||||||
|
assertTrue(wm.addOpenFileStream(h, ctx));
|
||||||
|
|
||||||
|
// Test inactive open file context
|
||||||
|
ctx.setActiveStatusForTest(false);
|
||||||
|
Channel ch = Mockito.mock(Channel.class);
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
|
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
|
||||||
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
|
||||||
|
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
|
||||||
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
|
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
|
||||||
|
assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
|
||||||
|
// Test request with non zero commit offset
|
||||||
|
ctx.setActiveStatusForTest(true);
|
||||||
|
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
||||||
|
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
|
||||||
|
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
|
||||||
|
// Do_SYNC state will be updated to FINISHED after data sync
|
||||||
|
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
|
||||||
|
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
||||||
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
|
||||||
|
|
||||||
|
status = ctx.checkCommitInternal(10, ch, 1, attr, true);
|
||||||
|
assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
|
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
|
||||||
|
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
||||||
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
|
||||||
|
|
||||||
|
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
|
||||||
|
.getPendingCommitsForTest();
|
||||||
|
assertTrue(commits.size() == 0);
|
||||||
|
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
|
||||||
|
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
|
||||||
|
assertEquals(0, commits.size()); // commit triggered by read doesn't wait
|
||||||
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11));
|
||||||
|
|
||||||
|
// Test request with zero commit offset
|
||||||
|
// There is one pending write [5,10]
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
|
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
||||||
|
assertEquals(0, commits.size());
|
||||||
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
|
||||||
|
// Empty pending writes
|
||||||
|
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
|
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
||||||
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
}
|
||||||
|
|
||||||
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
|
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
int waitedTime = 0;
|
int waitedTime = 0;
|
||||||
|
|
|
@ -976,6 +976,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo)
|
HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-7259. Unresponseive NFS mount point due to deferred COMMIT response.
|
||||||
|
(brandonli)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-6387. HDFS CLI admin tool for creating & deleting an
|
HDFS-6387. HDFS CLI admin tool for creating & deleting an
|
||||||
|
|
Loading…
Reference in New Issue