HDFS-13509. Bug fix for breakHardlinks() of ReplicaInfo/LocalReplica, and fix TestFileAppend failures on Windows. Contributed by Xiao Liang.
(cherry picked from commit eb7fe1d588
)
This commit is contained in:
parent
36fc9a6939
commit
ce62991648
|
@ -186,16 +186,18 @@ abstract public class LocalReplica extends ReplicaInfo {
|
||||||
final FileIoProvider fileIoProvider = getFileIoProvider();
|
final FileIoProvider fileIoProvider = getFileIoProvider();
|
||||||
final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
|
final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
|
||||||
getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
|
getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
|
||||||
try (FileInputStream in = fileIoProvider.getFileInputStream(
|
try {
|
||||||
getVolume(), file)) {
|
try (FileInputStream in = fileIoProvider.getFileInputStream(
|
||||||
try (FileOutputStream out = fileIoProvider.getFileOutputStream(
|
getVolume(), file)) {
|
||||||
getVolume(), tmpFile)) {
|
try (FileOutputStream out = fileIoProvider.getFileOutputStream(
|
||||||
IOUtils.copyBytes(in, out, 16 * 1024);
|
getVolume(), tmpFile)) {
|
||||||
|
IOUtils.copyBytes(in, out, 16 * 1024);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (file.length() != tmpFile.length()) {
|
if (file.length() != tmpFile.length()) {
|
||||||
throw new IOException("Copy of file " + file + " size " + file.length()+
|
throw new IOException("Copy of file " + file + " size " + file.length()
|
||||||
" into file " + tmpFile +
|
+ " into file " + tmpFile + " resulted in a size of "
|
||||||
" resulted in a size of " + tmpFile.length());
|
+ tmpFile.length());
|
||||||
}
|
}
|
||||||
fileIoProvider.replaceFile(getVolume(), tmpFile, file);
|
fileIoProvider.replaceFile(getVolume(), tmpFile, file);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -120,7 +121,9 @@ public class TestFileAppend{
|
||||||
@Test
|
@Test
|
||||||
public void testBreakHardlinksIfNeeded() throws IOException {
|
public void testBreakHardlinksIfNeeded() throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.build();
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
InetSocketAddress addr = new InetSocketAddress("localhost",
|
InetSocketAddress addr = new InetSocketAddress("localhost",
|
||||||
cluster.getNameNodePort());
|
cluster.getNameNodePort());
|
||||||
|
@ -186,7 +189,9 @@ public class TestFileAppend{
|
||||||
public void testSimpleFlush() throws IOException {
|
public void testSimpleFlush() throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.build();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -239,7 +244,9 @@ public class TestFileAppend{
|
||||||
public void testComplexFlush() throws IOException {
|
public void testComplexFlush() throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.build();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -286,7 +293,9 @@ public class TestFileAppend{
|
||||||
@Test(expected = FileNotFoundException.class)
|
@Test(expected = FileNotFoundException.class)
|
||||||
public void testFileNotFound() throws IOException {
|
public void testFileNotFound() throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.build();
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
Path file1 = new Path("/nonexistingfile.dat");
|
Path file1 = new Path("/nonexistingfile.dat");
|
||||||
|
@ -301,7 +310,9 @@ public class TestFileAppend{
|
||||||
@Test
|
@Test
|
||||||
public void testAppendTwice() throws Exception {
|
public void testAppendTwice() throws Exception {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.build();
|
||||||
final FileSystem fs1 = cluster.getFileSystem();
|
final FileSystem fs1 = cluster.getFileSystem();
|
||||||
final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
|
final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
|
||||||
try {
|
try {
|
||||||
|
@ -340,7 +351,9 @@ public class TestFileAppend{
|
||||||
@Test
|
@Test
|
||||||
public void testAppend2Twice() throws Exception {
|
public void testAppend2Twice() throws Exception {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.build();
|
||||||
final DistributedFileSystem fs1 = cluster.getFileSystem();
|
final DistributedFileSystem fs1 = cluster.getFileSystem();
|
||||||
final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
|
final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
|
||||||
try {
|
try {
|
||||||
|
@ -386,8 +399,9 @@ public class TestFileAppend{
|
||||||
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
|
||||||
false);
|
false);
|
||||||
|
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
.numDataNodes(4).build();
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf,
|
||||||
|
builderBaseDir).numDataNodes(4).build();
|
||||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
final Path p = new Path("/testMultipleAppend/foo");
|
final Path p = new Path("/testMultipleAppend/foo");
|
||||||
|
@ -438,8 +452,9 @@ public class TestFileAppend{
|
||||||
final long softLimit = 1L;
|
final long softLimit = 1L;
|
||||||
final long hardLimit = 9999999L;
|
final long hardLimit = 9999999L;
|
||||||
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
.build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.numDataNodes(1).build();
|
||||||
cluster.setLeasePeriod(softLimit, hardLimit);
|
cluster.setLeasePeriod(softLimit, hardLimit);
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
@ -478,8 +493,9 @@ public class TestFileAppend{
|
||||||
final long softLimit = 1L;
|
final long softLimit = 1L;
|
||||||
final long hardLimit = 9999999L;
|
final long hardLimit = 9999999L;
|
||||||
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
.build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.numDataNodes(1).build();
|
||||||
cluster.setLeasePeriod(softLimit, hardLimit);
|
cluster.setLeasePeriod(softLimit, hardLimit);
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
@ -525,8 +541,9 @@ public class TestFileAppend{
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
|
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
|
||||||
"false");
|
"false");
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
.build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.numDataNodes(3).build();
|
||||||
DistributedFileSystem fs = null;
|
DistributedFileSystem fs = null;
|
||||||
try {
|
try {
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
|
@ -578,8 +595,9 @@ public class TestFileAppend{
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
|
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
|
||||||
"false");
|
"false");
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
.build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.numDataNodes(3).build();
|
||||||
DistributedFileSystem fs = null;
|
DistributedFileSystem fs = null;
|
||||||
final String hello = "hello\n";
|
final String hello = "hello\n";
|
||||||
try {
|
try {
|
||||||
|
@ -650,8 +668,9 @@ public class TestFileAppend{
|
||||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
||||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||||
conf.setInt("dfs.min.replication", 1);
|
conf.setInt("dfs.min.replication", 1);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
.build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.numDataNodes(1).build();
|
||||||
try {
|
try {
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
Path fileName = new Path("/appendCorruptBlock");
|
Path fileName = new Path("/appendCorruptBlock");
|
||||||
|
@ -676,7 +695,9 @@ public class TestFileAppend{
|
||||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||||
conf.setInt("dfs.min.replication", 1);
|
conf.setInt("dfs.min.replication", 1);
|
||||||
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
|
||||||
|
.build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
@ -693,9 +714,9 @@ public class TestFileAppend{
|
||||||
// Call FsDatasetImpl#append to append the block file,
|
// Call FsDatasetImpl#append to append the block file,
|
||||||
// which converts it to a rbw replica.
|
// which converts it to a rbw replica.
|
||||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
|
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
|
||||||
long newGS = block.getGenerationStamp()+1;
|
long newGS = block.getGenerationStamp() + 1;
|
||||||
ReplicaHandler
|
ReplicaHandler replicaHandler =
|
||||||
replicaHandler = dataSet.append(block, newGS, initialFileLength);
|
dataSet.append(block, newGS, initialFileLength);
|
||||||
|
|
||||||
// write data to block file
|
// write data to block file
|
||||||
ReplicaBeingWritten rbw =
|
ReplicaBeingWritten rbw =
|
||||||
|
@ -711,9 +732,8 @@ public class TestFileAppend{
|
||||||
|
|
||||||
// update checksum file
|
// update checksum file
|
||||||
final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
||||||
FsDatasetUtil.computeChecksum(
|
FsDatasetUtil.computeChecksum(rbw.getMetaFile(), rbw.getMetaFile(),
|
||||||
rbw.getMetaFile(), rbw.getMetaFile(), rbw.getBlockFile(),
|
rbw.getBlockFile(), smallBufferSize, conf);
|
||||||
smallBufferSize, conf);
|
|
||||||
|
|
||||||
// read the block
|
// read the block
|
||||||
// the DataNode BlockSender should read from the rbw replica's in-memory
|
// the DataNode BlockSender should read from the rbw replica's in-memory
|
||||||
|
@ -725,5 +745,4 @@ public class TestFileAppend{
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue