HDFS-4971. Merge change r1525681 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1525688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-09-23 20:26:20 +00:00
parent e2bfdfc585
commit b35d226b62
7 changed files with 567 additions and 455 deletions

View File

@ -97,7 +97,7 @@ public class AsyncDataService {
void writeAsync(OpenFileCtx openFileCtx) { void writeAsync(OpenFileCtx openFileCtx) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling write back task for fileId: " LOG.debug("Scheduling write back task for fileId: "
+ openFileCtx.copyLatestAttr().getFileId()); + openFileCtx.getLatestAttr().getFileId());
} }
WriteBackTask wbTask = new WriteBackTask(openFileCtx); WriteBackTask wbTask = new WriteBackTask(openFileCtx);
execute(wbTask); execute(wbTask);
@ -125,7 +125,7 @@ public class AsyncDataService {
public String toString() { public String toString() {
// Called in AsyncDataService.execute for displaying error messages. // Called in AsyncDataService.execute for displaying error messages.
return "write back data for fileId" return "write back data for fileId"
+ openFileCtx.copyLatestAttr().getFileId() + " with nextOffset " + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
+ openFileCtx.getNextOffset(); + openFileCtx.getNextOffset();
} }

View File

@ -17,19 +17,34 @@
*/ */
package org.apache.hadoop.hdfs.nfs.nfs3; package org.apache.hadoop.hdfs.nfs.nfs3;
import java.util.Comparator;
import com.google.common.base.Preconditions;
/** /**
* OffsetRange is the range of read/write request. A single point (e.g.,[5,5]) * OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
* is not a valid range. * is not a valid range.
*/ */
public class OffsetRange implements Comparable<OffsetRange> { public class OffsetRange {
public static final Comparator<OffsetRange> ReverseComparatorOnMin =
new Comparator<OffsetRange>() {
@Override
public int compare(OffsetRange o1, OffsetRange o2) {
if (o1.getMin() == o2.getMin()) {
return o1.getMax() < o2.getMax() ?
1 : (o1.getMax() > o2.getMax() ? -1 : 0);
} else {
return o1.getMin() < o2.getMin() ? 1 : -1;
}
}
};
private final long min; private final long min;
private final long max; private final long max;
OffsetRange(long min, long max) { OffsetRange(long min, long max) {
if ((min >= max) || (min < 0) || (max < 0)) { Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
throw new IllegalArgumentException("Wrong offset range: (" + min + ","
+ max + ")");
}
this.min = min; this.min = min;
this.max = max; this.max = max;
} }
@ -49,24 +64,10 @@ public class OffsetRange implements Comparable<OffsetRange> {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
assert (o instanceof OffsetRange); if (o instanceof OffsetRange) {
OffsetRange range = (OffsetRange) o; OffsetRange range = (OffsetRange) o;
return (min == range.getMin()) && (max == range.getMax()); return (min == range.getMin()) && (max == range.getMax());
} }
return false;
private static int compareTo(long left, long right) {
if (left < right) {
return -1;
} else if (left > right) {
return 1;
} else {
return 0;
}
}
@Override
public int compareTo(OffsetRange other) {
final int d = compareTo(min, other.getMin());
return d != 0 ? d : compareTo(max, other.getMax());
} }
} }

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import com.google.common.base.Preconditions;
/** /**
* WriteCtx saves the context of one write request, such as request, channel, * WriteCtx saves the context of one write request, such as request, channel,
* xid and reply status. * xid and reply status.
@ -49,13 +51,21 @@ class WriteCtx {
private final long offset; private final long offset;
private final int count; private final int count;
private final WriteStableHow stableHow; private final WriteStableHow stableHow;
private byte[] data; private volatile byte[] data;
private final Channel channel; private final Channel channel;
private final int xid; private final int xid;
private boolean replied; private boolean replied;
private DataState dataState; /**
* Data belonging to the same {@link OpenFileCtx} may be dumped to a file.
* After being dumped to the file, the corresponding {@link WriteCtx} records
* the dump file and the offset.
*/
private RandomAccessFile raf;
private long dumpFileOffset;
private volatile DataState dataState;
public DataState getDataState() { public DataState getDataState() {
return dataState; return dataState;
@ -65,11 +75,12 @@ class WriteCtx {
this.dataState = dataState; this.dataState = dataState;
} }
private RandomAccessFile raf; /**
private long dumpFileOffset; * Writing the data into a local file. After the writing, if
* {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set
// Return the dumped data size * {@link #dataState} to DUMPED.
public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf) */
long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
throws IOException { throws IOException {
if (dataState != DataState.ALLOW_DUMP) { if (dataState != DataState.ALLOW_DUMP) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -84,37 +95,54 @@ class WriteCtx {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset); LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
} }
// it is possible that while we dump the data, the data is also being
// written back to HDFS. After dump, if the writing back has not finished
// yet, we change its flag to DUMPED and set the data to null. Otherwise
// this WriteCtx instance should have been removed from the buffer.
if (dataState == DataState.ALLOW_DUMP) {
synchronized (this) {
if (dataState == DataState.ALLOW_DUMP) {
data = null; data = null;
dataState = DataState.DUMPED; dataState = DataState.DUMPED;
return count; return count;
} }
}
}
return 0;
}
public FileHandle getHandle() { FileHandle getHandle() {
return handle; return handle;
} }
public long getOffset() { long getOffset() {
return offset; return offset;
} }
public int getCount() { int getCount() {
return count; return count;
} }
public WriteStableHow getStableHow() { WriteStableHow getStableHow() {
return stableHow; return stableHow;
} }
public byte[] getData() throws IOException { byte[] getData() throws IOException {
if (dataState != DataState.DUMPED) { if (dataState != DataState.DUMPED) {
if (data == null) { synchronized (this) {
throw new IOException("Data is not dumpted but has null:" + this); if (dataState != DataState.DUMPED) {
Preconditions.checkState(data != null);
return data;
} }
} else {
// read back
if (data != null) {
throw new IOException("Data is dumpted but not null");
} }
}
// read back from dumped file
this.loadData();
return data;
}
private void loadData() throws IOException {
Preconditions.checkState(data == null);
data = new byte[count]; data = new byte[count];
raf.seek(dumpFileOffset); raf.seek(dumpFileOffset);
int size = raf.read(data, 0, count); int size = raf.read(data, 0, count);
@ -123,8 +151,6 @@ class WriteCtx {
+ size + "bytes"); + size + "bytes");
} }
} }
return data;
}
Channel getChannel() { Channel getChannel() {
return channel; return channel;

View File

@ -67,8 +67,8 @@ public class WriteManager {
*/ */
private long streamTimeout; private long streamTimeout;
public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) { void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
openFileMap.put(h, ctx); openFileMap.put(h, ctx);
@ -215,6 +215,10 @@ public class WriteManager {
LOG.info("Inactive stream, fileId=" + fileHandle.getFileId() LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
+ " commitOffset=" + commitOffset); + " commitOffset=" + commitOffset);
return true; return true;
} else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
LOG.info("Inactive stream with pending writes, fileId="
+ fileHandle.getFileId() + " commitOffset=" + commitOffset);
return false;
} }
assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR); assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
if (ret == OpenFileCtx.COMMIT_ERROR) { if (ret == OpenFileCtx.COMMIT_ERROR) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.nfs.nfs3; package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
@ -51,8 +52,9 @@ public class TestOffsetRange {
OffsetRange r3 = new OffsetRange(1, 3); OffsetRange r3 = new OffsetRange(1, 3);
OffsetRange r4 = new OffsetRange(3, 4); OffsetRange r4 = new OffsetRange(3, 4);
assertTrue(r2.compareTo(r3) == 0); assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
assertTrue(r2.compareTo(r1) == 1); assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
assertTrue(r2.compareTo(r4) == -1); assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
} }
} }

View File

@ -176,6 +176,9 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5212. Refactor RpcMessage and NFS3Response to support different HDFS-5212. Refactor RpcMessage and NFS3Response to support different
types of authentication information. (jing9) types of authentication information. (jing9)
HDFS-4971. Move IO operations out of locking in OpenFileCtx. (brandonli and
jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES