HADOOP-15292. Distcp's use of pread is slowing it down.

Contributed by Virajith Jalaparti.

(cherry picked from commit 3bd6b1fd85)
This commit is contained in:
Steve Loughran 2018-03-08 11:18:33 +00:00
parent 56f14cb5fb
commit f0b486f6ae
3 changed files with 66 additions and 30 deletions

View File

@ -260,7 +260,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
boolean finished = false; boolean finished = false;
try { try {
inStream = getInputStream(source, context.getConfiguration()); inStream = getInputStream(source, context.getConfiguration());
int bytesRead = readBytes(inStream, buf, sourceOffset); seekIfRequired(inStream, sourceOffset);
int bytesRead = readBytes(inStream, buf);
while (bytesRead >= 0) { while (bytesRead >= 0) {
if (chunkLength > 0 && if (chunkLength > 0 &&
(totalBytesRead + bytesRead) >= chunkLength) { (totalBytesRead + bytesRead) >= chunkLength) {
@ -276,7 +277,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
if (finished) { if (finished) {
break; break;
} }
bytesRead = readBytes(inStream, buf, sourceOffset); bytesRead = readBytes(inStream, buf);
} }
outStream.close(); outStream.close();
outStream = null; outStream = null;
@ -299,13 +300,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
context.setStatus(message.toString()); context.setStatus(message.toString());
} }
private static int readBytes(ThrottledInputStream inStream, byte buf[], private static int readBytes(ThrottledInputStream inStream, byte buf[])
long position) throws IOException { throws IOException {
try { try {
if (position == 0) {
return inStream.read(buf); return inStream.read(buf);
} else { } catch (IOException e) {
return inStream.read(position, buf, 0, buf.length); throw new CopyReadException(e);
}
}
private static void seekIfRequired(ThrottledInputStream inStream,
long sourceOffset) throws IOException {
try {
if (sourceOffset != inStream.getPos()) {
inStream.seek(sourceOffset);
} }
} catch (IOException e) { } catch (IOException e) {
throw new CopyReadException(e); throw new CopyReadException(e);

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.tools.util; package org.apache.hadoop.tools.util;
import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -33,7 +33,7 @@ import java.io.InputStream;
* (Thus, while the read-rate might exceed the maximum for a given short interval, * (Thus, while the read-rate might exceed the maximum for a given short interval,
* the average tends towards the specified maximum, overall.) * the average tends towards the specified maximum, overall.)
*/ */
public class ThrottledInputStream extends InputStream { public class ThrottledInputStream extends InputStream implements Seekable {
private final InputStream rawStream; private final InputStream rawStream;
private final float maxBytesPerSec; private final float maxBytesPerSec;
@ -95,25 +95,6 @@ public class ThrottledInputStream extends InputStream {
return readLen; return readLen;
} }
/**
* Read bytes starting from the specified position. This requires rawStream is
* an instance of {@link PositionedReadable}.
*/
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
if (!(rawStream instanceof PositionedReadable)) {
throw new UnsupportedOperationException(
"positioned read is not supported by the internal stream");
}
throttle();
int readLen = ((PositionedReadable) rawStream).read(position, buffer,
offset, length);
if (readLen != -1) {
bytesRead += readLen;
}
return readLen;
}
private void throttle() throws IOException { private void throttle() throws IOException {
while (getBytesPerSec() > maxBytesPerSec) { while (getBytesPerSec() > maxBytesPerSec) {
try { try {
@ -165,4 +146,29 @@ public class ThrottledInputStream extends InputStream {
", totalSleepTime=" + totalSleepTime + ", totalSleepTime=" + totalSleepTime +
'}'; '}';
} }
private void checkSeekable() throws IOException {
if (!(rawStream instanceof Seekable)) {
throw new UnsupportedOperationException(
"seek operations are unsupported by the internal stream");
}
}
@Override
public void seek(long pos) throws IOException {
checkSeekable();
((Seekable) rawStream).seek(pos);
}
@Override
public long getPos() throws IOException {
checkSeekable();
return ((Seekable) rawStream).getPos();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
checkSeekable();
return ((Seekable) rawStream).seekToNewSource(targetPos);
}
} }

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.CopyListingFileStatus;
@ -55,6 +56,10 @@ import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
public class TestCopyMapper { public class TestCopyMapper {
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class); private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
private static List<Path> pathList = new ArrayList<Path>(); private static List<Path> pathList = new ArrayList<Path>();
@ -248,7 +253,11 @@ public class TestCopyMapper {
// do the distcp again with -update and -append option // do the distcp again with -update and -append option
CopyMapper copyMapper = new CopyMapper(); CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0); Configuration conf = getConfiguration();
// set the buffer size to 1/10th the size of the file.
conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
DEFAULT_FILE_SIZE/10);
StubContext stubContext = new StubContext(conf, null, 0);
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context = Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
stubContext.getContext(); stubContext.getContext();
// Enable append // Enable append
@ -257,6 +266,10 @@ public class TestCopyMapper {
copyMapper.setup(context); copyMapper.setup(context);
int numFiles = 0; int numFiles = 0;
MetricsRecordBuilder rb =
getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
String readCounter = "ReadsFromLocalClient";
long readsFromClient = getLongCounter(readCounter, rb);
for (Path path: pathList) { for (Path path: pathList) {
if (fs.getFileStatus(path).isFile()) { if (fs.getFileStatus(path).isFile()) {
numFiles++; numFiles++;
@ -274,6 +287,15 @@ public class TestCopyMapper {
.getValue()); .getValue());
Assert.assertEquals(numFiles, stubContext.getReporter(). Assert.assertEquals(numFiles, stubContext.getReporter().
getCounter(CopyMapper.Counter.COPY).getValue()); getCounter(CopyMapper.Counter.COPY).getValue());
rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
/*
* added as part of HADOOP-15292 to ensure that multiple readBlock()
* operations are not performed to read a block from a single Datanode.
* assert assumes that there is only one block per file, and that the number
* of files appended to in appendSourceData() above is captured by the
* variable numFiles.
*/
assertCounter(readCounter, readsFromClient + numFiles, rb);
} }
private void testCopy(boolean preserveChecksum) throws Exception { private void testCopy(boolean preserveChecksum) throws Exception {