Fix merge conflicts.
This commit is contained in:
parent
9593776e34
commit
1a8139e6ad
|
@ -1119,7 +1119,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
/**
|
/**
|
||||||
* Read data from one DataNode.
|
* Read data from one DataNode.
|
||||||
* @param datanode the datanode from which to read data
|
* @param datanode the datanode from which to read data
|
||||||
* @param block the block to read
|
* @param blockStartOffset starting offset in the file
|
||||||
* @param startInBlk the startInBlk offset of the block
|
* @param startInBlk the startInBlk offset of the block
|
||||||
* @param endInBlk the endInBlk offset of the block
|
* @param endInBlk the endInBlk offset of the block
|
||||||
* @param buf the given byte array into which the data is read
|
* @param buf the given byte array into which the data is read
|
||||||
|
@ -1149,7 +1149,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
BlockReader reader = null;
|
BlockReader reader = null;
|
||||||
try {
|
try {
|
||||||
DFSClientFaultInjector.get().fetchFromDatanodeException();
|
DFSClientFaultInjector.get().fetchFromDatanodeException();
|
||||||
reader = getBlockReader(block, start, len, datanode.addr,
|
reader = getBlockReader(block, startInBlk, len, datanode.addr,
|
||||||
datanode.storageType, datanode.info);
|
datanode.storageType, datanode.info);
|
||||||
for (int i = 0; i < offsets.length; i++) {
|
for (int i = 0; i < offsets.length; i++) {
|
||||||
int nread = reader.readAll(buf, offsets[i], lengths[i]);
|
int nread = reader.readAll(buf, offsets[i], lengths[i]);
|
||||||
|
@ -1206,8 +1206,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
* with each other.
|
* with each other.
|
||||||
*/
|
*/
|
||||||
private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
|
private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
|
||||||
Preconditions.checkArgument(offsets.length == lengths.length &&
|
Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
|
||||||
offsets.length > 0);
|
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
for (int i = 0; i < lengths.length; i++) {
|
for (int i = 0; i < lengths.length; i++) {
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
|
|
|
@ -124,10 +124,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
||||||
for (short i = 0; i < numAllBlocks; i++) {
|
for (short i = 0; i < numAllBlocks; i++) {
|
||||||
StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
|
StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
|
||||||
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
|
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
|
||||||
i, stripeBlocks);
|
i, stripeBlocks, favoredNodes);
|
||||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
|
||||||
streamer.setFavoredNodes(favoredNodes);
|
|
||||||
}
|
|
||||||
s.add(streamer);
|
s.add(streamer);
|
||||||
}
|
}
|
||||||
streamers = Collections.unmodifiableList(s);
|
streamers = Collections.unmodifiableList(s);
|
||||||
|
@ -316,7 +313,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (StripedDataStreamer streamer : streamers) {
|
for (StripedDataStreamer streamer : streamers) {
|
||||||
streamer.setLastException(new IOException("Lease timeout of "
|
streamer.getLastException().set(new IOException("Lease timeout of "
|
||||||
+ (dfsClient.getConf().getHdfsTimeout()/1000) +
|
+ (dfsClient.getConf().getHdfsTimeout()/1000) +
|
||||||
" seconds expired."));
|
" seconds expired."));
|
||||||
}
|
}
|
||||||
|
@ -414,13 +411,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void closeImpl() throws IOException {
|
protected synchronized void closeImpl() throws IOException {
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
IOException e = getLeadingStreamer().getLastException().getAndSet(null);
|
getLeadingStreamer().getLastException().check();
|
||||||
if (e != null) {
|
|
||||||
throw e;
|
|
||||||
} else {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// flush from all upper layers
|
// flush from all upper layers
|
||||||
|
|
|
@ -58,9 +58,10 @@ public class StripedDataStreamer extends DataStreamer {
|
||||||
Progressable progress, DataChecksum checksum,
|
Progressable progress, DataChecksum checksum,
|
||||||
AtomicReference<CachingStrategy> cachingStrategy,
|
AtomicReference<CachingStrategy> cachingStrategy,
|
||||||
ByteArrayManager byteArrayManage, short index,
|
ByteArrayManager byteArrayManage, short index,
|
||||||
List<BlockingQueue<LocatedBlock>> stripedBlocks) {
|
List<BlockingQueue<LocatedBlock>> stripedBlocks,
|
||||||
|
String[] favoredNodes) {
|
||||||
super(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
|
super(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
|
||||||
byteArrayManage);
|
byteArrayManage, favoredNodes);
|
||||||
this.index = index;
|
this.index = index;
|
||||||
this.stripedBlocks = stripedBlocks;
|
this.stripedBlocks = stripedBlocks;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue