HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE blocks.
(cherry picked from commit b10d8ced21
)
This commit is contained in:
parent
1d15c90233
commit
0ad3c51dfb
|
@ -34,7 +34,6 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.lang.reflect.Proxy;
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
@ -168,6 +167,7 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
|
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
@ -182,16 +182,15 @@ import org.apache.hadoop.util.DataChecksum.Type;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.htrace.core.TraceScope;
|
import org.apache.htrace.core.TraceScope;
|
||||||
|
import org.apache.htrace.core.Tracer;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.net.InetAddresses;
|
import com.google.common.net.InetAddresses;
|
||||||
import org.apache.htrace.core.Tracer;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/********************************************************
|
/********************************************************
|
||||||
* DFSClient can connect to a Hadoop Filesystem and
|
* DFSClient can connect to a Hadoop Filesystem and
|
||||||
|
@ -1355,17 +1354,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke namenode append RPC.
|
||||||
|
* It retries in case of {@link BlockNotYetCompleteException}.
|
||||||
|
*/
|
||||||
|
private LastBlockWithStatus callAppend(String src,
|
||||||
|
EnumSetWritable<CreateFlag> flag) throws IOException {
|
||||||
|
final long startTime = Time.monotonicNow();
|
||||||
|
for(;;) {
|
||||||
|
try {
|
||||||
|
return namenode.append(src, clientName, flag);
|
||||||
|
} catch(RemoteException re) {
|
||||||
|
if (Time.monotonicNow() - startTime > 5000
|
||||||
|
|| !RetriableException.class.getName().equals(
|
||||||
|
re.getClassName())) {
|
||||||
|
throw re;
|
||||||
|
}
|
||||||
|
|
||||||
|
try { // sleep and retry
|
||||||
|
Thread.sleep(500);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw DFSUtilClient.toInterruptedIOException("callAppend", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Method to get stream returned by append call */
|
/** Method to get stream returned by append call */
|
||||||
private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag,
|
private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag,
|
||||||
Progressable progress, String[] favoredNodes) throws IOException {
|
Progressable progress, String[] favoredNodes) throws IOException {
|
||||||
CreateFlag.validateForAppend(flag);
|
CreateFlag.validateForAppend(flag);
|
||||||
try {
|
try {
|
||||||
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
|
final LastBlockWithStatus blkWithStatus = callAppend(src,
|
||||||
new EnumSetWritable<>(flag, CreateFlag.class));
|
new EnumSetWritable<>(flag, CreateFlag.class));
|
||||||
HdfsFileStatus status = blkWithStatus.getFileStatus();
|
HdfsFileStatus status = blkWithStatus.getFileStatus();
|
||||||
if (status == null) {
|
if (status == null) {
|
||||||
DFSClient.LOG.debug("NameNode is on an older version, request file " +
|
LOG.debug("NameNode is on an older version, request file " +
|
||||||
"info with additional RPC call for file: " + src);
|
"info with additional RPC call for file: {}", src);
|
||||||
status = getFileInfo(src);
|
status = getFileInfo(src);
|
||||||
}
|
}
|
||||||
return DFSOutputStream.newStreamForAppend(this, src, flag, progress,
|
return DFSOutputStream.newStreamForAppend(this, src, flag, progress,
|
||||||
|
|
|
@ -17,9 +17,29 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||||
import com.google.common.collect.Maps;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
|
||||||
import com.google.common.primitives.SignedBytes;
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
import org.apache.commons.io.Charsets;
|
import org.apache.commons.io.Charsets;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
|
@ -52,26 +72,9 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import com.google.common.base.Joiner;
|
||||||
import java.io.IOException;
|
import com.google.common.collect.Maps;
|
||||||
import java.io.UnsupportedEncodingException;
|
import com.google.common.primitives.SignedBytes;
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.nio.channels.SocketChannel;
|
|
||||||
import java.text.SimpleDateFormat;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
|
|
||||||
|
|
||||||
public class DFSUtilClient {
|
public class DFSUtilClient {
|
||||||
public static final byte[] EMPTY_BYTES = {};
|
public static final byte[] EMPTY_BYTES = {};
|
||||||
|
@ -676,4 +679,10 @@ public class DFSUtilClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static InterruptedIOException toInterruptedIOException(String message,
|
||||||
|
InterruptedException e) {
|
||||||
|
final InterruptedIOException iioe = new InterruptedIOException(message);
|
||||||
|
iioe.initCause(e);
|
||||||
|
return iioe;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -455,8 +455,7 @@ class DataStreamer extends Daemon {
|
||||||
setPipeline(lastBlock);
|
setPipeline(lastBlock);
|
||||||
if (nodes.length < 1) {
|
if (nodes.length < 1) {
|
||||||
throw new IOException("Unable to retrieve blocks locations " +
|
throw new IOException("Unable to retrieve blocks locations " +
|
||||||
" for last block " + block +
|
" for last block " + block + " of file " + src);
|
||||||
"of file " + src);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -956,6 +956,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10
|
HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10
|
||||||
datanodes by default. (Mingliang Liu via shv)
|
datanodes by default. (Mingliang Liu via shv)
|
||||||
|
|
||||||
|
HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE
|
||||||
|
blocks. (szetszwo)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-8091: ACLStatus and XAttributes should be presented to
|
HDFS-8091: ACLStatus and XAttributes should be presented to
|
||||||
|
|
|
@ -189,6 +189,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY =
|
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY =
|
||||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
|
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
|
||||||
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
|
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
|
||||||
|
public static final String DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY
|
||||||
|
= "dfs.namenode.file.close.num-committed-allowed";
|
||||||
|
public static final int DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT
|
||||||
|
= 0;
|
||||||
public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY =
|
public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY =
|
||||||
"dfs.namenode.safemode.replication.min";
|
"dfs.namenode.safemode.replication.min";
|
||||||
public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY =
|
public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY =
|
||||||
|
|
|
@ -618,6 +618,10 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
return (countNodes(block).liveReplicas() >= minReplication);
|
return (countNodes(block).liveReplicas() >= minReplication);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public short getMinReplication() {
|
||||||
|
return minReplication;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit a block of a file
|
* Commit a block of a file
|
||||||
*
|
*
|
||||||
|
@ -665,7 +669,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
final boolean b = commitBlock(lastBlock, commitBlock);
|
final boolean b = commitBlock(lastBlock, commitBlock);
|
||||||
if (countNodes(lastBlock).liveReplicas() >= minReplication) {
|
if (countNodes(lastBlock).liveReplicas() >= minReplication) {
|
||||||
if (b) {
|
if (b) {
|
||||||
addExpectedReplicasToPending(lastBlock);
|
addExpectedReplicasToPending(lastBlock, bc);
|
||||||
}
|
}
|
||||||
completeBlock(lastBlock, false);
|
completeBlock(lastBlock, false);
|
||||||
}
|
}
|
||||||
|
@ -677,6 +681,10 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
* pendingReplications in order to keep ReplicationMonitor from scheduling
|
* pendingReplications in order to keep ReplicationMonitor from scheduling
|
||||||
* the block.
|
* the block.
|
||||||
*/
|
*/
|
||||||
|
public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) {
|
||||||
|
addExpectedReplicasToPending(blk);
|
||||||
|
}
|
||||||
|
|
||||||
private void addExpectedReplicasToPending(BlockInfo lastBlock) {
|
private void addExpectedReplicasToPending(BlockInfo lastBlock) {
|
||||||
DatanodeStorageInfo[] expectedStorages =
|
DatanodeStorageInfo[] expectedStorages =
|
||||||
lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
|
lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
|
||||||
|
@ -2617,7 +2625,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
|
|
||||||
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
||||||
numLiveReplicas >= minReplication) {
|
numLiveReplicas >= minReplication) {
|
||||||
addExpectedReplicasToPending(storedBlock);
|
addExpectedReplicasToPending(storedBlock, bc);
|
||||||
completeBlock(storedBlock, false);
|
completeBlock(storedBlock, false);
|
||||||
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
||||||
// check whether safe replication is reached for the block
|
// check whether safe replication is reached for the block
|
||||||
|
@ -3453,25 +3461,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that the indicated blocks are present and
|
|
||||||
* replicated.
|
|
||||||
*/
|
|
||||||
public boolean checkBlocksProperlyReplicated(
|
|
||||||
String src, BlockInfo[] blocks) {
|
|
||||||
for (BlockInfo b: blocks) {
|
|
||||||
if (!b.isComplete()) {
|
|
||||||
final int numNodes = b.numNodes();
|
|
||||||
LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
|
|
||||||
+ b.getBlockUCState() + ", replication# = " + numNodes
|
|
||||||
+ (numNodes < minReplication ? " < ": " >= ")
|
|
||||||
+ " minimum = " + minReplication + ") in file " + src);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return 0 if the block is not found;
|
* @return 0 if the block is not found;
|
||||||
* otherwise, return the replication factor of the block.
|
* otherwise, return the replication factor of the block.
|
||||||
|
|
|
@ -33,8 +33,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
|
||||||
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
@ -119,10 +121,17 @@ final class FSDirAppendOp {
|
||||||
|
|
||||||
final BlockInfo lastBlock = file.getLastBlock();
|
final BlockInfo lastBlock = file.getLastBlock();
|
||||||
// Check that the block has at least minimum replication.
|
// Check that the block has at least minimum replication.
|
||||||
if (lastBlock != null && lastBlock.isComplete()
|
if (lastBlock != null) {
|
||||||
|
if (lastBlock.getBlockUCState() == BlockUCState.COMMITTED) {
|
||||||
|
throw new RetriableException(
|
||||||
|
new NotReplicatedYetException("append: lastBlock="
|
||||||
|
+ lastBlock + " of src=" + path
|
||||||
|
+ " is COMMITTED but not yet COMPLETE."));
|
||||||
|
} else if (lastBlock.isComplete()
|
||||||
&& !blockManager.isSufficientlyReplicated(lastBlock)) {
|
&& !blockManager.isSufficientlyReplicated(lastBlock)) {
|
||||||
throw new IOException("append: lastBlock=" + lastBlock + " of src="
|
throw new IOException("append: lastBlock=" + lastBlock + " of src="
|
||||||
+ path + " is not sufficiently replicated yet.");
|
+ path + " is not sufficiently replicated yet.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
|
lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
|
||||||
true, logRetryCache);
|
true, logRetryCache);
|
||||||
|
|
|
@ -758,8 +758,10 @@ class FSDirWriteFileOp {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fsn.addCommittedBlocksToPending(pendingFile);
|
||||||
|
|
||||||
fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
|
fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||||
Snapshot.CURRENT_STATE_ID);
|
Snapshot.CURRENT_STATE_ID, true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
|
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
|
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
|
||||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||||
|
|
||||||
|
@ -29,7 +28,6 @@ import java.util.EnumMap;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -89,6 +87,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||||
|
@ -446,8 +445,9 @@ public class FSEditLogLoader {
|
||||||
// One might expect that you could use removeLease(holder, path) here,
|
// One might expect that you could use removeLease(holder, path) here,
|
||||||
// but OP_CLOSE doesn't serialize the holder. So, remove the inode.
|
// but OP_CLOSE doesn't serialize the holder. So, remove the inode.
|
||||||
if (file.isUnderConstruction()) {
|
if (file.isUnderConstruction()) {
|
||||||
fsNamesys.leaseManager.removeLeases(Lists.newArrayList(file.getId()));
|
fsNamesys.getLeaseManager().removeLease(file.getId());
|
||||||
file.toCompleteFile(file.getModificationTime());
|
file.toCompleteFile(file.getModificationTime(), 0,
|
||||||
|
fsNamesys.getBlockManager().getMinReplication());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -463,6 +463,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
|
|
||||||
private final long minBlockSize; // minimum block size
|
private final long minBlockSize; // minimum block size
|
||||||
final long maxBlocksPerFile; // maximum # of blocks per file
|
final long maxBlocksPerFile; // maximum # of blocks per file
|
||||||
|
private final int numCommittedAllowed;
|
||||||
|
|
||||||
/** Lock to protect FSNamesystem. */
|
/** Lock to protect FSNamesystem. */
|
||||||
private final FSNamesystemLock fsLock;
|
private final FSNamesystemLock fsLock;
|
||||||
|
@ -769,6 +770,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
|
||||||
this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
|
this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
|
||||||
|
this.numCommittedAllowed = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);
|
||||||
this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
|
this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
|
||||||
LOG.info("Append Enabled: " + supportAppends);
|
LOG.info("Append Enabled: " + supportAppends);
|
||||||
|
|
||||||
|
@ -2602,17 +2606,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
|
boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
|
||||||
assert hasReadLock();
|
assert hasReadLock();
|
||||||
if (checkall) {
|
if (checkall) {
|
||||||
return blockManager.checkBlocksProperlyReplicated(src, v
|
return checkBlocksComplete(src, true, v.getBlocks());
|
||||||
.getBlocks());
|
|
||||||
} else {
|
} else {
|
||||||
// check the penultimate block of this file
|
final BlockInfo[] blocks = v.getBlocks();
|
||||||
BlockInfo b = v.getPenultimateBlock();
|
final int i = blocks.length - numCommittedAllowed - 2;
|
||||||
return b == null ||
|
return i < 0 || blocks[i] == null
|
||||||
blockManager.checkBlocksProperlyReplicated(
|
|| checkBlocksComplete(src, false, blocks[i]);
|
||||||
src, new BlockInfo[] { b });
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the blocks are COMPLETE;
|
||||||
|
* it may allow the last block to be COMMITTED.
|
||||||
|
*/
|
||||||
|
private boolean checkBlocksComplete(String src, boolean allowCommittedBlock,
|
||||||
|
BlockInfo... blocks) {
|
||||||
|
final int n = allowCommittedBlock? numCommittedAllowed: 0;
|
||||||
|
for(int i = 0; i < blocks.length; i++) {
|
||||||
|
final short min = blockManager.getMinReplication();
|
||||||
|
final String err = INodeFile.checkBlockComplete(blocks, i, n, min);
|
||||||
|
if (err != null) {
|
||||||
|
final int numNodes = blocks[i].numNodes();
|
||||||
|
LOG.info("BLOCK* " + err + "(numNodes= " + numNodes
|
||||||
|
+ (numNodes < min ? " < " : " >= ")
|
||||||
|
+ " minimum = " + min + ") in file " + src);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Change the indicated filename.
|
* Change the indicated filename.
|
||||||
* @deprecated Use {@link #renameTo(String, String, boolean,
|
* @deprecated Use {@link #renameTo(String, String, boolean,
|
||||||
|
@ -2746,7 +2769,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
List<INode> removedINodes,
|
List<INode> removedINodes,
|
||||||
final boolean acquireINodeMapLock) {
|
final boolean acquireINodeMapLock) {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
leaseManager.removeLeases(removedUCFiles);
|
for(long i : removedUCFiles) {
|
||||||
|
leaseManager.removeLease(i);
|
||||||
|
}
|
||||||
// remove inodes from inodesMap
|
// remove inodes from inodesMap
|
||||||
if (removedINodes != null) {
|
if (removedINodes != null) {
|
||||||
if (acquireINodeMapLock) {
|
if (acquireINodeMapLock) {
|
||||||
|
@ -3054,7 +3079,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
// then reap lease immediately and close the file.
|
// then reap lease immediately and close the file.
|
||||||
if(nrCompleteBlocks == nrBlocks) {
|
if(nrCompleteBlocks == nrBlocks) {
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||||
iip.getLatestSnapshotId());
|
iip.getLatestSnapshotId(), false);
|
||||||
NameNode.stateChangeLog.warn("BLOCK*"
|
NameNode.stateChangeLog.warn("BLOCK*"
|
||||||
+ " internalReleaseLease: All existing blocks are COMPLETE,"
|
+ " internalReleaseLease: All existing blocks are COMPLETE,"
|
||||||
+ " lease removed, file closed.");
|
+ " lease removed, file closed.");
|
||||||
|
@ -3093,7 +3118,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
if(penultimateBlockMinReplication &&
|
if(penultimateBlockMinReplication &&
|
||||||
blockManager.checkMinReplication(lastBlock)) {
|
blockManager.checkMinReplication(lastBlock)) {
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||||
iip.getLatestSnapshotId());
|
iip.getLatestSnapshotId(), false);
|
||||||
NameNode.stateChangeLog.warn("BLOCK*"
|
NameNode.stateChangeLog.warn("BLOCK*"
|
||||||
+ " internalReleaseLease: Committed blocks are minimally replicated,"
|
+ " internalReleaseLease: Committed blocks are minimally replicated,"
|
||||||
+ " lease removed, file closed.");
|
+ " lease removed, file closed.");
|
||||||
|
@ -3137,7 +3162,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
// We can remove this block and close the file.
|
// We can remove this block and close the file.
|
||||||
pendingFile.removeLastBlock(lastBlock);
|
pendingFile.removeLastBlock(lastBlock);
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||||
iip.getLatestSnapshotId());
|
iip.getLatestSnapshotId(), false);
|
||||||
NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
|
NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
|
||||||
+ "Removed empty last block and closed file.");
|
+ "Removed empty last block and closed file.");
|
||||||
return true;
|
return true;
|
||||||
|
@ -3202,8 +3227,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void finalizeINodeFileUnderConstruction(
|
void addCommittedBlocksToPending(final INodeFile pendingFile) {
|
||||||
String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
|
final BlockInfo[] blocks = pendingFile.getBlocks();
|
||||||
|
int i = blocks.length - numCommittedAllowed;
|
||||||
|
if (i < 0) {
|
||||||
|
i = 0;
|
||||||
|
}
|
||||||
|
for(; i < blocks.length; i++) {
|
||||||
|
final BlockInfo b = blocks[i];
|
||||||
|
if (b != null && b.getBlockUCState() == BlockUCState.COMMITTED) {
|
||||||
|
// b is COMMITTED but not yet COMPLETE, add it to pending replication.
|
||||||
|
blockManager.addExpectedReplicasToPending(b, pendingFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile,
|
||||||
|
int latestSnapshot, boolean allowCommittedBlock) throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
|
|
||||||
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
|
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
|
||||||
|
@ -3218,7 +3258,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
// The file is no longer pending.
|
// The file is no longer pending.
|
||||||
// Create permanent INode, update blocks. No need to replace the inode here
|
// Create permanent INode, update blocks. No need to replace the inode here
|
||||||
// since we just remove the uc feature from pendingFile
|
// since we just remove the uc feature from pendingFile
|
||||||
pendingFile.toCompleteFile(now());
|
pendingFile.toCompleteFile(now(),
|
||||||
|
allowCommittedBlock? numCommittedAllowed: 0,
|
||||||
|
blockManager.getMinReplication());
|
||||||
|
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
// close file and persist block allocations for this file
|
// close file and persist block allocations for this file
|
||||||
|
@ -3468,8 +3510,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
|
commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
|
||||||
|
|
||||||
//remove lease, close file
|
//remove lease, close file
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
int s = Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID);
|
||||||
Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID));
|
finalizeINodeFileUnderConstruction(src, pendingFile, s, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -201,30 +201,58 @@ public class INodeFile extends INodeWithAdditionalFields
|
||||||
* Convert the file to a complete file, i.e., to remove the Under-Construction
|
* Convert the file to a complete file, i.e., to remove the Under-Construction
|
||||||
* feature.
|
* feature.
|
||||||
*/
|
*/
|
||||||
public INodeFile toCompleteFile(long mtime) {
|
void toCompleteFile(long mtime, int numCommittedAllowed, short minReplication) {
|
||||||
Preconditions.checkState(isUnderConstruction(),
|
final FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
|
||||||
"file is no longer under construction");
|
Preconditions.checkNotNull(uc, "File %s is not under construction", this);
|
||||||
FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
|
assertAllBlocksComplete(numCommittedAllowed, minReplication);
|
||||||
if (uc != null) {
|
removeFeature(uc);
|
||||||
assertAllBlocksComplete();
|
setModificationTime(mtime);
|
||||||
removeFeature(uc);
|
|
||||||
this.setModificationTime(mtime);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Assert all blocks are complete. */
|
/** Assert all blocks are complete. */
|
||||||
private void assertAllBlocksComplete() {
|
private void assertAllBlocksComplete(int numCommittedAllowed,
|
||||||
|
short minReplication) {
|
||||||
if (blocks == null) {
|
if (blocks == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < blocks.length; i++) {
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
|
final String err = checkBlockComplete(blocks, i, numCommittedAllowed,
|
||||||
+ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
|
minReplication);
|
||||||
getClass().getSimpleName(), this, i, Arrays.asList(blocks));
|
Preconditions.checkState(err == null,
|
||||||
|
"Unexpected block state: %s, file=%s (%s), blocks=%s (i=%s)",
|
||||||
|
err, this, getClass().getSimpleName(), Arrays.asList(blocks), i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the i-th block is COMPLETE;
|
||||||
|
* when the i-th block is the last block, it may be allowed to be COMMITTED.
|
||||||
|
*
|
||||||
|
* @return null if the block passes the check;
|
||||||
|
* otherwise, return an error message.
|
||||||
|
*/
|
||||||
|
static String checkBlockComplete(BlockInfo[] blocks, int i,
|
||||||
|
int numCommittedAllowed, short minReplication) {
|
||||||
|
final BlockInfo b = blocks[i];
|
||||||
|
final BlockUCState state = b.getBlockUCState();
|
||||||
|
if (state == BlockUCState.COMPLETE) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (i < blocks.length - numCommittedAllowed) {
|
||||||
|
return b + " is " + state + " but not COMPLETE";
|
||||||
|
}
|
||||||
|
if (state != BlockUCState.COMMITTED) {
|
||||||
|
return b + " is " + state + " but neither COMPLETE nor COMMITTED";
|
||||||
|
}
|
||||||
|
final int numExpectedLocations
|
||||||
|
= b.getUnderConstructionFeature().getNumExpectedLocations();
|
||||||
|
if (numExpectedLocations <= minReplication) {
|
||||||
|
return b + " is " + state + " but numExpectedLocations = "
|
||||||
|
+ numExpectedLocations + " <= minReplication = " + minReplication;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override // BlockCollection
|
@Override // BlockCollection
|
||||||
public void setBlock(int index, BlockInfo blk) {
|
public void setBlock(int index, BlockInfo blk) {
|
||||||
this.blocks[index] = blk;
|
this.blocks[index] = blk;
|
||||||
|
|
|
@ -160,6 +160,13 @@ public class LeaseManager {
|
||||||
return lease;
|
return lease;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized void removeLease(long inodeId) {
|
||||||
|
final Lease lease = leasesById.get(inodeId);
|
||||||
|
if (lease != null) {
|
||||||
|
removeLease(lease, inodeId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the specified lease and src.
|
* Remove the specified lease and src.
|
||||||
*/
|
*/
|
||||||
|
@ -298,16 +305,6 @@ public class LeaseManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
synchronized void removeLeases(Collection<Long> inodes) {
|
|
||||||
for (long inode : inodes) {
|
|
||||||
Lease lease = leasesById.get(inode);
|
|
||||||
if (lease != null) {
|
|
||||||
removeLease(lease, inode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLeasePeriod(long softLimit, long hardLimit) {
|
public void setLeasePeriod(long softLimit, long hardLimit) {
|
||||||
this.softLimit = softLimit;
|
this.softLimit = softLimit;
|
||||||
this.hardLimit = hardLimit;
|
this.hardLimit = hardLimit;
|
||||||
|
|
|
@ -27,10 +27,12 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.HardLink;
|
import org.apache.hadoop.fs.HardLink;
|
||||||
|
@ -41,12 +43,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -55,6 +57,8 @@ import org.junit.Test;
|
||||||
* support HDFS appends.
|
* support HDFS appends.
|
||||||
*/
|
*/
|
||||||
public class TestFileAppend{
|
public class TestFileAppend{
|
||||||
|
private static final long RANDOM_TEST_RUNTIME = 10000;
|
||||||
|
|
||||||
final boolean simulatedStorage = false;
|
final boolean simulatedStorage = false;
|
||||||
|
|
||||||
private static byte[] fileContents = null;
|
private static byte[] fileContents = null;
|
||||||
|
@ -381,6 +385,56 @@ public class TestFileAppend{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleAppends() throws Exception {
|
||||||
|
final long startTime = Time.monotonicNow();
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(4).build();
|
||||||
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
try {
|
||||||
|
final Path p = new Path("/testMultipleAppend/foo");
|
||||||
|
final int blockSize = 1 << 16;
|
||||||
|
final byte[] data = AppendTestUtil.initBuffer(blockSize);
|
||||||
|
|
||||||
|
// create an empty file.
|
||||||
|
fs.create(p, true, 4096, (short)3, blockSize).close();
|
||||||
|
|
||||||
|
int fileLen = 0;
|
||||||
|
for(int i = 0;
|
||||||
|
i < 10 || Time.monotonicNow() - startTime < RANDOM_TEST_RUNTIME;
|
||||||
|
i++) {
|
||||||
|
int appendLen = ThreadLocalRandom.current().nextInt(100) + 1;
|
||||||
|
if (fileLen + appendLen > data.length) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
AppendTestUtil.LOG.info(i + ") fileLen=" + fileLen
|
||||||
|
+ ", appendLen=" + appendLen);
|
||||||
|
final FSDataOutputStream out = fs.append(p);
|
||||||
|
out.write(data, fileLen, appendLen);
|
||||||
|
out.close();
|
||||||
|
fileLen += appendLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(fileLen, fs.getFileStatus(p).getLen());
|
||||||
|
final byte[] actual = new byte[fileLen];
|
||||||
|
final FSDataInputStream in = fs.open(p);
|
||||||
|
in.readFully(actual);
|
||||||
|
in.close();
|
||||||
|
for(int i = 0; i < fileLen; i++) {
|
||||||
|
Assert.assertEquals(data[i], actual[i]);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
fs.close();
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Tests appending after soft-limit expires. */
|
/** Tests appending after soft-limit expires. */
|
||||||
@Test
|
@Test
|
||||||
public void testAppendAfterSoftLimit()
|
public void testAppendAfterSoftLimit()
|
||||||
|
|
|
@ -92,6 +92,10 @@ public class TestINodeFile {
|
||||||
(short)3, 1024L);
|
(short)3, 1024L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void toCompleteFile(INodeFile file) {
|
||||||
|
file.toCompleteFile(Time.now(), 0, (short)1);
|
||||||
|
}
|
||||||
|
|
||||||
INodeFile createINodeFile(short replication, long preferredBlockSize) {
|
INodeFile createINodeFile(short replication, long preferredBlockSize) {
|
||||||
return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
|
return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
|
||||||
null, replication, preferredBlockSize, (byte)0);
|
null, replication, preferredBlockSize, (byte)0);
|
||||||
|
@ -1089,7 +1093,7 @@ public class TestINodeFile {
|
||||||
assertEquals(clientName, uc.getClientName());
|
assertEquals(clientName, uc.getClientName());
|
||||||
assertEquals(clientMachine, uc.getClientMachine());
|
assertEquals(clientMachine, uc.getClientMachine());
|
||||||
|
|
||||||
file.toCompleteFile(Time.now());
|
toCompleteFile(file);
|
||||||
assertFalse(file.isUnderConstruction());
|
assertFalse(file.isUnderConstruction());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,8 +51,8 @@ public class TestLeaseManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(4, lm.getINodeIdWithLeases().size());
|
assertEquals(4, lm.getINodeIdWithLeases().size());
|
||||||
synchronized (lm) {
|
for (long id : ids) {
|
||||||
lm.removeLeases(ids);
|
lm.removeLease(id);
|
||||||
}
|
}
|
||||||
assertEquals(0, lm.getINodeIdWithLeases().size());
|
assertEquals(0, lm.getINodeIdWithLeases().size());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue