HDFS-2241. Remove implementing FSConstants interface to just get the constants from the interface. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1156420 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2a990ed646
commit
ef223e8e8e
|
@ -657,6 +657,9 @@ Trunk (unreleased changes)
|
|||
HDFS-2239. Reduce access levels of the fields and methods in FSNamesystem.
|
||||
(szetszwo)
|
||||
|
||||
HDFS-2241. Remove implementing FSConstants interface to just get the
|
||||
constants from the interface. (suresh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||
|
|
|
@ -61,6 +61,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
|
@ -106,7 +109,7 @@ import org.apache.hadoop.util.Progressable;
|
|||
*
|
||||
********************************************************/
|
||||
@InterfaceAudience.Private
|
||||
public class DFSClient implements FSConstants, java.io.Closeable {
|
||||
public class DFSClient implements java.io.Closeable {
|
||||
public static final Log LOG = LogFactory.getLog(DFSClient.class);
|
||||
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
|
||||
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
||||
|
@ -165,7 +168,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|||
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
||||
defaultBlockSize = conf.getLong(DFS_BLOCK_SIZE_KEY,
|
||||
DEFAULT_BLOCK_SIZE);
|
||||
DFS_BLOCK_SIZE_DEFAULT);
|
||||
defaultReplication = (short) conf.getInt(
|
||||
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
|
||||
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
|
||||
|
@ -1043,7 +1046,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|||
|
||||
out = new DataOutputStream(
|
||||
new BufferedOutputStream(NetUtils.getOutputStream(sock),
|
||||
DataNode.SMALL_BUFFER_SIZE));
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
in = new DataInputStream(NetUtils.getInputStream(sock));
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -234,7 +234,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
|
||||
public static final int DFS_DF_INTERVAL_DEFAULT = 60000;
|
||||
public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec";
|
||||
public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 21600000;
|
||||
public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 60 * 60 * 1000;
|
||||
public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
|
||||
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
|
||||
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
|
@ -167,7 +168,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
this.seqno = HEART_BEAT_SEQNO;
|
||||
|
||||
buffer = null;
|
||||
int packetSize = PacketHeader.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER; // TODO(todd) strange
|
||||
int packetSize = PacketHeader.PKT_HEADER_LEN + FSConstants.BYTES_IN_INTEGER;
|
||||
buf = new byte[packetSize];
|
||||
|
||||
checksumStart = dataStart = packetSize;
|
||||
|
@ -235,12 +236,12 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
dataStart - checksumLen , checksumLen);
|
||||
}
|
||||
|
||||
int pktLen = DFSClient.SIZE_OF_INTEGER + dataLen + checksumLen;
|
||||
int pktLen = FSConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
|
||||
|
||||
//normally dataStart == checksumPos, i.e., offset is zero.
|
||||
buffer = ByteBuffer.wrap(
|
||||
buf, dataStart - checksumPos,
|
||||
PacketHeader.PKT_HEADER_LEN + pktLen - DFSClient.SIZE_OF_INTEGER);
|
||||
PacketHeader.PKT_HEADER_LEN + pktLen - FSConstants.BYTES_IN_INTEGER);
|
||||
buf = null;
|
||||
buffer.mark();
|
||||
|
||||
|
@ -839,7 +840,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
|
||||
out = new DataOutputStream(new BufferedOutputStream(
|
||||
NetUtils.getOutputStream(sock, writeTimeout),
|
||||
DataNode.SMALL_BUFFER_SIZE));
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
|
||||
//send the TRANSFER_BLOCK request
|
||||
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
|
||||
|
@ -1011,7 +1012,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
//
|
||||
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||
NetUtils.getOutputStream(s, writeTimeout),
|
||||
DataNode.SMALL_BUFFER_SIZE));
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
|
||||
|
||||
// send the request
|
||||
|
@ -1156,7 +1157,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
final int timeout = client.getDatanodeReadTimeout(length);
|
||||
NetUtils.connect(sock, isa, timeout);
|
||||
sock.setSoTimeout(timeout);
|
||||
sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
|
||||
sock.setSendBufferSize(FSConstants.DEFAULT_DATA_SOCKET_SIZE);
|
||||
if(DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
|
||||
}
|
||||
|
|
|
@ -18,68 +18,71 @@
|
|||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
|
||||
/************************************
|
||||
* Some handy constants
|
||||
*
|
||||
*
|
||||
************************************/
|
||||
@InterfaceAudience.Private
|
||||
public interface FSConstants {
|
||||
public final class FSConstants {
|
||||
/* Hidden constructor */
|
||||
private FSConstants() {
|
||||
}
|
||||
|
||||
public static int MIN_BLOCKS_FOR_WRITE = 5;
|
||||
|
||||
// Long that indicates "leave current quota unchanged"
|
||||
public static final long QUOTA_DONT_SET = Long.MAX_VALUE;
|
||||
public static final long QUOTA_RESET = -1L;
|
||||
|
||||
|
||||
//
|
||||
// Timeouts, constants
|
||||
//
|
||||
public static long HEARTBEAT_INTERVAL = 3;
|
||||
public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
|
||||
public static long BLOCKREPORT_INITIAL_DELAY = 0;
|
||||
public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
|
||||
public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
|
||||
public static final long LEASE_RECOVER_PERIOD = 10 * 1000; //in ms
|
||||
|
||||
// We need to limit the length and depth of a path in the filesystem. HADOOP-438
|
||||
// Currently we set the maximum length to 8k characters and the maximum depth to 1k.
|
||||
public static final long LEASE_RECOVER_PERIOD = 10 * 1000; // in ms
|
||||
|
||||
// We need to limit the length and depth of a path in the filesystem.
|
||||
// HADOOP-438
|
||||
// Currently we set the maximum length to 8k characters and the maximum depth
|
||||
// to 1k.
|
||||
public static int MAX_PATH_LENGTH = 8000;
|
||||
public static int MAX_PATH_DEPTH = 1000;
|
||||
|
||||
public static final int BUFFER_SIZE = new HdfsConfiguration().getInt("io.file.buffer.size", 4096);
|
||||
//Used for writing header etc.
|
||||
public static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
|
||||
//TODO mb@media-style.com: should be conf injected?
|
||||
public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
|
||||
public static final int DEFAULT_BYTES_PER_CHECKSUM = 512;
|
||||
public static final int DEFAULT_WRITE_PACKET_SIZE = 64 * 1024;
|
||||
public static final short DEFAULT_REPLICATION_FACTOR = 3;
|
||||
public static final int DEFAULT_FILE_BUFFER_SIZE = 4096;
|
||||
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
|
||||
|
||||
public static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
|
||||
// TODO mb@media-style.com: should be conf injected?
|
||||
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
|
||||
public static final int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt(
|
||||
DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
|
||||
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
// Used for writing header etc.
|
||||
public static final int SMALL_BUFFER_SIZE = Math.min(IO_FILE_BUFFER_SIZE / 2,
|
||||
512);
|
||||
|
||||
public static final int BYTES_IN_INTEGER = Integer.SIZE / Byte.SIZE;
|
||||
|
||||
// SafeMode actions
|
||||
public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
|
||||
public enum SafeModeAction {
|
||||
SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET;
|
||||
}
|
||||
|
||||
// type of the datanode report
|
||||
public static enum DatanodeReportType {ALL, LIVE, DEAD }
|
||||
|
||||
public static enum DatanodeReportType {
|
||||
ALL, LIVE, DEAD
|
||||
}
|
||||
|
||||
// An invalid transaction ID that will never be seen in a real namesystem.
|
||||
public static final long INVALID_TXID = -12345;
|
||||
|
||||
/**
|
||||
* Distributed upgrade actions:
|
||||
*
|
||||
* 1. Get upgrade status.
|
||||
* 2. Get detailed upgrade status.
|
||||
* 3. Proceed with the upgrade if it is stuck, no matter what the status is.
|
||||
* 1. Get upgrade status. 2. Get detailed upgrade status. 3. Proceed with the
|
||||
* upgrade if it is stuck, no matter what the status is.
|
||||
*/
|
||||
public static enum UpgradeAction {
|
||||
GET_STATUS,
|
||||
DETAILED_STATUS,
|
||||
FORCE_PROCEED;
|
||||
GET_STATUS, DETAILED_STATUS, FORCE_PROCEED;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -90,6 +93,6 @@ public interface FSConstants {
|
|||
/**
|
||||
* Please see {@link LayoutVersion} on adding new layout version.
|
||||
*/
|
||||
public static final int LAYOUT_VERSION =
|
||||
LayoutVersion.getCurrentLayoutVersion();
|
||||
public static final int LAYOUT_VERSION = LayoutVersion
|
||||
.getCurrentLayoutVersion();
|
||||
}
|
||||
|
|
|
@ -309,10 +309,10 @@ public class Balancer {
|
|||
target.datanode.getName()), HdfsConstants.READ_TIMEOUT);
|
||||
sock.setKeepAlive(true);
|
||||
out = new DataOutputStream( new BufferedOutputStream(
|
||||
sock.getOutputStream(), FSConstants.BUFFER_SIZE));
|
||||
sock.getOutputStream(), FSConstants.IO_FILE_BUFFER_SIZE));
|
||||
sendRequest(out);
|
||||
in = new DataInputStream( new BufferedInputStream(
|
||||
sock.getInputStream(), FSConstants.BUFFER_SIZE));
|
||||
sock.getInputStream(), FSConstants.IO_FILE_BUFFER_SIZE));
|
||||
receiveResponse(in);
|
||||
bytesMoved.inc(block.getNumBytes());
|
||||
LOG.info( "Moving block " + block.getBlock().getBlockId() +
|
||||
|
|
|
@ -29,7 +29,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
************************************/
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public interface HdfsConstants {
|
||||
public final class HdfsConstants {
|
||||
/* Hidden constructor */
|
||||
private HdfsConstants() { }
|
||||
|
||||
/**
|
||||
* Type of the node
|
||||
*/
|
||||
|
|
|
@ -190,13 +190,15 @@ public class JspHelper {
|
|||
s.connect(addr, HdfsConstants.READ_TIMEOUT);
|
||||
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
||||
|
||||
long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
|
||||
long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
|
||||
|
||||
// Use the block name for file name.
|
||||
String file = BlockReader.getFileName(addr, poolId, blockId);
|
||||
BlockReader blockReader = BlockReader.newBlockReader(s, file,
|
||||
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
|
||||
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
String file = BlockReader.getFileName(addr, poolId, blockId);
|
||||
BlockReader blockReader = BlockReader.newBlockReader(s, file,
|
||||
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
|
||||
offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
|
||||
offsetIntoBlock, amtToRead, bufferSize);
|
||||
|
||||
byte[] buf = new byte[(int)amtToRead];
|
||||
int readOffset = 0;
|
||||
|
|
|
@ -54,7 +54,7 @@ import org.apache.hadoop.util.PureJavaCrc32;
|
|||
* may copies it to another site. If a throttler is provided,
|
||||
* streaming throttling is also supported.
|
||||
**/
|
||||
class BlockReceiver implements Closeable, FSConstants {
|
||||
class BlockReceiver implements Closeable {
|
||||
public static final Log LOG = DataNode.LOG;
|
||||
static final Log ClientTraceLog = DataNode.ClientTraceLog;
|
||||
|
||||
|
@ -179,8 +179,7 @@ class BlockReceiver implements Closeable, FSConstants {
|
|||
this.out = streams.dataOut;
|
||||
this.cout = streams.checksumOut;
|
||||
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
||||
streams.checksumOut,
|
||||
SMALL_BUFFER_SIZE));
|
||||
streams.checksumOut, FSConstants.SMALL_BUFFER_SIZE));
|
||||
// write data chunk header if creating a new replica
|
||||
if (isCreate) {
|
||||
BlockMetadataHeader.writeHeader(checksumOut, checksum);
|
||||
|
@ -399,7 +398,7 @@ class BlockReceiver implements Closeable, FSConstants {
|
|||
buf.limit(bufRead);
|
||||
}
|
||||
|
||||
while (buf.remaining() < SIZE_OF_INTEGER) {
|
||||
while (buf.remaining() < FSConstants.BYTES_IN_INTEGER) {
|
||||
if (buf.position() > 0) {
|
||||
shiftBufData();
|
||||
}
|
||||
|
@ -418,9 +417,10 @@ class BlockReceiver implements Closeable, FSConstants {
|
|||
payloadLen);
|
||||
}
|
||||
|
||||
// Subtract SIZE_OF_INTEGER since that accounts for the payloadLen that
|
||||
// Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
|
||||
// we read above.
|
||||
int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - SIZE_OF_INTEGER;
|
||||
int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
|
||||
- FSConstants.BYTES_IN_INTEGER;
|
||||
|
||||
if (buf.remaining() < pktSize) {
|
||||
//we need to read more data
|
||||
|
@ -817,7 +817,7 @@ class BlockReceiver implements Closeable, FSConstants {
|
|||
* Processed responses from downstream datanodes in the pipeline
|
||||
* and sends back replies to the originator.
|
||||
*/
|
||||
class PacketResponder implements Runnable, Closeable, FSConstants {
|
||||
class PacketResponder implements Runnable, Closeable {
|
||||
|
||||
/** queue for packets waiting for ack */
|
||||
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.hadoop.util.DataChecksum;
|
|||
/**
|
||||
* Reads a block from the disk and sends it to a recipient.
|
||||
*/
|
||||
class BlockSender implements java.io.Closeable, FSConstants {
|
||||
class BlockSender implements java.io.Closeable {
|
||||
public static final Log LOG = DataNode.LOG;
|
||||
static final Log ClientTraceLog = DataNode.ClientTraceLog;
|
||||
|
||||
|
@ -155,7 +155,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|||
|
||||
if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
|
||||
checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
|
||||
.getMetaDataInputStream(block), BUFFER_SIZE));
|
||||
.getMetaDataInputStream(block), FSConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
|
@ -472,15 +472,15 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|||
streamForSendChunks = baseStream;
|
||||
|
||||
// assure a mininum buffer size.
|
||||
maxChunksPerPacket = (Math.max(BUFFER_SIZE,
|
||||
maxChunksPerPacket = (Math.max(FSConstants.IO_FILE_BUFFER_SIZE,
|
||||
MIN_BUFFER_WITH_TRANSFERTO)
|
||||
+ bytesPerChecksum - 1)/bytesPerChecksum;
|
||||
|
||||
// allocate smaller buffer while using transferTo().
|
||||
pktSize += checksumSize * maxChunksPerPacket;
|
||||
} else {
|
||||
maxChunksPerPacket = Math.max(1,
|
||||
(BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
|
||||
maxChunksPerPacket = Math.max(1, (FSConstants.IO_FILE_BUFFER_SIZE
|
||||
+ bytesPerChecksum - 1) / bytesPerChecksum);
|
||||
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
|
||||
}
|
||||
|
||||
|
|
|
@ -169,7 +169,7 @@ import org.mortbay.util.ajax.JSON;
|
|||
**********************************************************/
|
||||
@InterfaceAudience.Private
|
||||
public class DataNode extends Configured
|
||||
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,
|
||||
implements InterDatanodeProtocol, ClientDatanodeProtocol,
|
||||
DataNodeMXBean {
|
||||
public static final Log LOG = LogFactory.getLog(DataNode.class);
|
||||
|
||||
|
@ -348,7 +348,7 @@ public class DataNode extends Configured
|
|||
ThreadGroup threadGroup = null;
|
||||
long blockReportInterval;
|
||||
boolean resetBlockReportTime = true;
|
||||
long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
|
||||
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
|
||||
long heartBeatInterval;
|
||||
private boolean heartbeatsDisabledForTests = false;
|
||||
private DataStorage storage = null;
|
||||
|
@ -440,21 +440,23 @@ public class DataNode extends Configured
|
|||
HdfsConstants.WRITE_TIMEOUT);
|
||||
/* Based on results on different platforms, we might need set the default
|
||||
* to false on some of them. */
|
||||
this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed",
|
||||
true);
|
||||
this.transferToAllowed = conf.getBoolean(
|
||||
DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
|
||||
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
|
||||
this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
||||
|
||||
this.blockReportInterval =
|
||||
conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL);
|
||||
this.initialBlockReportDelay = conf.getLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||
BLOCKREPORT_INITIAL_DELAY)* 1000L;
|
||||
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
|
||||
this.initialBlockReportDelay = conf.getLong(
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
|
||||
if (this.initialBlockReportDelay >= blockReportInterval) {
|
||||
this.initialBlockReportDelay = 0;
|
||||
LOG.info("dfs.blockreport.initialDelay is greater than " +
|
||||
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
|
||||
}
|
||||
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL) * 1000L;
|
||||
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
||||
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
||||
|
||||
// do we need to sync block file contents to disk when blockfile is closed?
|
||||
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
||||
|
@ -617,7 +619,7 @@ public class DataNode extends Configured
|
|||
} else {
|
||||
ss = secureResources.getStreamingSocket();
|
||||
}
|
||||
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
||||
ss.setReceiveBufferSize(FSConstants.DEFAULT_DATA_SOCKET_SIZE);
|
||||
// adjust machine name with the actual port
|
||||
int tmpPort = ss.getLocalPort();
|
||||
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
||||
|
@ -1967,8 +1969,8 @@ public class DataNode extends Configured
|
|||
long writeTimeout = socketWriteTimeout +
|
||||
HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
|
||||
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
|
||||
out = new DataOutputStream(new BufferedOutputStream(baseStream,
|
||||
SMALL_BUFFER_SIZE));
|
||||
out = new DataOutputStream(new BufferedOutputStream(baseStream,
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
blockSender = new BlockSender(b, 0, b.getNumBytes(),
|
||||
false, false, false, DataNode.this);
|
||||
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
|
||||
|
|
|
@ -69,7 +69,7 @@ import com.google.protobuf.ByteString;
|
|||
/**
|
||||
* Thread for processing incoming/outgoing data stream.
|
||||
*/
|
||||
class DataXceiver extends Receiver implements Runnable, FSConstants {
|
||||
class DataXceiver extends Receiver implements Runnable {
|
||||
public static final Log LOG = DataNode.LOG;
|
||||
static final Log ClientTraceLog = DataNode.ClientTraceLog;
|
||||
|
||||
|
@ -202,8 +202,8 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
|
|||
final long length) throws IOException {
|
||||
OutputStream baseStream = NetUtils.getOutputStream(s,
|
||||
datanode.socketWriteTimeout);
|
||||
DataOutputStream out = new DataOutputStream(
|
||||
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
|
||||
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||
baseStream, FSConstants.SMALL_BUFFER_SIZE));
|
||||
checkAccess(out, true, block, blockToken,
|
||||
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
|
||||
|
||||
|
@ -329,7 +329,7 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
|
|||
final DataOutputStream replyOut = new DataOutputStream(
|
||||
new BufferedOutputStream(
|
||||
NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
|
||||
SMALL_BUFFER_SIZE));
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
checkAccess(replyOut, isClient, block, blockToken,
|
||||
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
|
||||
|
||||
|
@ -369,11 +369,11 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
|
|||
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
|
||||
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
|
||||
mirrorSock.setSoTimeout(timeoutValue);
|
||||
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
||||
mirrorSock.setSendBufferSize(FSConstants.DEFAULT_DATA_SOCKET_SIZE);
|
||||
mirrorOut = new DataOutputStream(
|
||||
new BufferedOutputStream(
|
||||
NetUtils.getOutputStream(mirrorSock, writeTimeout),
|
||||
SMALL_BUFFER_SIZE));
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
|
||||
|
||||
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
|
||||
|
@ -524,7 +524,7 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
|
|||
final MetaDataInputStream metadataIn =
|
||||
datanode.data.getMetaDataInputStream(block);
|
||||
final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
|
||||
metadataIn, BUFFER_SIZE));
|
||||
metadataIn, FSConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
updateCurrentThreadName("Getting checksum for block " + block);
|
||||
try {
|
||||
|
@ -603,7 +603,7 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
|
|||
OutputStream baseStream = NetUtils.getOutputStream(
|
||||
s, datanode.socketWriteTimeout);
|
||||
reply = new DataOutputStream(new BufferedOutputStream(
|
||||
baseStream, SMALL_BUFFER_SIZE));
|
||||
baseStream, FSConstants.SMALL_BUFFER_SIZE));
|
||||
|
||||
// send status first
|
||||
writeResponse(SUCCESS, reply);
|
||||
|
@ -681,15 +681,15 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
|
|||
|
||||
OutputStream baseStream = NetUtils.getOutputStream(proxySock,
|
||||
datanode.socketWriteTimeout);
|
||||
proxyOut = new DataOutputStream(
|
||||
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
|
||||
proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
|
||||
/* send request to the proxy */
|
||||
new Sender(proxyOut).copyBlock(block, blockToken);
|
||||
|
||||
// receive the response from the proxy
|
||||
proxyReply = new DataInputStream(new BufferedInputStream(
|
||||
NetUtils.getInputStream(proxySock), BUFFER_SIZE));
|
||||
NetUtils.getInputStream(proxySock), FSConstants.IO_FILE_BUFFER_SIZE));
|
||||
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
|
||||
HdfsProtoUtil.vintPrefixed(proxyReply));
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.hadoop.util.Daemon;
|
|||
* other DataNodes. This small server does not use the
|
||||
* Hadoop IPC mechanism.
|
||||
*/
|
||||
class DataXceiverServer implements Runnable, FSConstants {
|
||||
class DataXceiverServer implements Runnable {
|
||||
public static final Log LOG = DataNode.LOG;
|
||||
|
||||
ServerSocket ss;
|
||||
|
@ -119,8 +119,8 @@ class DataXceiverServer implements Runnable, FSConstants {
|
|||
conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
|
||||
|
||||
this.estimateBlockSize =
|
||||
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
||||
this.estimateBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
||||
|
||||
//set up parameter for cluster balancing
|
||||
this.balanceThrottler = new BlockBalanceThrottler(
|
||||
|
|
|
@ -75,7 +75,7 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|||
*
|
||||
***************************************************/
|
||||
@InterfaceAudience.Private
|
||||
public class FSDataset implements FSConstants, FSDatasetInterface {
|
||||
public class FSDataset implements FSDatasetInterface {
|
||||
|
||||
/**
|
||||
* A node type that can be built into a tree reflecting the
|
||||
|
@ -465,7 +465,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|||
}
|
||||
checksumIn = new DataInputStream(
|
||||
new BufferedInputStream(new FileInputStream(metaFile),
|
||||
BUFFER_SIZE));
|
||||
FSConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
|
|
|
@ -67,7 +67,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
|
|||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
|
@ -78,6 +78,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
|
@ -146,7 +149,7 @@ import org.mortbay.util.ajax.JSON;
|
|||
***************************************************/
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(context="dfs")
|
||||
public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
|
||||
public class FSNamesystem implements RwLock, FSClusterStats,
|
||||
FSNamesystemMBean, NameNodeMXBean {
|
||||
static final Log LOG = LogFactory.getLog(FSNamesystem.class);
|
||||
|
||||
|
@ -274,9 +277,9 @@ public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
|
|||
*/
|
||||
private void initialize(Configuration conf, FSImage fsImage)
|
||||
throws IOException {
|
||||
resourceRecheckInterval =
|
||||
conf.getLong(DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
|
||||
resourceRecheckInterval = conf.getLong(
|
||||
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
|
||||
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
|
||||
nnResourceChecker = new NameNodeResourceChecker(conf);
|
||||
checkAvailableResources();
|
||||
this.systemStart = now();
|
||||
|
@ -323,7 +326,7 @@ public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
|
|||
}
|
||||
|
||||
public static Collection<URI> getNamespaceDirs(Configuration conf) {
|
||||
return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
|
||||
return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
|
||||
}
|
||||
|
||||
private static Collection<URI> getStorageDirs(Configuration conf,
|
||||
|
@ -357,7 +360,7 @@ public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
|
|||
}
|
||||
|
||||
public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
|
||||
return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
|
||||
return getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -429,29 +432,30 @@ public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
|
|||
|
||||
LOG.info("fsOwner=" + fsOwner);
|
||||
|
||||
this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
|
||||
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
|
||||
this.isPermissionEnabled = conf.getBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
|
||||
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
|
||||
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
|
||||
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
|
||||
DFS_PERMISSIONS_ENABLED_DEFAULT);
|
||||
LOG.info("supergroup=" + supergroup);
|
||||
LOG.info("isPermissionEnabled=" + isPermissionEnabled);
|
||||
short filePermission = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
|
||||
short filePermission = (short)conf.getInt(DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
|
||||
DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
|
||||
this.defaultPermission = PermissionStatus.createImmutable(
|
||||
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
|
||||
|
||||
this.serverDefaults = new FsServerDefaults(
|
||||
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
|
||||
conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM),
|
||||
conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DEFAULT_WRITE_PACKET_SIZE),
|
||||
(short) conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DEFAULT_REPLICATION_FACTOR),
|
||||
conf.getInt("io.file.buffer.size", DEFAULT_FILE_BUFFER_SIZE));
|
||||
this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
|
||||
conf.getLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
|
||||
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
|
||||
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
|
||||
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
|
||||
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
|
||||
|
||||
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
|
||||
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
|
||||
|
||||
this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
|
||||
this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
|
||||
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
|
||||
this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
|
||||
this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY,
|
||||
DFS_SUPPORT_APPEND_DEFAULT);
|
||||
|
||||
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
|
||||
}
|
||||
|
@ -2798,16 +2802,17 @@ public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
|
|||
* @param conf configuration
|
||||
*/
|
||||
SafeModeInfo(Configuration conf) {
|
||||
this.threshold = conf.getFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
|
||||
this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
|
||||
DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
|
||||
this.datanodeThreshold = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
|
||||
this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
|
||||
this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
|
||||
DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
|
||||
DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
|
||||
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
|
||||
this.safeReplication = conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY,
|
||||
DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
|
||||
// default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
|
||||
this.replQueueThreshold =
|
||||
conf.getFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
|
||||
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
|
||||
(float) threshold);
|
||||
this.blockTotal = 0;
|
||||
this.blockSafe = 0;
|
||||
|
@ -3977,14 +3982,12 @@ public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
|
|||
private DelegationTokenSecretManager createDelegationTokenSecretManager(
|
||||
Configuration conf) {
|
||||
return new DelegationTokenSecretManager(conf.getLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT),
|
||||
conf.getLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
|
||||
conf.getLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
|
||||
DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
|
||||
DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT),
|
||||
conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
|
||||
DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
|
||||
conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
|
||||
DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
|
||||
DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL, this);
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
|
||||
import static org.apache.hadoop.hdfs.protocol.FSConstants.MAX_PATH_LENGTH;
|
||||
import static org.apache.hadoop.hdfs.protocol.FSConstants.MAX_PATH_DEPTH;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
|
@ -140,7 +145,7 @@ import org.apache.hadoop.util.StringUtils;
|
|||
* NameNode state, for example partial blocksMap etc.
|
||||
**********************************************************/
|
||||
@InterfaceAudience.Private
|
||||
public class NameNode implements NamenodeProtocols, FSConstants {
|
||||
public class NameNode implements NamenodeProtocols {
|
||||
static{
|
||||
HdfsConfiguration.init();
|
||||
}
|
||||
|
@ -718,8 +723,8 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|||
+src+" for "+clientName+" at "+clientMachine);
|
||||
}
|
||||
if (!checkPathLength(src)) {
|
||||
throw new IOException("create: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
throw new IOException("create: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
}
|
||||
namesystem.startFile(src,
|
||||
new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
|
||||
|
@ -896,8 +901,8 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|||
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
|
||||
}
|
||||
if (!checkPathLength(dst)) {
|
||||
throw new IOException("rename: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
throw new IOException("rename: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
}
|
||||
boolean ret = namesystem.renameTo(src, dst);
|
||||
if (ret) {
|
||||
|
@ -918,8 +923,8 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|||
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
|
||||
}
|
||||
if (!checkPathLength(dst)) {
|
||||
throw new IOException("rename: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
throw new IOException("rename: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
}
|
||||
namesystem.renameTo(src, dst, options);
|
||||
metrics.incrFilesRenamed();
|
||||
|
@ -1274,7 +1279,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void verifyVersion(int version) throws IOException {
|
||||
if (version != LAYOUT_VERSION)
|
||||
if (version != FSConstants.LAYOUT_VERSION)
|
||||
throw new IncorrectVersionException(version, "data node");
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ import com.google.common.collect.Lists;
|
|||
/**
|
||||
* This class provides fetching a specified file from the NameNode.
|
||||
*/
|
||||
class TransferFsImage implements FSConstants {
|
||||
class TransferFsImage {
|
||||
|
||||
public final static String CONTENT_LENGTH = "Content-Length";
|
||||
public final static String MD5_HEADER = "X-MD5-Digest";
|
||||
|
@ -124,7 +124,7 @@ class TransferFsImage implements FSConstants {
|
|||
static void getFileServer(OutputStream outstream, File localfile,
|
||||
DataTransferThrottler throttler)
|
||||
throws IOException {
|
||||
byte buf[] = new byte[BUFFER_SIZE];
|
||||
byte buf[] = new byte[FSConstants.IO_FILE_BUFFER_SIZE];
|
||||
FileInputStream infile = null;
|
||||
try {
|
||||
infile = new FileInputStream(localfile);
|
||||
|
@ -139,7 +139,7 @@ class TransferFsImage implements FSConstants {
|
|||
&& localfile.getAbsolutePath().contains("fsimage")) {
|
||||
// Test sending image shorter than localfile
|
||||
long len = localfile.length();
|
||||
buf = new byte[(int)Math.min(len/2, BUFFER_SIZE)];
|
||||
buf = new byte[(int)Math.min(len/2, FSConstants.IO_FILE_BUFFER_SIZE)];
|
||||
// This will read at most half of the image
|
||||
// and the rest of the image will be sent over the wire
|
||||
infile.read(buf);
|
||||
|
@ -179,7 +179,7 @@ class TransferFsImage implements FSConstants {
|
|||
static MD5Hash getFileClient(String nnHostPort,
|
||||
String queryString, List<File> localPaths,
|
||||
NNStorage dstStorage, boolean getChecksum) throws IOException {
|
||||
byte[] buf = new byte[BUFFER_SIZE];
|
||||
byte[] buf = new byte[FSConstants.IO_FILE_BUFFER_SIZE];
|
||||
String proto = UserGroupInformation.isSecurityEnabled() ? "https://" : "http://";
|
||||
StringBuilder str = new StringBuilder(proto+nnHostPort+"/getimage?");
|
||||
str.append(queryString);
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||
|
@ -669,7 +670,7 @@ public class DFSTestUtil {
|
|||
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
|
||||
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||
NetUtils.getOutputStream(s, writeTimeout),
|
||||
DataNode.SMALL_BUFFER_SIZE));
|
||||
FSConstants.SMALL_BUFFER_SIZE));
|
||||
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
|
||||
|
||||
// send the request
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
|
||||
public class TestDFSRemove extends junit.framework.TestCase {
|
||||
|
@ -73,7 +72,7 @@ public class TestDFSRemove extends junit.framework.TestCase {
|
|||
fs.delete(a, false);
|
||||
}
|
||||
// wait 3 heartbeat intervals, so that all blocks are deleted.
|
||||
Thread.sleep(3 * FSConstants.HEARTBEAT_INTERVAL * 1000);
|
||||
Thread.sleep(3 * DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000);
|
||||
// all blocks should be gone now.
|
||||
long dfsUsedFinal = getTotalDfsUsed(cluster);
|
||||
assertEquals("All blocks should be gone. start=" + dfsUsedStart
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FsServerDefaults;
|
|||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -100,23 +101,23 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
*/
|
||||
public void testServerDefaults() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, FSConstants.DEFAULT_BLOCK_SIZE);
|
||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, FSConstants.DEFAULT_BYTES_PER_CHECKSUM);
|
||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, FSConstants.DEFAULT_WRITE_PACKET_SIZE);
|
||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, FSConstants.DEFAULT_REPLICATION_FACTOR + 1);
|
||||
conf.setInt("io.file.buffer.size", FSConstants.DEFAULT_FILE_BUFFER_SIZE);
|
||||
conf.setLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
|
||||
conf.setInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT);
|
||||
conf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
||||
conf.setInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT + 1);
|
||||
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(FSConstants.DEFAULT_REPLICATION_FACTOR + 1)
|
||||
.numDataNodes(DFSConfigKeys.DFS_REPLICATION_DEFAULT + 1)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
try {
|
||||
FsServerDefaults serverDefaults = fs.getServerDefaults();
|
||||
assertEquals(FSConstants.DEFAULT_BLOCK_SIZE, serverDefaults.getBlockSize());
|
||||
assertEquals(FSConstants.DEFAULT_BYTES_PER_CHECKSUM, serverDefaults.getBytesPerChecksum());
|
||||
assertEquals(FSConstants.DEFAULT_WRITE_PACKET_SIZE, serverDefaults.getWritePacketSize());
|
||||
assertEquals(FSConstants.DEFAULT_REPLICATION_FACTOR + 1, serverDefaults.getReplication());
|
||||
assertEquals(FSConstants.DEFAULT_FILE_BUFFER_SIZE, serverDefaults.getFileBufferSize());
|
||||
assertEquals(DFS_BLOCK_SIZE_DEFAULT, serverDefaults.getBlockSize());
|
||||
assertEquals(DFS_BYTES_PER_CHECKSUM_DEFAULT, serverDefaults.getBytesPerChecksum());
|
||||
assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT, serverDefaults.getWritePacketSize());
|
||||
assertEquals(DFS_REPLICATION_DEFAULT + 1, serverDefaults.getReplication());
|
||||
assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT, serverDefaults.getFileBufferSize());
|
||||
} finally {
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
@ -269,8 +270,8 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
*/
|
||||
public void testFileCreationError1() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
if (simulatedStorage) {
|
||||
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
|
||||
}
|
||||
|
@ -343,8 +344,8 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
long leasePeriod = 1000;
|
||||
System.out.println("testFileCreationError2 start");
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
if (simulatedStorage) {
|
||||
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
|
||||
}
|
||||
|
@ -412,8 +413,8 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
Configuration conf = new HdfsConfiguration();
|
||||
final int MAX_IDLE_TIME = 2000; // 2s
|
||||
conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
if (simulatedStorage) {
|
||||
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
|
||||
}
|
||||
|
@ -724,7 +725,7 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
*/
|
||||
public void testFileCreationSyncOnClose() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, true);
|
||||
conf.setBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, true);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
try {
|
||||
|
@ -763,8 +764,8 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
final int DATANODE_NUM = 3;
|
||||
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
|
||||
// create cluster
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
|
||||
|
@ -853,7 +854,7 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
final int DATANODE_NUM = 3;
|
||||
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 3);
|
||||
conf.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 3);
|
||||
conf.setBoolean("ipc.client.ping", false); // hdfs timeout is default 60 seconds
|
||||
conf.setInt("ipc.ping.interval", 10000); // hdfs timeout is now 10 second
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ public class TestFileStatus {
|
|||
int fileSize, int blockSize) throws IOException {
|
||||
// Create and write a file that contains three blocks of data
|
||||
FSDataOutputStream stm = fileSys.create(name, true,
|
||||
FSConstants.BUFFER_SIZE, (short)repl, (long)blockSize);
|
||||
FSConstants.IO_FILE_BUFFER_SIZE, (short)repl, (long)blockSize);
|
||||
byte[] buffer = new byte[fileSize];
|
||||
Random rand = new Random(seed);
|
||||
rand.nextBytes(buffer);
|
||||
|
|
|
@ -63,7 +63,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|||
* Note the synchronization is coarse grained - it is at each method.
|
||||
*/
|
||||
|
||||
public class SimulatedFSDataset implements FSConstants, FSDatasetInterface, Configurable{
|
||||
public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
|
||||
|
||||
public static final String CONFIG_PROPERTY_SIMULATED =
|
||||
"dfs.datanode.simulateddatastorage";
|
||||
|
|
Loading…
Reference in New Issue