HDFS-9092. Nfs silently drops overlapping write requests and causes data copying to fail. Contributed by Yongjun Zhang.
parent 5c3b663bf9
commit 151fca5032

@@ -70,4 +70,8 @@ public class OffsetRange {
     }
     return false;
   }
+
+  public String toString() {
+    return "[" + getMin() + ", " + getMax() + ")";
+  }
 }
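
Note: the new toString uses the same half-open notation as the revised log messages below: [min, max) includes min and excludes max, so the length of the range is simply max - min. A quick illustration (hypothetical usage; it assumes an OffsetRange(long min, long max) constructor):

    OffsetRange r = new OffsetRange(5, 10);  // assumed constructor
    System.out.println(r);                   // prints "[5, 10)"
    // the range covers bytes 5..9, i.e. 10 - 5 = 5 bytes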
@@ -490,11 +490,11 @@ class OpenFileCtx {
     int count = request.getCount();
     long smallerCount = offset + count - cachedOffset;
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
-          + " current offset %d," + " drop the overlapped section (%d-%d)"
-          + " and append new data (%d-%d).", offset, (offset + count - 1),
-          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
-          + count - 1)));
+      LOG.debug(String.format("Got overwrite with appended data [%d-%d),"
+          + " current offset %d," + " drop the overlapped section [%d-%d)"
+          + " and append new data [%d-%d).", offset, (offset + count),
+          cachedOffset, offset, cachedOffset, cachedOffset, (offset
+          + count)));
     }

     ByteBuffer data = request.getData();
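
To make the arithmetic above concrete, a worked example with illustrative numbers (not from the patch): a retransmitted-plus-appended request [0, 100) arrives when cachedOffset, the end of the data already written to HDFS, is 60.

    long offset = 0, cachedOffset = 60;
    int count = 100;
    long smallerCount = offset + count - cachedOffset;  // 40 bytes are new
    // Per the log message: drop the overlapped section [0, 60) and append
    // the new data [60, 100); the request is rewritten to start at 60.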
@@ -508,6 +508,22 @@ class OpenFileCtx {
     request.setCount((int) smallerCount);
   }

+  @VisibleForTesting
+  private static void trimWriteRequest(WriteCtx writeCtx,
+      long currentOffset) {
+    long offset = writeCtx.getOffset();
+    if (LOG.isDebugEnabled()) {
+      int count = writeCtx.getCount();
+      LOG.debug(String.format("Trim request [%d-%d),"
+          + " current offset %d," + " drop the overlapped section [%d-%d)"
+          + " and write new data [%d-%d)",
+          offset, (offset + count),
+          currentOffset, offset, (currentOffset),
+          currentOffset, (offset + count)));
+    }
+    writeCtx.trimWrite((int)(currentOffset - offset));
+  }
+
   /**
    * Creates and adds a WriteCtx into the pendingWrites map. This is a
    * synchronized method to handle concurrent writes.
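
A worked example of the trim above, with illustrative numbers: suppose a cached request covering [100, 160) is taken out of pendingWrites after the write offset has already advanced to 130.

    long offset = 100, currentOffset = 130;      // request is [100, 160)
    int delta = (int) (currentOffset - offset);  // 30 bytes already on HDFS
    // trimWrite(30) leaves an effective request of [130, 160), 30 bytes.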
@@ -527,23 +543,27 @@ class OpenFileCtx {
           + cachedOffset);
     }

-    // Handle a special case first
+    // Ignore write request with range below the current offset
+    if (offset + count <= cachedOffset) {
+      LOG.warn(String.format("Got overwrite [%d-%d) smaller than"
+          + " current offset %d," + " drop the request.",
+          offset, (offset + count), cachedOffset));
+      return null;
+    }
+
+    // Handle a special case: trim request whose offset is smaller than
+    // the current offset
     if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
       // One Linux client behavior: after a file is closed and reopened to
       // write, the client sometimes combines previously written data (could
       // still be in the kernel buffer) with newly appended data in one write.
       // This is usually the first write after the file is reopened. In this
       // case, we log the event and drop the overlapped section.
-      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
-          + " current offset %d," + " drop the overlapped section (%d-%d)"
-          + " and append new data (%d-%d).", offset, (offset + count - 1),
-          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
-          + count - 1)));
-
-      if (!pendingWrites.isEmpty()) {
-        LOG.warn("There are other pending writes, fail this jumbo write");
-        return null;
-      }
+      LOG.warn(String.format("Got overwrite with appended data [%d-%d),"
+          + " current offset %d," + " drop the overlapped section [%d-%d)"
+          + " and append new data [%d-%d).", offset, (offset + count),
+          cachedOffset, offset, cachedOffset, cachedOffset, (offset
+          + count)));

       LOG.warn("Modify this write to write only the appended data");
       alterWriteRequest(request, cachedOffset);
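
The two special cases above, plus the normal path, can be sketched with illustrative numbers (a summary, not patch code), assuming cachedOffset is 100:

    long cachedOffset = 100, offset = 80;
    int count = 60;
    if (offset + count <= cachedOffset) {
      // e.g. [40, 90): entirely below 100 -- a retransmission, drop it
    } else if (offset < cachedOffset) {
      // e.g. [80, 140): straddles 100 -- alterWriteRequest trims it to
      // [100, 140) and originalCount remembers the original 60
    } else {
      // e.g. [120, 180): at or past 100 -- cache it in pendingWrites
    }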
@@ -1002,45 +1022,56 @@ class OpenFileCtx {
       this.asyncStatus = false;
       return null;
     }

-    Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
-    OffsetRange range = lastEntry.getKey();
-    WriteCtx toWrite = lastEntry.getValue();
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
-          + nextOffset);
-    }
-
-    long offset = nextOffset.get();
-    if (range.getMin() > offset) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The next sequential write has not arrived yet");
-      }
-      processCommits(nextOffset.get()); // handle race
-      this.asyncStatus = false;
-    } else if (range.getMin() < offset && range.getMax() > offset) {
-      // shouldn't happen since we do sync for overlapped concurrent writers
-      LOG.warn("Got an overlapping write (" + range.getMin() + ", "
-          + range.getMax() + "), nextOffset=" + offset
-          + ". Silently drop it now");
-      pendingWrites.remove(range);
-      processCommits(nextOffset.get()); // handle race
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
-            + ") from the list");
-      }
-      // after writing, remove the WriteCtx from cache
-      pendingWrites.remove(range);
-      // update nextOffset
-      nextOffset.addAndGet(toWrite.getCount());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Change nextOffset to " + nextOffset.get());
-      }
-      return toWrite;
+    Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+    OffsetRange range = lastEntry.getKey();
+    WriteCtx toWrite = lastEntry.getValue();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+          + nextOffset);
+    }
+
+    long offset = nextOffset.get();
+    if (range.getMin() > offset) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The next sequential write has not arrived yet");
+      }
+      processCommits(nextOffset.get()); // handle race
+      this.asyncStatus = false;
+    } else if (range.getMax() <= offset) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Remove write " + range.toString()
+            + " which is already written from the list");
+      }
+
+      // remove the WriteCtx from cache
+      pendingWrites.remove(range);
+    } else if (range.getMin() < offset && range.getMax() > offset) {
+      LOG.warn("Got an overlapping write " + range.toString()
+          + ", nextOffset=" + offset
+          + ". Remove and trim it");
+      pendingWrites.remove(range);
+      trimWriteRequest(toWrite, offset);
+      // update nextOffset
+      nextOffset.addAndGet(toWrite.getCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Change nextOffset (after trim) to " + nextOffset.get());
+      }
+      return toWrite;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Remove write " + range.toString()
+            + " from the list");
+      }
+      // after writing, remove the WriteCtx from cache
+      pendingWrites.remove(range);
+      // update nextOffset
+      nextOffset.addAndGet(toWrite.getCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Change nextOffset to " + nextOffset.get());
+      }
+      return toWrite;
     }
     return null;
   }
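
To summarize the rewritten branches with illustrative numbers, assuming nextOffset = 100, a dequeued range is handled as follows:

    [120, 180)  min > 100:  the next sequential write has not arrived; wait.
    [ 60,  90)  max <= 100: already written; just remove it from the cache.
    [ 80, 140)  min < 100 < max: remove, trimWriteRequest(toWrite, 100),
                then write the remaining [100, 140).
    [100, 160)  min == 100: the normal sequential case; remove and write.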
@@ -1272,8 +1303,8 @@ class OpenFileCtx {
     WccAttr preOpAttr = latestAttr.getWccAttr();
     while (!pendingWrites.isEmpty()) {
       OffsetRange key = pendingWrites.firstKey();
-      LOG.info("Fail pending write: (" + key.getMin() + ", " + key.getMax()
-          + "), nextOffset=" + nextOffset.get());
+      LOG.info("Fail pending write: " + key.toString()
+          + ", nextOffset=" + nextOffset.get());

       WriteCtx writeCtx = pendingWrites.remove(key);
       if (!writeCtx.getReplied()) {
@@ -51,8 +51,8 @@ class WriteCtx {
   }

   private final FileHandle handle;
-  private final long offset;
-  private final int count;
+  private long offset;
+  private int count;

   /**
    * Some clients can send a write that includes previously written data along
@@ -61,13 +61,61 @@ class WriteCtx {
    * request before it was modified to write only the new data.
    * @see OpenFileCtx#addWritesToCache for more details
    */
-  private final int originalCount;
+  private int originalCount;
   public static final int INVALID_ORIGINAL_COUNT = -1;

+  /**
+   * Overlapping Write Request Handling
+   * A write request can be in three states:
+   * s0. just created, with data != null
+   * s1. dumped as length "count", and data set to null
+   * s2. read back from dumped area as length "count"
+   *
+   * Write requests may have overlapping ranges; we detect this by comparing
+   * the data offset range of the request against the current offset of data
+   * already written to HDFS. There are two categories:
+   *
+   * 1. If the beginning part of a new write request's data is already written
+   * due to an earlier request, we alter the new request by trimming this
+   * portion before the new request enters state s0, and the originalCount is
+   * remembered.
+   *
+   * 2. If the lower end of the write request range is beyond the current
+   * offset of data already written, we put the request into the cache, and
+   * detect the overlap when taking the request out of the cache.
+   *
+   * For category 2, if we find out that a write request overlaps with
+   * another, this write request is already in state s0, s1, or s2. We trim
+   * the beginning part of this request, by remembering the size of this
+   * portion as trimDelta. So the resulting offset of the write request is
+   * "offset + trimDelta" and the resulting size of the write request is
+   * "count - trimDelta".
+   *
+   * What is important to notice is that if the request is in s1 when we do
+   * the trimming, the data dumped is of size "count", so when we load
+   * the data back from the dumped area, we should set the position of the
+   * data buffer to trimDelta.
+   */
+  private int trimDelta;
+
   public int getOriginalCount() {
     return originalCount;
   }

+  public void trimWrite(int delta) {
+    Preconditions.checkState(delta < count);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trim write request by delta:" + delta + " " + toString());
+    }
+    synchronized(this) {
+      trimDelta = delta;
+      if (originalCount == INVALID_ORIGINAL_COUNT) {
+        originalCount = count;
+      }
+      trimData();
+    }
+  }
+
   private final WriteStableHow stableHow;
   private volatile ByteBuffer data;
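
A small worked example of the trimDelta bookkeeping described in the comment above (illustrative values): take a WriteCtx created for [100, 160), i.e. offset 100 and count 60.

    writeCtx.trimWrite(30);  // trimDelta = 30, originalCount = 60
    writeCtx.getOffset();    // 100 + 30 = 130
    writeCtx.getCount();     // 60 - 30  = 30
    // In state s0/s2 trimData() applies the trim to the buffer right away;
    // in state s1 the full 60 bytes sit in the dump file, so loadData()
    // must advance the buffer position by trimDelta after reading back.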
@@ -139,11 +187,17 @@ class WriteCtx {
   }

   long getOffset() {
-    return offset;
+    synchronized(this) {
+      // See comment "Overlapping Write Request Handling" above
+      return offset + trimDelta;
+    }
   }

   int getCount() {
-    return count;
+    synchronized(this) {
+      // See comment "Overlapping Write Request Handling" above
+      return count - trimDelta;
+    }
   }

   WriteStableHow getStableHow() {
@@ -174,7 +228,22 @@ class WriteCtx {
       throw new IOException("Data count is " + count + ", but read back "
           + size + "bytes");
     }
-    data = ByteBuffer.wrap(rawData);
+    synchronized(this) {
+      data = ByteBuffer.wrap(rawData);
+      trimData();
+    }
   }

+  private void trimData() {
+    if (data != null && trimDelta > 0) {
+      // make it not dump-able since the data will be used
+      // shortly
+      dataState = DataState.NO_DUMP;
+      data.position(data.position() + trimDelta);
+      offset += trimDelta;
+      count -= trimDelta;
+      trimDelta = 0;
+    }
+  }
+
   public void writeData(HdfsDataOutputStream fos) throws IOException {
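
The buffer-position adjustment in trimData() can be seen in isolation with plain java.nio (illustrative demo, not patch code):

    import java.nio.ByteBuffer;

    public class TrimDemo {
      public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.wrap(new byte[] {10, 20, 30, 40, 50, 60});
        int trimDelta = 2;                     // first 2 bytes already on HDFS
        data.position(data.position() + trimDelta);
        System.out.println(data.remaining());  // 4 bytes remain, starting at
      }                                        // the third element (value 30)
    }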
@@ -229,6 +298,7 @@ class WriteCtx {
     this.offset = offset;
     this.count = count;
     this.originalCount = originalCount;
+    this.trimDelta = 0;
     this.stableHow = stableHow;
     this.data = data;
     this.channel = channel;
@@ -640,7 +640,97 @@ public class TestWrites {
       }
     }
   }
+
+  @Test
+  public void testOverlappingWrites() throws IOException, InterruptedException {
+    NfsConfiguration config = new NfsConfiguration();
+    MiniDFSCluster cluster = null;
+    RpcProgramNfs3 nfsd;
+    final int bufSize = 32;
+    SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(
+        System.getProperty("user.name"));
+    String currentUser = System.getProperty("user.name");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(currentUser),
+        "*");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(currentUser),
+        "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
+    // Use ephemeral port in case tests are running in parallel
+    config.setInt("nfs3.mountd.port", 0);
+    config.setInt("nfs3.server.port", 0);
+
+    try {
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+
+      Nfs3 nfs3 = new Nfs3(config);
+      nfs3.startServiceInternal(false);
+      nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+      DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config),
+          config);
+      HdfsFileStatus status = dfsClient.getFileInfo("/");
+      FileHandle rootHandle = new FileHandle(status.getFileId());
+
+      CREATE3Request createReq = new CREATE3Request(rootHandle,
+          "overlapping-writes" + System.currentTimeMillis(),
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr = new XDR();
+      createReq.serialize(createXdr);
+      CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", 1234));
+      FileHandle handle = createRsp.getObjHandle();
+      byte[] buffer = new byte[bufSize];
+      for (int i = 0; i < bufSize; i++) {
+        buffer[i] = (byte) i;
+      }
+      int[][] ranges = new int[][] {
+          {0, 10},
+          {5, 7},
+          {5, 5},
+          {10, 6},
+          {18, 6},
+          {20, 6},
+          {28, 4},
+          {16, 2},
+          {25, 4}
+      };
+      for (int i = 0; i < ranges.length; i++) {
+        int[] x = ranges[i];
+        byte[] tbuffer = new byte[x[1]];
+        for (int j = 0; j < x[1]; j++) {
+          tbuffer[j] = buffer[x[0] + j];
+        }
+        WRITE3Request writeReq = new WRITE3Request(handle, (long)x[0], x[1],
+            WriteStableHow.UNSTABLE, ByteBuffer.wrap(tbuffer));
+        XDR writeXdr = new XDR();
+        writeReq.serialize(writeXdr);
+        nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+            new InetSocketAddress("localhost", 1234));
+      }
+
+      waitWrite(nfsd, handle, 60000);
+      READ3Request readReq = new READ3Request(handle, 0, bufSize);
+      XDR readXdr = new XDR();
+      readReq.serialize(readXdr);
+      READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", config.getInt(
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT)));
+
+      assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }

   @Test
   public void testCheckSequential() throws IOException {
     DFSClient dfsClient = Mockito.mock(DFSClient.class);
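
The nine ranges feed the 32-byte file both duplicate and out-of-order data: {5, 7} and {5, 5} resend bytes already covered by {0, 10}, {20, 6} and {25, 4} overlap their neighbors, and {16, 2} fills the gap left in front of {18, 6}. Depending on how far nextOffset has advanced when each request is seen, a request is dropped outright, trimmed on receipt (alterWriteRequest), or trimmed when dequeued (trimWriteRequest). Because every request carries the same byte values for a given position, the final read must reproduce the original buffer exactly.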
@@ -1447,6 +1447,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9147. Fix the setting of visibleLength in ExternalBlockReader. (Colin
     P. McCabe via Lei (Eddy) Xu)

+    HDFS-9092. Nfs silently drops overlapping write requests and causes data
+    copying to fail. (Yongjun Zhang)
+
 Release 2.7.2 - UNRELEASED

   INCOMPATIBLE CHANGES