HDFS-5252. Merging change r1539740 form trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1539742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5406659b12
commit
f87469c7aa
|
@ -19,8 +19,11 @@ package org.apache.hadoop.nfs.nfs3.request;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* READ3 Request
|
||||
*/
|
||||
|
@ -34,6 +37,13 @@ public class READ3Request extends RequestWithHandle {
|
|||
count = xdr.readInt();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public READ3Request(FileHandle handle, long offset, int count) {
|
||||
super(handle);
|
||||
this.offset = offset;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public long getOffset() {
|
||||
return this.offset;
|
||||
}
|
||||
|
@ -41,4 +51,11 @@ public class READ3Request extends RequestWithHandle {
|
|||
public int getCount() {
|
||||
return this.count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(XDR xdr) {
|
||||
handle.serialize(xdr);
|
||||
xdr.writeLongAsHyper(offset);
|
||||
xdr.writeInt(count);
|
||||
}
|
||||
}
|
|
@ -109,6 +109,12 @@ public class Nfs3Utils {
|
|||
* Send a write response to the netty network socket channel
|
||||
*/
|
||||
public static void writeChannel(Channel channel, XDR out, int xid) {
|
||||
if (channel == null) {
|
||||
RpcProgramNfs3.LOG
|
||||
.info("Null channel should only happen in tests. Do nothing.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (RpcProgramNfs3.LOG.isDebugEnabled()) {
|
||||
RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
|
||||
}
|
||||
|
|
|
@ -1017,6 +1017,23 @@ class OpenFileCtx {
|
|||
}
|
||||
|
||||
if (!writeCtx.getReplied()) {
|
||||
if (stableHow != WriteStableHow.UNSTABLE) {
|
||||
LOG.info("Do sync for stable write:" + writeCtx);
|
||||
try {
|
||||
if (stableHow == WriteStableHow.DATA_SYNC) {
|
||||
fos.hsync();
|
||||
} else {
|
||||
Preconditions.checkState(stableHow == WriteStableHow.FILE_SYNC,
|
||||
"Unknown WriteStableHow:" + stableHow);
|
||||
// Sync file data and length
|
||||
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("hsync failed with writeCtx:" + writeCtx + " error:" + e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
WccAttr preOpAttr = latestAttr.getWccAttr();
|
||||
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
||||
if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
|
||||
|
|
|
@ -126,6 +126,8 @@ import org.jboss.netty.buffer.ChannelBuffers;
|
|||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
|
||||
*/
|
||||
|
@ -1975,4 +1977,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
WriteManager getWriteManager() {
|
||||
return this.writeManager;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
|
|||
import org.apache.hadoop.util.Daemon;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
|
@ -262,6 +263,11 @@ public class WriteManager {
|
|||
}
|
||||
return attr;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
|
||||
return this.openFileMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* StreamMonitor wakes up periodically to find and closes idle streams.
|
||||
|
|
|
@ -17,21 +17,41 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
||||
import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.request.READ3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
|
||||
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
|
||||
import org.apache.hadoop.nfs.nfs3.response.READ3Response;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.apache.hadoop.oncrpc.security.SecurityHandler;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
|
@ -105,7 +125,7 @@ public class TestWrites {
|
|||
Assert.assertTrue(limit - position == 1);
|
||||
Assert.assertTrue(appendedData.get(position) == (byte) 19);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
|
||||
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
|
||||
|
@ -162,4 +182,117 @@ public class TestWrites {
|
|||
ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
|
||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
||||
}
|
||||
|
||||
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
|
||||
throws InterruptedException {
|
||||
int waitedTime = 0;
|
||||
ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
|
||||
.getOpenFileMap();
|
||||
OpenFileCtx ctx = openFileMap.get(handle);
|
||||
assertTrue(ctx != null);
|
||||
do {
|
||||
Thread.sleep(3000);
|
||||
waitedTime += 3000;
|
||||
if (ctx.getPendingWritesForTest().size() == 0) {
|
||||
return;
|
||||
}
|
||||
} while (waitedTime < maxWaitTime);
|
||||
|
||||
fail("Write can't finish.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteStableHow() throws IOException, InterruptedException {
|
||||
HdfsConfiguration config = new HdfsConfiguration();
|
||||
DFSClient client = null;
|
||||
MiniDFSCluster cluster = null;
|
||||
RpcProgramNfs3 nfsd;
|
||||
SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
|
||||
Mockito.when(securityHandler.getUser()).thenReturn(
|
||||
System.getProperty("user.name"));
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
client = new DFSClient(NameNode.getAddress(config), config);
|
||||
|
||||
// Start nfs
|
||||
List<String> exports = new ArrayList<String>();
|
||||
exports.add("/");
|
||||
Nfs3 nfs3 = new Nfs3(exports, config);
|
||||
nfs3.start(false);
|
||||
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
||||
|
||||
HdfsFileStatus status = client.getFileInfo("/");
|
||||
FileHandle rootHandle = new FileHandle(status.getFileId());
|
||||
// Create file1
|
||||
CREATE3Request createReq = new CREATE3Request(rootHandle, "file1",
|
||||
Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
|
||||
XDR createXdr = new XDR();
|
||||
createReq.serialize(createXdr);
|
||||
CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
|
||||
securityHandler, InetAddress.getLocalHost());
|
||||
FileHandle handle = createRsp.getObjHandle();
|
||||
|
||||
// Test DATA_SYNC
|
||||
byte[] buffer = new byte[10];
|
||||
for (int i = 0; i < 10; i++) {
|
||||
buffer[i] = (byte) i;
|
||||
}
|
||||
WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
|
||||
WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
|
||||
XDR writeXdr = new XDR();
|
||||
writeReq.serialize(writeXdr);
|
||||
nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
|
||||
InetAddress.getLocalHost());
|
||||
|
||||
waitWrite(nfsd, handle, 60000);
|
||||
|
||||
// Readback
|
||||
READ3Request readReq = new READ3Request(handle, 0, 10);
|
||||
XDR readXdr = new XDR();
|
||||
readReq.serialize(readXdr);
|
||||
READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
|
||||
securityHandler, InetAddress.getLocalHost());
|
||||
|
||||
assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
|
||||
|
||||
// Test FILE_SYNC
|
||||
|
||||
// Create file2
|
||||
CREATE3Request createReq2 = new CREATE3Request(rootHandle, "file2",
|
||||
Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
|
||||
XDR createXdr2 = new XDR();
|
||||
createReq2.serialize(createXdr2);
|
||||
CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
|
||||
securityHandler, InetAddress.getLocalHost());
|
||||
FileHandle handle2 = createRsp2.getObjHandle();
|
||||
|
||||
WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
|
||||
WriteStableHow.FILE_SYNC, ByteBuffer.wrap(buffer));
|
||||
XDR writeXdr2 = new XDR();
|
||||
writeReq2.serialize(writeXdr2);
|
||||
nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
|
||||
InetAddress.getLocalHost());
|
||||
|
||||
waitWrite(nfsd, handle2, 60000);
|
||||
|
||||
// Readback
|
||||
READ3Request readReq2 = new READ3Request(handle2, 0, 10);
|
||||
XDR readXdr2 = new XDR();
|
||||
readReq2.serialize(readXdr2);
|
||||
READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
|
||||
securityHandler, InetAddress.getLocalHost());
|
||||
|
||||
assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
|
||||
// FILE_SYNC should sync the file size
|
||||
status = client.getFileInfo("/file2");
|
||||
assertTrue(status.getLen() == 10);
|
||||
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -230,6 +230,8 @@ Release 2.2.1 - UNRELEASED
|
|||
HDFS-5458. Datanode failed volume threshold ignored if exception is thrown
|
||||
in getDataDirsFromURIs. (Mike Mellenthin via wang)
|
||||
|
||||
HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
|
||||
|
||||
Release 2.2.0 - 2013-10-13
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
Loading…
Reference in New Issue