HADOOP-16049. DistCp result has data and checksum mismatch when blocks per chunk > 0.
Contributed by Kai Xie.
parent 338dbbedf7
commit 6d3e7a8570
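What the change does (as the hunks below show): when a file is copied in chunks (blocks per chunk > 0, the -blocksperchunk option), the old copy loop called readBytes(inStream, buf, sourceOffset) on every iteration. For a chunk starting at a non-zero offset that meant a positioned read from the same sourceOffset each time, so the chunk was filled with repeated data and the reassembled target no longer matched the source data or checksum. The fix seeks the throttled stream to the chunk offset once (seekIfRequired) and then reads sequentially; to support that, ThrottledInputStream now implements Seekable and its positioned-read method is removed. The DistCp tests are also reworked to share one MiniDFSCluster per test class and to clean up snapshots and test directories between cases.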
RetriableFileCopyCommand.java:

@@ -257,7 +257,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
-      int bytesRead = readBytes(inStream, buf, sourceOffset);
+      seekIfRequired(inStream, sourceOffset);
+      int bytesRead = readBytes(inStream, buf);
       while (bytesRead >= 0) {
         if (chunkLength > 0 &&
             (totalBytesRead + bytesRead) >= chunkLength) {

@@ -273,7 +274,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
         if (finished) {
           break;
         }
-        bytesRead = readBytes(inStream, buf, sourceOffset);
+        bytesRead = readBytes(inStream, buf);
       }
       outStream.close();
       outStream = null;

@@ -296,13 +297,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     context.setStatus(message.toString());
   }

-  private static int readBytes(ThrottledInputStream inStream, byte buf[],
-      long position) throws IOException {
+  private static int readBytes(ThrottledInputStream inStream, byte[] buf)
+      throws IOException {
     try {
-      if (position == 0) {
-        return inStream.read(buf);
-      } else {
-        return inStream.read(position, buf, 0, buf.length);
-      }
+      return inStream.read(buf);
     } catch (IOException e) {
       throw new CopyReadException(e);
     }
   }
+
+  private static void seekIfRequired(ThrottledInputStream inStream,
+      long sourceOffset) throws IOException {
+    try {
+      if (sourceOffset != inStream.getPos()) {
+        inStream.seek(sourceOffset);
+      }
+    } catch (IOException e) {
+      throw new CopyReadException(e);
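To see why the loop change matters: with the old signature, every iteration passed the same sourceOffset, so any chunk larger than one buffer kept re-reading the same bytes; the new code positions the stream once and lets ordinary sequential reads advance through the chunk. A minimal sketch of that pattern, using a hypothetical copyChunk helper rather than the actual DistCp code:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FSDataInputStream;

public final class ChunkCopySketch {
  /**
   * Hypothetical helper mirroring the fixed read path: position the stream
   * once, then use sequential reads so each iteration returns fresh bytes.
   */
  static long copyChunk(FSDataInputStream in, OutputStream out,
      long offset, long chunkLength) throws IOException {
    if (in.getPos() != offset) {
      in.seek(offset);                    // seek once, up front
    }
    byte[] buf = new byte[8192];
    long copied = 0;
    int read;
    while (copied < chunkLength && (read = in.read(buf)) >= 0) {
      int toWrite = (int) Math.min(read, chunkLength - copied);
      out.write(buf, 0, toWrite);         // sequential reads advance the position
      copied += toWrite;
    }
    return copied;
  }
}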
ThrottledInputStream.java:

@@ -18,7 +18,7 @@
  */
 package org.apache.hadoop.tools.util;

-import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;

 import java.io.IOException;
 import java.io.InputStream;

@@ -33,7 +33,7 @@ import java.io.InputStream;
  * (Thus, while the read-rate might exceed the maximum for a given short interval,
  * the average tends towards the specified maximum, overall.)
  */
-public class ThrottledInputStream extends InputStream {
+public class ThrottledInputStream extends InputStream implements Seekable {

   private final InputStream rawStream;
   private final long maxBytesPerSec;

@@ -95,25 +95,6 @@ public class ThrottledInputStream extends InputStream {
     return readLen;
   }

-  /**
-   * Read bytes starting from the specified position. This requires rawStream is
-   * an instance of {@link PositionedReadable}.
-   */
-  public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-    if (!(rawStream instanceof PositionedReadable)) {
-      throw new UnsupportedOperationException(
-          "positioned read is not supported by the internal stream");
-    }
-    throttle();
-    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
-        offset, length);
-    if (readLen != -1) {
-      bytesRead += readLen;
-    }
-    return readLen;
-  }
-
   private void throttle() throws IOException {
     while (getBytesPerSec() > maxBytesPerSec) {
       try {

@@ -165,4 +146,29 @@ public class ThrottledInputStream extends InputStream {
         ", totalSleepTime=" + totalSleepTime +
         '}';
   }
+
+  private void checkSeekable() throws IOException {
+    if (!(rawStream instanceof Seekable)) {
+      throw new UnsupportedOperationException(
+          "seek operations are unsupported by the internal stream");
+    }
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    checkSeekable();
+    ((Seekable) rawStream).seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).seekToNewSource(targetPos);
+  }
 }
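With ThrottledInputStream implementing Seekable, a caller can reposition the throttled stream directly and then keep reading sequentially; seeking only works when the wrapped stream is itself Seekable (for example an FSDataInputStream), otherwise the new checkSeekable() guard throws. An illustrative sketch, assuming a Hadoop FileSystem handle and an existing file path (not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.ThrottledInputStream;

public final class ThrottledSeekSketch {
  // Illustrative sketch only; paths and rate limits are placeholders.
  static void demo(FileSystem fs) throws IOException {
    // FSDataInputStream is Seekable, so seek()/getPos() on the wrapper are
    // forwarded to it while reads stay bandwidth-throttled.
    FSDataInputStream raw = fs.open(new Path("/source/file"));
    try (ThrottledInputStream in = new ThrottledInputStream(raw, 1024 * 1024)) {
      in.seek(4096);            // reposition once
      long pos = in.getPos();   // 4096
      int first = in.read();    // sequential read from the new position
    }

    // Wrapping a non-seekable stream still supports plain reads, but any
    // seek()/getPos() call now throws UnsupportedOperationException.
    ThrottledInputStream plain =
        new ThrottledInputStream(new ByteArrayInputStream(new byte[16]));
    plain.close();
  }
}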
TestDistCpSync.java:

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools;

+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;

@@ -27,6 +28,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;

@@ -35,8 +38,10 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;

 import java.util.Arrays;

@@ -44,20 +49,35 @@ import java.util.HashMap;
 import java.util.Map;

 public class TestDistCpSync {
-  private MiniDFSCluster cluster;
-  private final Configuration conf = new HdfsConfiguration();
+  private static MiniDFSCluster cluster;
+  private static final short DATA_NUM = 1;
+  private static final Configuration DFS_CONF = new HdfsConfiguration();
+
+  private Configuration conf;
   private DistributedFileSystem dfs;
   private DistCpOptions options;
   private final Path source = new Path("/source");
   private final Path target = new Path("/target");
   private final long BLOCK_SIZE = 1024;
-  private final short DATA_NUM = 1;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(DFS_CONF)
+        .numDataNodes(DATA_NUM)
+        .build();
+    cluster.waitActive();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }

   @Before
-  public void setUp() throws Exception {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build();
-    cluster.waitActive();
-
+  public void init() throws Exception {
+    conf = new HdfsConfiguration(DFS_CONF);
     dfs = cluster.getFileSystem();
     dfs.mkdirs(source);
     dfs.mkdirs(target);

@@ -72,11 +92,20 @@ public class TestDistCpSync {
   }

   @After
-  public void tearDown() throws Exception {
-    IOUtils.cleanup(null, dfs);
-    if (cluster != null) {
-      cluster.shutdown();
+  public void cleanup() throws Exception {
+    SnapshotManager snapshotManager = cluster
+        .getNameNode()
+        .getNamesystem()
+        .getSnapshotManager();
+    for (SnapshotInfo.Bean snapshot : snapshotManager.getSnapshots()) {
+      dfs.deleteSnapshot(new Path(
+          StringUtils.substringBefore(
+              snapshot.getSnapshotDirectory(), ".snapshot")),
+          snapshot.getSnapshotID());
     }
+    dfs.delete(source, true);
+    dfs.delete(target, true);
+    IOUtils.cleanup(null, dfs);
   }

   /**
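The test changes follow a common JUnit 4 refactor: the expensive MiniDFSCluster is started once per class with @BeforeClass/@AfterClass, while per-test state is rebuilt in @Before and cleaned in @After. A generic, illustrative sketch of that shape (placeholder names, not the actual test code):

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

// Illustrative only: one expensive fixture per class, cheap per-test
// state rebuilt before every test.
public class SharedFixtureTest {

  /** Stand-in for a costly resource such as a MiniDFSCluster. */
  static class ExpensiveResource implements AutoCloseable {
    @Override
    public void close() { }
  }

  private static ExpensiveResource shared;

  @BeforeClass
  public static void startShared() {
    shared = new ExpensiveResource();   // started once for all tests
  }

  @AfterClass
  public static void stopShared() throws Exception {
    if (shared != null) {
      shared.close();                   // torn down once after all tests
    }
  }

  @Before
  public void initPerTest() {
    // rebuild per-test state against the shared resource here
  }
}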
TestDistCpSyncReverseBase.java:

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools;

+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsShell;

@@ -27,6 +28,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;

@@ -34,8 +37,10 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;

 import java.io.ByteArrayOutputStream;

@@ -53,15 +58,17 @@ import java.util.StringTokenizer;
  * Shared by "-rdiff s2 s1 src tgt" and "-rdiff s2 s1 tgt tgt"
  */
 public abstract class TestDistCpSyncReverseBase {
-  private MiniDFSCluster cluster;
-  private final Configuration conf = new HdfsConfiguration();
+  private static MiniDFSCluster cluster;
+  private static final short DATA_NUM = 1;
+  private static final Configuration DFS_CONF = new HdfsConfiguration();
+
+  private Configuration conf;
   private DistributedFileSystem dfs;
   private DistCpOptions options;
   private Path source;
   private boolean isSrcNotSameAsTgt = true;
   private final Path target = new Path("/target");
   private final long blockSize = 1024;
-  private final short dataNum = 1;

   abstract void initSourcePath();

@@ -126,13 +133,25 @@ public abstract class TestDistCpSyncReverseBase {
     isSrcNotSameAsTgt = srcNotSameAsTgt;
   }

-  @Before
-  public void setUp() throws Exception {
-    initSourcePath();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNum).build();
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(DFS_CONF)
+        .numDataNodes(DATA_NUM)
+        .build();
     cluster.waitActive();
   }

+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void init() throws Exception {
+    initSourcePath();
+    conf = new HdfsConfiguration(DFS_CONF);
     dfs = cluster.getFileSystem();
     if (isSrcNotSameAsTgt) {
       dfs.mkdirs(source);

@@ -149,11 +168,20 @@ public abstract class TestDistCpSyncReverseBase {
   }

   @After
-  public void tearDown() throws Exception {
-    IOUtils.cleanup(null, dfs);
-    if (cluster != null) {
-      cluster.shutdown();
+  public void cleanup() throws Exception {
+    SnapshotManager snapshotManager = cluster
+        .getNameNode()
+        .getNamesystem()
+        .getSnapshotManager();
+    for (SnapshotInfo.Bean snapshot : snapshotManager.getSnapshots()) {
+      dfs.deleteSnapshot(new Path(
+          StringUtils.substringBefore(
+              snapshot.getSnapshotDirectory(), ".snapshot")),
+          snapshot.getSnapshotID());
     }
+    dfs.delete(source, true);
+    dfs.delete(target, true);
+    IOUtils.cleanup(null, dfs);
   }

   /**
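The remaining hunks in this file are mechanical fallout from the fixture change above: with the per-instance dataNum field removed, each changeData* helper now passes the class-level DATA_NUM constant to DFSTestUtil.createFile, while blockSize is unchanged.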
@@ -254,10 +282,10 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f3 = new Path(d1, "f3");
     final Path f4 = new Path(d2, "f4");

-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f4, blockSize, dataNum, 0);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f3, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f4, blockSize, DATA_NUM, 0);
   }

   /**

@@ -297,7 +325,7 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f1 = new Path(newfoo, "f1");
     dfs.delete(f1, true);
     numDeletedModified += 1; // delete ./foo/f1
-    DFSTestUtil.createFile(dfs, f1, 2 * blockSize, dataNum, 0);
+    DFSTestUtil.createFile(dfs, f1, 2 * blockSize, DATA_NUM, 0);
     DFSTestUtil.appendFile(dfs, f2, (int) blockSize);
     numDeletedModified += 1; // modify ./bar/f2
     dfs.rename(bar, new Path(dir, "foo"));

@@ -451,9 +479,9 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f2 = new Path(foo, "f2");
     final Path f3 = new Path(bar, "f3");

-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 1L);
-    DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 2L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, blockSize, DATA_NUM, 2L);
   }

   private void changeData2(Path dir) throws Exception {

@@ -495,9 +523,9 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f2 = new Path(foo, "file");
     final Path f3 = new Path(bar, "file");

-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize * 2, dataNum, 1L);
-    DFSTestUtil.createFile(dfs, f3, blockSize * 3, dataNum, 2L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize * 2, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, blockSize * 3, DATA_NUM, 2L);
   }

   private void changeData3(Path dir) throws Exception {

@@ -543,7 +571,7 @@ public abstract class TestDistCpSyncReverseBase {
     final Path d2 = new Path(d1, "d2");
     final Path f1 = new Path(d2, "f1");

-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
   }

   private int changeData4(Path dir) throws Exception {

@@ -594,8 +622,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f1 = new Path(d1, "f1");
     final Path f2 = new Path(d2, "f2");

-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 0L);
   }

   private int changeData5(Path dir) throws Exception {

@@ -694,8 +722,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path foo_f1 = new Path(foo, "f1");
     final Path bar_f1 = new Path(bar, "f1");

-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
   }

   private int changeData6(Path dir) throws Exception {

@@ -736,8 +764,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path foo_f1 = new Path(foo, "f1");
     final Path bar_f1 = new Path(bar, "f1");

-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
   }

   private int changeData7(Path dir) throws Exception {

@@ -750,7 +778,7 @@ public abstract class TestDistCpSyncReverseBase {

     int numDeletedAndModified = 0;
     dfs.rename(foo, foo2);
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
     DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize);
     dfs.rename(foo_f1, foo2_f2);
     /*

@@ -763,7 +791,7 @@ M ./foo
 + ./foo/f2
     */
     numDeletedAndModified += 1; // "M ./foo"
-    DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, DATA_NUM, 0L);
     return numDeletedAndModified;
   }

@@ -793,9 +821,9 @@ M ./foo
     final Path bar_f1 = new Path(bar, "f1");
     final Path d1_f1 = new Path(d1, "f1");

-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, d1_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, d1_f1, blockSize, DATA_NUM, 0L);
   }

   private int changeData8(Path dir, boolean createMiddleSnapshot)

@@ -813,8 +841,8 @@ M ./foo
     final Path bar1 = new Path(dir, "bar1");

     int numDeletedAndModified = 0;
-    DFSTestUtil.createFile(dfs, foo_f3, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f3, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, DATA_NUM, 0L);
     dfs.rename(createdDir_f1, foo_f4);
     dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1
     numDeletedAndModified += 1; // modify ./c/foo/d1
TestCopyMapper.java:

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.CopyListingFileStatus;

@@ -55,6 +56,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;

+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
 public class TestCopyMapper {
   private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
   private static List<Path> pathList = new ArrayList<Path>();

@@ -248,7 +253,11 @@ public class TestCopyMapper {

     // do the distcp again with -update and -append option
     CopyMapper copyMapper = new CopyMapper();
-    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Configuration conf = getConfiguration();
+    // set the buffer size to 1/10th the size of the file.
+    conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
+        DEFAULT_FILE_SIZE/10);
+    StubContext stubContext = new StubContext(conf, null, 0);
     Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
         stubContext.getContext();
     // Enable append

@@ -257,6 +266,10 @@ public class TestCopyMapper {
     copyMapper.setup(context);

     int numFiles = 0;
+    MetricsRecordBuilder rb =
+        getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    String readCounter = "ReadsFromLocalClient";
+    long readsFromClient = getLongCounter(readCounter, rb);
     for (Path path: pathList) {
       if (fs.getFileStatus(path).isFile()) {
         numFiles++;

@@ -274,6 +287,15 @@ public class TestCopyMapper {
         .getValue());
     Assert.assertEquals(numFiles, stubContext.getReporter().
         getCounter(CopyMapper.Counter.COPY).getValue());
+    rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    /*
+     * added as part of HADOOP-15292 to ensure that multiple readBlock()
+     * operations are not performed to read a block from a single Datanode.
+     * assert assumes that there is only one block per file, and that the number
+     * of files appended to in appendSourceData() above is captured by the
+     * variable numFiles.
+     */
+    assertCounter(readCounter, readsFromClient + numFiles, rb);
   }

   private void testCopy(boolean preserveChecksum) throws Exception {
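The new assertions (per the in-code comment referencing HADOOP-15292) shrink the copy buffer to one tenth of the file size so that an appended file needs several readBytes() calls, then compare the DataNode's ReadsFromLocalClient counter before and after the copy. With the seek-once-then-sequential-read path, each appended file is expected to add exactly one local client read (readsFromClient + numFiles); that expectation would not hold if the same block were re-read through repeated positioned reads.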