HDFS-15875. Check whether file is being truncated before truncate (#2746)
(cherry picked from commit 6a55baeee4
)
This commit is contained in:
parent
cdaa64458d
commit
34f7562abe
|
@ -119,6 +119,7 @@ public class BlockRecoveryWorker {
|
||||||
List<BlockRecord> syncList = new ArrayList<>(locs.length);
|
List<BlockRecord> syncList = new ArrayList<>(locs.length);
|
||||||
int errorCount = 0;
|
int errorCount = 0;
|
||||||
int candidateReplicaCnt = 0;
|
int candidateReplicaCnt = 0;
|
||||||
|
DataNodeFaultInjector.get().delay();
|
||||||
|
|
||||||
// Check generation stamps, replica size and state. Replica must satisfy
|
// Check generation stamps, replica size and state. Replica must satisfy
|
||||||
// the following criteria to be included in syncList for recovery:
|
// the following criteria to be included in syncList for recovery:
|
||||||
|
|
|
@ -127,4 +127,9 @@ public class DataNodeFaultInjector {
|
||||||
* Used as a hook to inject intercept when re-register.
|
* Used as a hook to inject intercept when re-register.
|
||||||
*/
|
*/
|
||||||
public void blockUtilSendFullBlockReport() {}
|
public void blockUtilSendFullBlockReport() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Just delay a while.
|
||||||
|
*/
|
||||||
|
public void delay() {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
|
@ -111,6 +112,10 @@ final class FSDirTruncateOp {
|
||||||
+ truncatedBlock.getNumBytes();
|
+ truncatedBlock.getNumBytes();
|
||||||
if (newLength == truncateLength) {
|
if (newLength == truncateLength) {
|
||||||
return new TruncateResult(false, fsd.getAuditFileInfo(iip));
|
return new TruncateResult(false, fsd.getAuditFileInfo(iip));
|
||||||
|
} else {
|
||||||
|
throw new AlreadyBeingCreatedException(
|
||||||
|
RecoverLeaseOp.TRUNCATE_FILE.getExceptionMessage(src,
|
||||||
|
clientName, clientMachine, src + " is being truncated."));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2761,7 +2761,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
TRUNCATE_FILE,
|
TRUNCATE_FILE,
|
||||||
RECOVER_LEASE;
|
RECOVER_LEASE;
|
||||||
|
|
||||||
private String getExceptionMessage(String src, String holder,
|
public String getExceptionMessage(String src, String holder,
|
||||||
String clientMachine, String reason) {
|
String clientMachine, String reason) {
|
||||||
return "Failed to " + this + " " + src + " for " + holder +
|
return "Failed to " + this + " " + src + " for " + holder +
|
||||||
" on " + clientMachine + " because " + reason;
|
" on " + clientMachine + " because " + reason;
|
||||||
|
|
|
@ -33,6 +33,9 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
@ -218,6 +221,70 @@ public class TestFileTruncate {
|
||||||
fs.delete(dir, true);
|
fs.delete(dir, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test truncate twice together on a file.
|
||||||
|
*/
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testTruncateTwiceTogether() throws Exception {
|
||||||
|
|
||||||
|
Path dir = new Path("/testTruncateTwiceTogether");
|
||||||
|
fs.mkdirs(dir);
|
||||||
|
final Path p = new Path(dir, "file");
|
||||||
|
final byte[] data = new byte[100 * BLOCK_SIZE];
|
||||||
|
ThreadLocalRandom.current().nextBytes(data);
|
||||||
|
writeContents(data, data.length, p);
|
||||||
|
|
||||||
|
DataNodeFaultInjector originInjector = DataNodeFaultInjector.get();
|
||||||
|
DataNodeFaultInjector injector = new DataNodeFaultInjector() {
|
||||||
|
@Override
|
||||||
|
public void delay() {
|
||||||
|
try {
|
||||||
|
// Bigger than soft lease period.
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Delay to recovery.
|
||||||
|
DataNodeFaultInjector.set(injector);
|
||||||
|
|
||||||
|
// Truncate by using different client name.
|
||||||
|
Thread t = new Thread(() -> {
|
||||||
|
String hdfsCacheDisableKey = "fs.hdfs.impl.disable.cache";
|
||||||
|
boolean originCacheDisable =
|
||||||
|
conf.getBoolean(hdfsCacheDisableKey, false);
|
||||||
|
try {
|
||||||
|
conf.setBoolean(hdfsCacheDisableKey, true);
|
||||||
|
FileSystem fs1 = FileSystem.get(conf);
|
||||||
|
fs1.truncate(p, data.length-1);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// ignore
|
||||||
|
} finally{
|
||||||
|
conf.setBoolean(hdfsCacheDisableKey, originCacheDisable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
t.start();
|
||||||
|
t.join();
|
||||||
|
NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
|
||||||
|
.setLeasePeriod(LOW_SOFTLIMIT, LOW_HARDLIMIT);
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(RemoteException.class,
|
||||||
|
"/testTruncateTwiceTogether/file is being truncated",
|
||||||
|
() -> fs.truncate(p, data.length - 2));
|
||||||
|
|
||||||
|
// wait for block recovery
|
||||||
|
checkBlockRecovery(p);
|
||||||
|
assertFileLength(p, data.length - 1);
|
||||||
|
|
||||||
|
DataNodeFaultInjector.set(originInjector);
|
||||||
|
NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
|
||||||
|
.setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
|
||||||
|
conf.getLong(DFSConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
|
||||||
|
DFSConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000);
|
||||||
|
fs.delete(dir, true);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Truncate files and then run other operations such as
|
* Truncate files and then run other operations such as
|
||||||
* rename, set replication, set permission, etc.
|
* rename, set replication, set permission, etc.
|
||||||
|
@ -631,7 +698,7 @@ public class TestFileTruncate {
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
fs.truncate(p, 0);
|
fs.truncate(p, 0);
|
||||||
fail("Truncate must fail since a trancate is already in pregress.");
|
fail("Truncate must fail since a truncate is already in progress.");
|
||||||
} catch (IOException expected) {
|
} catch (IOException expected) {
|
||||||
GenericTestUtils.assertExceptionContains(
|
GenericTestUtils.assertExceptionContains(
|
||||||
"Failed to TRUNCATE_FILE", expected);
|
"Failed to TRUNCATE_FILE", expected);
|
||||||
|
|
Loading…
Reference in New Issue