HDFS-9092. Nfs silently drops overlapping write requests and causes data copying to fail. Contributed by Yongjun Zhang.

(cherry picked from commit 151fca5032)
This commit is contained in:
Yongjun Zhang 2015-09-28 18:45:00 -07:00
parent ac7d85efb2
commit 1a5f3e93c3
5 changed files with 260 additions and 62 deletions

View File

@ -70,4 +70,8 @@ public boolean equals(Object o) {
}
return false;
}
public String toString() {
return "[" + getMin() + ", " + getMax() + ")";
}
}

View File

@ -492,11 +492,11 @@ public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
int count = request.getCount();
long smallerCount = offset + count - cachedOffset;
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+ " current offset %d," + " drop the overlapped section (%d-%d)"
+ " and append new data (%d-%d).", offset, (offset + count - 1),
cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ count - 1)));
LOG.debug(String.format("Got overwrite with appended data [%d-%d),"
+ " current offset %d," + " drop the overlapped section [%d-%d)"
+ " and append new data [%d-%d).", offset, (offset + count),
cachedOffset, offset, cachedOffset, cachedOffset, (offset
+ count)));
}
ByteBuffer data = request.getData();
@ -510,6 +510,22 @@ public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
request.setCount((int) smallerCount);
}
@VisibleForTesting
private static void trimWriteRequest(WriteCtx writeCtx,
long currentOffset) {
long offset = writeCtx.getOffset();
if (LOG.isDebugEnabled()) {
int count = writeCtx.getCount();
LOG.debug(String.format("Trim request [%d-%d),"
+ " current offset %d," + " drop the overlapped section [%d-%d)"
+ " and write new data [%d-%d)",
offset, (offset + count),
currentOffset, offset, (currentOffset),
currentOffset, (offset + count)));
}
writeCtx.trimWrite((int)(currentOffset - offset));
}
/**
* Creates and adds a WriteCtx into the pendingWrites map. This is a
* synchronized method to handle concurrent writes.
@ -529,23 +545,27 @@ private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+ cachedOffset);
}
// Handle a special case first
// Ignore write request with range below the current offset
if (offset + count <= cachedOffset) {
LOG.warn(String.format("Got overwrite [%d-%d) smaller than"
+ " current offset %d," + " drop the request.",
offset, (offset + count), cachedOffset));
return null;
}
// Handle a special case: trim request whose offset is smaller than
// the current offset
if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
// One Linux client behavior: after a file is closed and reopened to
// write, the client sometimes combines previous written data(could still
// be in kernel buffer) with newly appended data in one write. This is
// usually the first write after file reopened. In this
// case, we log the event and drop the overlapped section.
LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+ " current offset %d," + " drop the overlapped section (%d-%d)"
+ " and append new data (%d-%d).", offset, (offset + count - 1),
cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ count - 1)));
if (!pendingWrites.isEmpty()) {
LOG.warn("There are other pending writes, fail this jumbo write");
return null;
}
LOG.warn(String.format("Got overwrite with appended data [%d-%d),"
+ " current offset %d," + " drop the overlapped section [%d-%d)"
+ " and append new data [%d-%d).", offset, (offset + count),
cachedOffset, offset, cachedOffset, cachedOffset, (offset
+ count)));
LOG.warn("Modify this write to write only the appended data");
alterWriteRequest(request, cachedOffset);
@ -1011,45 +1031,56 @@ private synchronized WriteCtx offerNextToWrite() {
this.asyncStatus = false;
return null;
}
Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
OffsetRange range = lastEntry.getKey();
WriteCtx toWrite = lastEntry.getValue();
if (LOG.isTraceEnabled()) {
LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+ nextOffset);
Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
OffsetRange range = lastEntry.getKey();
WriteCtx toWrite = lastEntry.getValue();
if (LOG.isTraceEnabled()) {
LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+ nextOffset);
}
long offset = nextOffset.get();
if (range.getMin() > offset) {
if (LOG.isDebugEnabled()) {
LOG.debug("The next sequential write has not arrived yet");
}
long offset = nextOffset.get();
if (range.getMin() > offset) {
if (LOG.isDebugEnabled()) {
LOG.debug("The next sequential write has not arrived yet");
}
processCommits(nextOffset.get()); // handle race
this.asyncStatus = false;
} else if (range.getMin() < offset && range.getMax() > offset) {
// shouldn't happen since we do sync for overlapped concurrent writers
LOG.warn("Got an overlapping write (" + range.getMin() + ", "
+ range.getMax() + "), nextOffset=" + offset
+ ". Silently drop it now");
pendingWrites.remove(range);
processCommits(nextOffset.get()); // handle race
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+ ") from the list");
}
// after writing, remove the WriteCtx from cache
pendingWrites.remove(range);
// update nextOffset
nextOffset.addAndGet(toWrite.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug("Change nextOffset to " + nextOffset.get());
}
return toWrite;
processCommits(nextOffset.get()); // handle race
this.asyncStatus = false;
} else if (range.getMax() <= offset) {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove write " + range.toString()
+ " which is already written from the list");
}
// remove the WriteCtx from cache
pendingWrites.remove(range);
} else if (range.getMin() < offset && range.getMax() > offset) {
LOG.warn("Got an overlapping write " + range.toString()
+ ", nextOffset=" + offset
+ ". Remove and trim it");
pendingWrites.remove(range);
trimWriteRequest(toWrite, offset);
// update nextOffset
nextOffset.addAndGet(toWrite.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug("Change nextOffset (after trim) to " + nextOffset.get());
}
return toWrite;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove write " + range.toString()
+ " from the list");
}
// after writing, remove the WriteCtx from cache
pendingWrites.remove(range);
// update nextOffset
nextOffset.addAndGet(toWrite.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug("Change nextOffset to " + nextOffset.get());
}
return toWrite;
}
return null;
}
@ -1283,8 +1314,8 @@ synchronized void cleanup() {
WccAttr preOpAttr = latestAttr.getWccAttr();
while (!pendingWrites.isEmpty()) {
OffsetRange key = pendingWrites.firstKey();
LOG.info("Fail pending write: (" + key.getMin() + ", " + key.getMax()
+ "), nextOffset=" + nextOffset.get());
LOG.info("Fail pending write: " + key.toString()
+ ", nextOffset=" + nextOffset.get());
WriteCtx writeCtx = pendingWrites.remove(key);
if (!writeCtx.getReplied()) {

View File

@ -51,8 +51,8 @@ public static enum DataState {
}
private final FileHandle handle;
private final long offset;
private final int count;
private long offset;
private int count;
/**
* Some clients can send a write that includes previously written data along
@ -61,13 +61,61 @@ public static enum DataState {
* request before it was modified to write only the new data.
* @see OpenFileCtx#addWritesToCache for more details
*/
private final int originalCount;
private int originalCount;
public static final int INVALID_ORIGINAL_COUNT = -1;
/**
* Overlapping Write Request Handling
* A write request can be in three states:
* s0. just created, with data != null
* s1. dumped as length "count", and data set to null
* s2. read back from dumped area as length "count"
*
* Write requests may have overlapping range, we detect this by comparing
* the data offset range of the request against the current offset of data
* already written to HDFS. There are two categories:
*
* 1. If the beginning part of a new write request data is already written
* due to an earlier request, we alter the new request by trimming this
* portion before the new request enters state s0, and the originalCount is
* remembered.
*
* 2. If the lower end of the write request range is beyond the current
* offset of data already written, we put the request into cache, and detect
* the overlapping when taking the request out from cache.
*
* For category 2, if we find out that a write request overlap with another,
* this write request is already in state s0, s1, or s3. We trim the
* beginning part of this request, by remembering the size of this portion
* as trimDelta. So the resulted offset of the write request is
* "offset + trimDelta" and the resulted size of the write request is
* "count - trimDelta".
*
* What important to notice is, if the request is in s1 when we do the
* trimming, the data dumped is of size "count", so when we load
* the data back from dumped area, we should set the position of the data
* buffer to trimDelta.
*/
private int trimDelta;
public int getOriginalCount() {
return originalCount;
}
public void trimWrite(int delta) {
Preconditions.checkState(delta < count);
if (LOG.isDebugEnabled()) {
LOG.debug("Trim write request by delta:" + delta + " " + toString());
}
synchronized(this) {
trimDelta = delta;
if (originalCount == INVALID_ORIGINAL_COUNT) {
originalCount = count;
}
trimData();
}
}
private final WriteStableHow stableHow;
private volatile ByteBuffer data;
@ -139,11 +187,17 @@ FileHandle getHandle() {
}
long getOffset() {
return offset;
synchronized(this) {
// See comment "Overlapping Write Request Handling" above
return offset + trimDelta;
}
}
int getCount() {
return count;
synchronized(this) {
// See comment "Overlapping Write Request Handling" above
return count - trimDelta;
}
}
WriteStableHow getStableHow() {
@ -174,7 +228,22 @@ private void loadData() throws IOException {
throw new IOException("Data count is " + count + ", but read back "
+ size + "bytes");
}
data = ByteBuffer.wrap(rawData);
synchronized(this) {
data = ByteBuffer.wrap(rawData);
trimData();
}
}
private void trimData() {
if (data != null && trimDelta > 0) {
// make it not dump-able since the data will be used
// shortly
dataState = DataState.NO_DUMP;
data.position(data.position() + trimDelta);
offset += trimDelta;
count -= trimDelta;
trimDelta = 0;
}
}
public void writeData(HdfsDataOutputStream fos) throws IOException {
@ -229,6 +298,7 @@ void setReplied(boolean replied) {
this.offset = offset;
this.count = count;
this.originalCount = originalCount;
this.trimDelta = 0;
this.stableHow = stableHow;
this.data = data;
this.channel = channel;

View File

@ -640,7 +640,97 @@ securityHandler, new InetSocketAddress("localhost", config.getInt(
}
}
}
@Test
public void testOverlappingWrites() throws IOException, InterruptedException {
NfsConfiguration config = new NfsConfiguration();
MiniDFSCluster cluster = null;
RpcProgramNfs3 nfsd;
final int bufSize = 32;
SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
Mockito.when(securityHandler.getUser()).thenReturn(
System.getProperty("user.name"));
String currentUser = System.getProperty("user.name");
config.set(
DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(currentUser),
"*");
config.set(
DefaultImpersonationProvider.getTestProvider().
getProxySuperuserIpConfKey(currentUser),
"*");
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
// Use emphral port in case tests are running in parallel
config.setInt("nfs3.mountd.port", 0);
config.setInt("nfs3.server.port", 0);
try {
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
cluster.waitActive();
Nfs3 nfs3 = new Nfs3(config);
nfs3.startServiceInternal(false);
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config),
config);
HdfsFileStatus status = dfsClient.getFileInfo("/");
FileHandle rootHandle = new FileHandle(status.getFileId());
CREATE3Request createReq = new CREATE3Request(rootHandle,
"overlapping-writes" + System.currentTimeMillis(),
Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
XDR createXdr = new XDR();
createReq.serialize(createXdr);
CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
securityHandler, new InetSocketAddress("localhost", 1234));
FileHandle handle = createRsp.getObjHandle();
byte[] buffer = new byte[bufSize];
for (int i = 0; i < bufSize; i++) {
buffer[i] = (byte) i;
}
int[][] ranges = new int[][] {
{0, 10},
{5, 7},
{5, 5},
{10, 6},
{18, 6},
{20, 6},
{28, 4},
{16, 2},
{25, 4}
};
for (int i = 0; i < ranges.length; i++) {
int x[] = ranges[i];
byte[] tbuffer = new byte[x[1]];
for (int j = 0; j < x[1]; j++) {
tbuffer[j] = buffer[x[0] + j];
}
WRITE3Request writeReq = new WRITE3Request(handle, (long)x[0], x[1],
WriteStableHow.UNSTABLE, ByteBuffer.wrap(tbuffer));
XDR writeXdr = new XDR();
writeReq.serialize(writeXdr);
nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
new InetSocketAddress("localhost", 1234));
}
waitWrite(nfsd, handle, 60000);
READ3Request readReq = new READ3Request(handle, 0, bufSize);
XDR readXdr = new XDR();
readReq.serialize(readXdr);
READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
securityHandler, new InetSocketAddress("localhost", config.getInt(
NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT)));
assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testCheckSequential() throws IOException {
DFSClient dfsClient = Mockito.mock(DFSClient.class);

View File

@ -1100,6 +1100,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9147. Fix the setting of visibleLength in ExternalBlockReader. (Colin
P. McCabe via Lei (Eddy) Xu)
HDFS-9092. Nfs silently drops overlapping write requests and causes data
copying to fail. (Yongjun Zhang)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES