HDFS-2562. Refactor DN configuration variables out of DataNode class. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1203542 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1f0fc2da93
commit
a27f99c4a0
|
@ -14,6 +14,9 @@ Release 0.23.1 - UNRELEASED
|
||||||
|
|
||||||
HDFS-2543. HADOOP_PREFIX cannot be overridden. (Bruno Mahé via tomwhite)
|
HDFS-2543. HADOOP_PREFIX cannot be overridden. (Bruno Mahé via tomwhite)
|
||||||
|
|
||||||
|
HDFS-2562. Refactor DN configuration variables out of DataNode class
|
||||||
|
(todd)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-2130. Switch default checksum to CRC32C. (todd)
|
HDFS-2130. Switch default checksum to CRC32C. (todd)
|
||||||
|
|
|
@ -185,8 +185,8 @@ class BlockReceiver implements Closeable {
|
||||||
" while receiving block " + block + " from " + inAddr);
|
" while receiving block " + block + " from " + inAddr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
|
this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
|
||||||
this.syncBehindWrites = datanode.shouldSyncBehindWrites();
|
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
|
||||||
|
|
||||||
final boolean isCreate = isDatanode || isTransfer
|
final boolean isCreate = isDatanode || isTransfer
|
||||||
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||||
|
@ -249,7 +249,7 @@ class BlockReceiver implements Closeable {
|
||||||
try {
|
try {
|
||||||
if (checksumOut != null) {
|
if (checksumOut != null) {
|
||||||
checksumOut.flush();
|
checksumOut.flush();
|
||||||
if (datanode.syncOnClose && (cout instanceof FileOutputStream)) {
|
if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
|
||||||
((FileOutputStream)cout).getChannel().force(true);
|
((FileOutputStream)cout).getChannel().force(true);
|
||||||
}
|
}
|
||||||
checksumOut.close();
|
checksumOut.close();
|
||||||
|
@ -265,7 +265,7 @@ class BlockReceiver implements Closeable {
|
||||||
try {
|
try {
|
||||||
if (out != null) {
|
if (out != null) {
|
||||||
out.flush();
|
out.flush();
|
||||||
if (datanode.syncOnClose && (out instanceof FileOutputStream)) {
|
if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
|
||||||
((FileOutputStream)out).getChannel().force(true);
|
((FileOutputStream)out).getChannel().force(true);
|
||||||
}
|
}
|
||||||
out.close();
|
out.close();
|
||||||
|
@ -435,7 +435,7 @@ class BlockReceiver implements Closeable {
|
||||||
* calculation in DFSClient to make the guess accurate.
|
* calculation in DFSClient to make the guess accurate.
|
||||||
*/
|
*/
|
||||||
int chunkSize = bytesPerChecksum + checksumSize;
|
int chunkSize = bytesPerChecksum + checksumSize;
|
||||||
int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN
|
int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
|
||||||
+ chunkSize - 1)/chunkSize;
|
+ chunkSize - 1)/chunkSize;
|
||||||
buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
|
buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
|
||||||
Math.max(chunksPerPacket, 1) * chunkSize);
|
Math.max(chunksPerPacket, 1) * chunkSize);
|
||||||
|
|
|
@ -185,8 +185,8 @@ class BlockSender implements java.io.Closeable {
|
||||||
this.corruptChecksumOk = corruptChecksumOk;
|
this.corruptChecksumOk = corruptChecksumOk;
|
||||||
this.verifyChecksum = verifyChecksum;
|
this.verifyChecksum = verifyChecksum;
|
||||||
this.clientTraceFmt = clientTraceFmt;
|
this.clientTraceFmt = clientTraceFmt;
|
||||||
this.readaheadLength = datanode.getReadaheadLength();
|
this.readaheadLength = datanode.getDnConf().readaheadLength;
|
||||||
this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
|
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
|
||||||
|
|
||||||
synchronized(datanode.data) {
|
synchronized(datanode.data) {
|
||||||
this.replica = getReplica(block, datanode);
|
this.replica = getReplica(block, datanode);
|
||||||
|
@ -215,7 +215,7 @@ class BlockSender implements java.io.Closeable {
|
||||||
|
|
||||||
// transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
|
// transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
|
||||||
// use normal transfer in those cases
|
// use normal transfer in those cases
|
||||||
this.transferToAllowed = datanode.transferToAllowed &&
|
this.transferToAllowed = datanode.getDnConf().transferToAllowed &&
|
||||||
(!is32Bit || length <= Integer.MAX_VALUE);
|
(!is32Bit || length <= Integer.MAX_VALUE);
|
||||||
|
|
||||||
DataChecksum csum;
|
DataChecksum csum;
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple class encapsulating all of the configuration that the DataNode
|
||||||
|
* loads at startup time.
|
||||||
|
*/
|
||||||
|
class DNConf {
|
||||||
|
final int socketTimeout;
|
||||||
|
final int socketWriteTimeout;
|
||||||
|
final int socketKeepaliveTimeout;
|
||||||
|
|
||||||
|
final boolean transferToAllowed;
|
||||||
|
final boolean dropCacheBehindWrites;
|
||||||
|
final boolean syncBehindWrites;
|
||||||
|
final boolean dropCacheBehindReads;
|
||||||
|
final boolean syncOnClose;
|
||||||
|
|
||||||
|
|
||||||
|
final long readaheadLength;
|
||||||
|
final long heartBeatInterval;
|
||||||
|
final long blockReportInterval;
|
||||||
|
final long initialBlockReportDelay;
|
||||||
|
final int writePacketSize;
|
||||||
|
|
||||||
|
public DNConf(Configuration conf) {
|
||||||
|
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||||
|
HdfsServerConstants.READ_TIMEOUT);
|
||||||
|
socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
||||||
|
HdfsServerConstants.WRITE_TIMEOUT);
|
||||||
|
socketKeepaliveTimeout = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
|
||||||
|
|
||||||
|
/* Based on results on different platforms, we might need set the default
|
||||||
|
* to false on some of them. */
|
||||||
|
transferToAllowed = conf.getBoolean(
|
||||||
|
DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
|
||||||
|
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
|
||||||
|
|
||||||
|
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||||
|
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
||||||
|
|
||||||
|
readaheadLength = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
dropCacheBehindWrites = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
|
||||||
|
syncBehindWrites = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
|
||||||
|
dropCacheBehindReads = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
|
||||||
|
|
||||||
|
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
|
||||||
|
|
||||||
|
long initBRDelay = conf.getLong(
|
||||||
|
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||||
|
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
|
||||||
|
if (initBRDelay >= blockReportInterval) {
|
||||||
|
initBRDelay = 0;
|
||||||
|
DataNode.LOG.info("dfs.blockreport.initialDelay is greater than " +
|
||||||
|
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
|
||||||
|
}
|
||||||
|
initialBlockReportDelay = initBRDelay;
|
||||||
|
|
||||||
|
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
|
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
||||||
|
|
||||||
|
// do we need to sync block file contents to disk when blockfile is closed?
|
||||||
|
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
||||||
|
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,15 +19,8 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
|
@ -51,17 +44,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOUR
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
||||||
|
@ -104,7 +90,6 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
@ -396,9 +381,7 @@ public class DataNode extends Configured
|
||||||
AtomicInteger xmitsInProgress = new AtomicInteger();
|
AtomicInteger xmitsInProgress = new AtomicInteger();
|
||||||
Daemon dataXceiverServer = null;
|
Daemon dataXceiverServer = null;
|
||||||
ThreadGroup threadGroup = null;
|
ThreadGroup threadGroup = null;
|
||||||
long blockReportInterval;
|
private DNConf dnConf;
|
||||||
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
|
|
||||||
long heartBeatInterval;
|
|
||||||
private boolean heartbeatsDisabledForTests = false;
|
private boolean heartbeatsDisabledForTests = false;
|
||||||
private DataStorage storage = null;
|
private DataStorage storage = null;
|
||||||
private HttpServer infoServer = null;
|
private HttpServer infoServer = null;
|
||||||
|
@ -408,18 +391,9 @@ public class DataNode extends Configured
|
||||||
private volatile String hostName; // Host name of this datanode
|
private volatile String hostName; // Host name of this datanode
|
||||||
|
|
||||||
private static String dnThreadName;
|
private static String dnThreadName;
|
||||||
int socketTimeout;
|
|
||||||
int socketWriteTimeout = 0;
|
|
||||||
boolean transferToAllowed = true;
|
|
||||||
private boolean dropCacheBehindWrites = false;
|
|
||||||
private boolean syncBehindWrites = false;
|
|
||||||
private boolean dropCacheBehindReads = false;
|
|
||||||
private long readaheadLength = 0;
|
|
||||||
|
|
||||||
int writePacketSize = 0;
|
|
||||||
boolean isBlockTokenEnabled;
|
boolean isBlockTokenEnabled;
|
||||||
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
|
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
|
||||||
boolean syncOnClose;
|
|
||||||
|
|
||||||
public DataBlockScanner blockScanner = null;
|
public DataBlockScanner blockScanner = null;
|
||||||
private DirectoryScanner directoryScanner = null;
|
private DirectoryScanner directoryScanner = null;
|
||||||
|
@ -487,49 +461,6 @@ public class DataNode extends Configured
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initConfig(Configuration conf) {
|
|
||||||
this.socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
|
||||||
HdfsServerConstants.READ_TIMEOUT);
|
|
||||||
this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
|
||||||
HdfsServerConstants.WRITE_TIMEOUT);
|
|
||||||
/* Based on results on different platforms, we might need set the default
|
|
||||||
* to false on some of them. */
|
|
||||||
this.transferToAllowed = conf.getBoolean(
|
|
||||||
DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
|
|
||||||
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
|
|
||||||
this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
|
||||||
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
|
||||||
|
|
||||||
this.readaheadLength = conf.getLong(
|
|
||||||
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
|
||||||
this.dropCacheBehindWrites = conf.getBoolean(
|
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
|
|
||||||
this.syncBehindWrites = conf.getBoolean(
|
|
||||||
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
|
|
||||||
this.dropCacheBehindReads = conf.getBoolean(
|
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
|
|
||||||
|
|
||||||
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
|
||||||
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
|
|
||||||
this.initialBlockReportDelay = conf.getLong(
|
|
||||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
|
||||||
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
|
|
||||||
if (this.initialBlockReportDelay >= blockReportInterval) {
|
|
||||||
this.initialBlockReportDelay = 0;
|
|
||||||
LOG.info("dfs.blockreport.initialDelay is greater than " +
|
|
||||||
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
|
|
||||||
}
|
|
||||||
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
|
||||||
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
|
||||||
|
|
||||||
// do we need to sync block file contents to disk when blockfile is closed?
|
|
||||||
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
|
||||||
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startInfoServer(Configuration conf) throws IOException {
|
private void startInfoServer(Configuration conf) throws IOException {
|
||||||
// create a servlet to serve full-file content
|
// create a servlet to serve full-file content
|
||||||
|
@ -688,7 +619,7 @@ public class DataNode extends Configured
|
||||||
// find free port or use privileged port provided
|
// find free port or use privileged port provided
|
||||||
ServerSocket ss;
|
ServerSocket ss;
|
||||||
if(secureResources == null) {
|
if(secureResources == null) {
|
||||||
ss = (socketWriteTimeout > 0) ?
|
ss = (dnConf.socketWriteTimeout > 0) ?
|
||||||
ServerSocketChannel.open().socket() : new ServerSocket();
|
ServerSocketChannel.open().socket() : new ServerSocket();
|
||||||
Server.bind(ss, socAddr, 0);
|
Server.bind(ss, socAddr, 0);
|
||||||
} else {
|
} else {
|
||||||
|
@ -760,11 +691,13 @@ public class DataNode extends Configured
|
||||||
private volatile boolean shouldServiceRun = true;
|
private volatile boolean shouldServiceRun = true;
|
||||||
UpgradeManagerDatanode upgradeManager = null;
|
UpgradeManagerDatanode upgradeManager = null;
|
||||||
private final DataNode dn;
|
private final DataNode dn;
|
||||||
|
private final DNConf dnConf;
|
||||||
|
|
||||||
BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
|
BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
|
||||||
this.dn = dn;
|
this.dn = dn;
|
||||||
this.bpRegistration = dn.createRegistration();
|
this.bpRegistration = dn.createRegistration();
|
||||||
this.nnAddr = nnAddr;
|
this.nnAddr = nnAddr;
|
||||||
|
this.dnConf = dn.getDnConf();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -866,9 +799,9 @@ public class DataNode extends Configured
|
||||||
void scheduleBlockReport(long delay) {
|
void scheduleBlockReport(long delay) {
|
||||||
if (delay > 0) { // send BR after random delay
|
if (delay > 0) { // send BR after random delay
|
||||||
lastBlockReport = System.currentTimeMillis()
|
lastBlockReport = System.currentTimeMillis()
|
||||||
- ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
||||||
} else { // send at next heartbeat
|
} else { // send at next heartbeat
|
||||||
lastBlockReport = lastHeartbeat - dn.blockReportInterval;
|
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
|
||||||
}
|
}
|
||||||
resetBlockReportTime = true; // reset future BRs for randomness
|
resetBlockReportTime = true; // reset future BRs for randomness
|
||||||
}
|
}
|
||||||
|
@ -965,7 +898,7 @@ public class DataNode extends Configured
|
||||||
// send block report if timer has expired.
|
// send block report if timer has expired.
|
||||||
DatanodeCommand cmd = null;
|
DatanodeCommand cmd = null;
|
||||||
long startTime = now();
|
long startTime = now();
|
||||||
if (startTime - lastBlockReport > dn.blockReportInterval) {
|
if (startTime - lastBlockReport > dnConf.blockReportInterval) {
|
||||||
|
|
||||||
// Create block report
|
// Create block report
|
||||||
long brCreateStartTime = now();
|
long brCreateStartTime = now();
|
||||||
|
@ -987,7 +920,7 @@ public class DataNode extends Configured
|
||||||
// If we have sent the first block report, then wait a random
|
// If we have sent the first block report, then wait a random
|
||||||
// time before we start the periodic block reports.
|
// time before we start the periodic block reports.
|
||||||
if (resetBlockReportTime) {
|
if (resetBlockReportTime) {
|
||||||
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval));
|
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
|
||||||
resetBlockReportTime = false;
|
resetBlockReportTime = false;
|
||||||
} else {
|
} else {
|
||||||
/* say the last block report was at 8:20:14. The current report
|
/* say the last block report was at 8:20:14. The current report
|
||||||
|
@ -997,7 +930,7 @@ public class DataNode extends Configured
|
||||||
* 2) unexpected like 11:35:43, next report should be at 12:20:14
|
* 2) unexpected like 11:35:43, next report should be at 12:20:14
|
||||||
*/
|
*/
|
||||||
lastBlockReport += (now() - lastBlockReport) /
|
lastBlockReport += (now() - lastBlockReport) /
|
||||||
dn.blockReportInterval * dn.blockReportInterval;
|
dnConf.blockReportInterval * dnConf.blockReportInterval;
|
||||||
}
|
}
|
||||||
LOG.info("sent block report, processed command:" + cmd);
|
LOG.info("sent block report, processed command:" + cmd);
|
||||||
}
|
}
|
||||||
|
@ -1059,9 +992,9 @@ public class DataNode extends Configured
|
||||||
*/
|
*/
|
||||||
private void offerService() throws Exception {
|
private void offerService() throws Exception {
|
||||||
LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
|
LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
|
||||||
+ dn.blockReportInterval + "msec" + " Initial delay: "
|
+ dnConf.blockReportInterval + "msec" + " Initial delay: "
|
||||||
+ dn.initialBlockReportDelay + "msec" + "; heartBeatInterval="
|
+ dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
|
||||||
+ dn.heartBeatInterval);
|
+ dnConf.heartBeatInterval);
|
||||||
|
|
||||||
//
|
//
|
||||||
// Now loop for a long time....
|
// Now loop for a long time....
|
||||||
|
@ -1073,7 +1006,7 @@ public class DataNode extends Configured
|
||||||
//
|
//
|
||||||
// Every so often, send heartbeat or block-report
|
// Every so often, send heartbeat or block-report
|
||||||
//
|
//
|
||||||
if (startTime - lastHeartbeat > dn.heartBeatInterval) {
|
if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
|
||||||
//
|
//
|
||||||
// All heartbeat messages include following info:
|
// All heartbeat messages include following info:
|
||||||
// -- Datanode name
|
// -- Datanode name
|
||||||
|
@ -1111,7 +1044,7 @@ public class DataNode extends Configured
|
||||||
// There is no work to do; sleep until hearbeat timer elapses,
|
// There is no work to do; sleep until hearbeat timer elapses,
|
||||||
// or work arrives, and then iterate again.
|
// or work arrives, and then iterate again.
|
||||||
//
|
//
|
||||||
long waitTime = dn.heartBeatInterval -
|
long waitTime = dnConf.heartBeatInterval -
|
||||||
(System.currentTimeMillis() - lastHeartbeat);
|
(System.currentTimeMillis() - lastHeartbeat);
|
||||||
synchronized(receivedBlockList) {
|
synchronized(receivedBlockList) {
|
||||||
if (waitTime > 0 && receivedBlockList.size() == 0) {
|
if (waitTime > 0 && receivedBlockList.size() == 0) {
|
||||||
|
@ -1134,7 +1067,7 @@ public class DataNode extends Configured
|
||||||
}
|
}
|
||||||
LOG.warn("RemoteException in offerService", re);
|
LOG.warn("RemoteException in offerService", re);
|
||||||
try {
|
try {
|
||||||
long sleepTime = Math.min(1000, dn.heartBeatInterval);
|
long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -1202,7 +1135,7 @@ public class DataNode extends Configured
|
||||||
LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
|
LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
|
||||||
|
|
||||||
// random short delay - helps scatter the BR from all DNs
|
// random short delay - helps scatter the BR from all DNs
|
||||||
scheduleBlockReport(dn.initialBlockReportDelay);
|
scheduleBlockReport(dnConf.initialBlockReportDelay);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1412,11 +1345,11 @@ public class DataNode extends Configured
|
||||||
this.secureResources = resources;
|
this.secureResources = resources;
|
||||||
this.dataDirs = dataDirs;
|
this.dataDirs = dataDirs;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
this.dnConf = new DNConf(conf);
|
||||||
|
|
||||||
storage = new DataStorage();
|
storage = new DataStorage();
|
||||||
|
|
||||||
// global DN settings
|
// global DN settings
|
||||||
initConfig(conf);
|
|
||||||
registerMXBean();
|
registerMXBean();
|
||||||
initDataXceiver(conf);
|
initDataXceiver(conf);
|
||||||
startInfoServer(conf);
|
startInfoServer(conf);
|
||||||
|
@ -1664,7 +1597,7 @@ public class DataNode extends Configured
|
||||||
* Creates either NIO or regular depending on socketWriteTimeout.
|
* Creates either NIO or regular depending on socketWriteTimeout.
|
||||||
*/
|
*/
|
||||||
protected Socket newSocket() throws IOException {
|
protected Socket newSocket() throws IOException {
|
||||||
return (socketWriteTimeout > 0) ?
|
return (dnConf.socketWriteTimeout > 0) ?
|
||||||
SocketChannel.open().socket() : new Socket();
|
SocketChannel.open().socket() : new Socket();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2091,10 +2024,10 @@ public class DataNode extends Configured
|
||||||
InetSocketAddress curTarget =
|
InetSocketAddress curTarget =
|
||||||
NetUtils.createSocketAddr(targets[0].getName());
|
NetUtils.createSocketAddr(targets[0].getName());
|
||||||
sock = newSocket();
|
sock = newSocket();
|
||||||
NetUtils.connect(sock, curTarget, socketTimeout);
|
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
|
||||||
sock.setSoTimeout(targets.length * socketTimeout);
|
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
|
||||||
|
|
||||||
long writeTimeout = socketWriteTimeout +
|
long writeTimeout = dnConf.socketWriteTimeout +
|
||||||
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
|
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
|
||||||
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
|
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
|
||||||
out = new DataOutputStream(new BufferedOutputStream(baseStream,
|
out = new DataOutputStream(new BufferedOutputStream(baseStream,
|
||||||
|
@ -2537,7 +2470,7 @@ public class DataNode extends Configured
|
||||||
DatanodeRegistration bpReg = bpos.bpRegistration;
|
DatanodeRegistration bpReg = bpos.bpRegistration;
|
||||||
InterDatanodeProtocol datanode = bpReg.equals(id)?
|
InterDatanodeProtocol datanode = bpReg.equals(id)?
|
||||||
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
|
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
|
||||||
socketTimeout);
|
dnConf.socketTimeout);
|
||||||
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
|
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
|
||||||
if (info != null &&
|
if (info != null &&
|
||||||
info.getGenerationStamp() >= block.getGenerationStamp() &&
|
info.getGenerationStamp() >= block.getGenerationStamp() &&
|
||||||
|
@ -2926,20 +2859,8 @@ public class DataNode extends Configured
|
||||||
(DataXceiverServer) this.dataXceiverServer.getRunnable();
|
(DataXceiverServer) this.dataXceiverServer.getRunnable();
|
||||||
return dxcs.balanceThrottler.getBandwidth();
|
return dxcs.balanceThrottler.getBandwidth();
|
||||||
}
|
}
|
||||||
|
|
||||||
long getReadaheadLength() {
|
|
||||||
return readaheadLength;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean shouldDropCacheBehindWrites() {
|
|
||||||
return dropCacheBehindWrites;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean shouldDropCacheBehindReads() {
|
|
||||||
return dropCacheBehindReads;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean shouldSyncBehindWrites() {
|
DNConf getDnConf() {
|
||||||
return syncBehindWrites;
|
return dnConf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,6 @@ import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
@ -82,9 +81,9 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
private final String remoteAddress; // address of remote side
|
private final String remoteAddress; // address of remote side
|
||||||
private final String localAddress; // local address of this daemon
|
private final String localAddress; // local address of this daemon
|
||||||
private final DataNode datanode;
|
private final DataNode datanode;
|
||||||
|
private final DNConf dnConf;
|
||||||
private final DataXceiverServer dataXceiverServer;
|
private final DataXceiverServer dataXceiverServer;
|
||||||
|
|
||||||
private int socketKeepaliveTimeout;
|
|
||||||
private long opStartTime; //the start time of receiving an Op
|
private long opStartTime; //the start time of receiving an Op
|
||||||
|
|
||||||
public DataXceiver(Socket s, DataNode datanode,
|
public DataXceiver(Socket s, DataNode datanode,
|
||||||
|
@ -95,14 +94,11 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
this.s = s;
|
this.s = s;
|
||||||
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
|
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
|
this.dnConf = datanode.getDnConf();
|
||||||
this.dataXceiverServer = dataXceiverServer;
|
this.dataXceiverServer = dataXceiverServer;
|
||||||
remoteAddress = s.getRemoteSocketAddress().toString();
|
remoteAddress = s.getRemoteSocketAddress().toString();
|
||||||
localAddress = s.getLocalSocketAddress().toString();
|
localAddress = s.getLocalSocketAddress().toString();
|
||||||
|
|
||||||
socketKeepaliveTimeout = datanode.getConf().getInt(
|
|
||||||
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Number of active connections is: "
|
LOG.debug("Number of active connections is: "
|
||||||
+ datanode.getXceiverCount());
|
+ datanode.getXceiverCount());
|
||||||
|
@ -144,8 +140,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (opsProcessed != 0) {
|
if (opsProcessed != 0) {
|
||||||
assert socketKeepaliveTimeout > 0;
|
assert dnConf.socketKeepaliveTimeout > 0;
|
||||||
s.setSoTimeout(socketKeepaliveTimeout);
|
s.setSoTimeout(dnConf.socketKeepaliveTimeout);
|
||||||
}
|
}
|
||||||
op = readOp();
|
op = readOp();
|
||||||
} catch (InterruptedIOException ignored) {
|
} catch (InterruptedIOException ignored) {
|
||||||
|
@ -180,7 +176,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
opStartTime = now();
|
opStartTime = now();
|
||||||
processOp(op);
|
processOp(op);
|
||||||
++opsProcessed;
|
++opsProcessed;
|
||||||
} while (!s.isClosed() && socketKeepaliveTimeout > 0);
|
} while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error(datanode.getMachineName() + ":DataXceiver error processing " +
|
LOG.error(datanode.getMachineName() + ":DataXceiver error processing " +
|
||||||
((op == null) ? "unknown" : op.name()) + " operation " +
|
((op == null) ? "unknown" : op.name()) + " operation " +
|
||||||
|
@ -205,7 +201,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
final long blockOffset,
|
final long blockOffset,
|
||||||
final long length) throws IOException {
|
final long length) throws IOException {
|
||||||
OutputStream baseStream = NetUtils.getOutputStream(s,
|
OutputStream baseStream = NetUtils.getOutputStream(s,
|
||||||
datanode.socketWriteTimeout);
|
dnConf.socketWriteTimeout);
|
||||||
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||||
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
checkAccess(out, true, block, blockToken,
|
checkAccess(out, true, block, blockToken,
|
||||||
|
@ -231,13 +227,13 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
String msg = "opReadBlock " + block + " received exception " + e;
|
String msg = "opReadBlock " + block + " received exception " + e;
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
|
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
// send op status
|
// send op status
|
||||||
writeSuccessWithChecksumInfo(blockSender,
|
writeSuccessWithChecksumInfo(blockSender,
|
||||||
getStreamWithTimeout(s, datanode.socketWriteTimeout));
|
getStreamWithTimeout(s, dnConf.socketWriteTimeout));
|
||||||
|
|
||||||
long read = blockSender.sendBlock(out, baseStream, null); // send data
|
long read = blockSender.sendBlock(out, baseStream, null); // send data
|
||||||
|
|
||||||
|
@ -335,7 +331,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
// reply to upstream datanode or client
|
// reply to upstream datanode or client
|
||||||
final DataOutputStream replyOut = new DataOutputStream(
|
final DataOutputStream replyOut = new DataOutputStream(
|
||||||
new BufferedOutputStream(
|
new BufferedOutputStream(
|
||||||
NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
|
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout),
|
||||||
HdfsConstants.SMALL_BUFFER_SIZE));
|
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
checkAccess(replyOut, isClient, block, blockToken,
|
checkAccess(replyOut, isClient, block, blockToken,
|
||||||
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
|
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
|
||||||
|
@ -370,9 +366,9 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
||||||
mirrorSock = datanode.newSocket();
|
mirrorSock = datanode.newSocket();
|
||||||
try {
|
try {
|
||||||
int timeoutValue = datanode.socketTimeout
|
int timeoutValue = dnConf.socketTimeout
|
||||||
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
|
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
|
||||||
int writeTimeout = datanode.socketWriteTimeout +
|
int writeTimeout = dnConf.socketWriteTimeout +
|
||||||
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
|
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
|
||||||
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
|
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
|
||||||
mirrorSock.setSoTimeout(timeoutValue);
|
mirrorSock.setSoTimeout(timeoutValue);
|
||||||
|
@ -508,7 +504,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
|
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
|
||||||
|
|
||||||
final DataOutputStream out = new DataOutputStream(
|
final DataOutputStream out = new DataOutputStream(
|
||||||
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
|
||||||
try {
|
try {
|
||||||
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
|
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
|
||||||
writeResponse(Status.SUCCESS, null, out);
|
writeResponse(Status.SUCCESS, null, out);
|
||||||
|
@ -521,7 +517,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
public void blockChecksum(final ExtendedBlock block,
|
public void blockChecksum(final ExtendedBlock block,
|
||||||
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
||||||
final DataOutputStream out = new DataOutputStream(
|
final DataOutputStream out = new DataOutputStream(
|
||||||
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
|
||||||
checkAccess(out, true, block, blockToken,
|
checkAccess(out, true, block, blockToken,
|
||||||
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
|
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
|
||||||
updateCurrentThreadName("Reading metadata for block " + block);
|
updateCurrentThreadName("Reading metadata for block " + block);
|
||||||
|
@ -581,7 +577,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
LOG.warn("Invalid access token in request from " + remoteAddress
|
LOG.warn("Invalid access token in request from " + remoteAddress
|
||||||
+ " for OP_COPY_BLOCK for block " + block + " : "
|
+ " for OP_COPY_BLOCK for block " + block + " : "
|
||||||
+ e.getLocalizedMessage());
|
+ e.getLocalizedMessage());
|
||||||
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
|
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -591,7 +587,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
String msg = "Not able to copy block " + block.getBlockId() + " to "
|
String msg = "Not able to copy block " + block.getBlockId() + " to "
|
||||||
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
|
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
|
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -606,7 +602,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
// set up response stream
|
// set up response stream
|
||||||
OutputStream baseStream = NetUtils.getOutputStream(
|
OutputStream baseStream = NetUtils.getOutputStream(
|
||||||
s, datanode.socketWriteTimeout);
|
s, dnConf.socketWriteTimeout);
|
||||||
reply = new DataOutputStream(new BufferedOutputStream(
|
reply = new DataOutputStream(new BufferedOutputStream(
|
||||||
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
|
|
||||||
|
@ -659,7 +655,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
+ " for OP_REPLACE_BLOCK for block " + block + " : "
|
+ " for OP_REPLACE_BLOCK for block " + block + " : "
|
||||||
+ e.getLocalizedMessage());
|
+ e.getLocalizedMessage());
|
||||||
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
|
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
|
||||||
datanode.socketWriteTimeout);
|
dnConf.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -668,7 +664,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
String msg = "Not able to receive block " + block.getBlockId() + " from "
|
String msg = "Not able to receive block " + block.getBlockId() + " from "
|
||||||
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
|
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
|
||||||
LOG.warn(msg);
|
LOG.warn(msg);
|
||||||
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
|
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -684,11 +680,11 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
|
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
|
||||||
proxySource.getName());
|
proxySource.getName());
|
||||||
proxySock = datanode.newSocket();
|
proxySock = datanode.newSocket();
|
||||||
NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
|
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
|
||||||
proxySock.setSoTimeout(datanode.socketTimeout);
|
proxySock.setSoTimeout(dnConf.socketTimeout);
|
||||||
|
|
||||||
OutputStream baseStream = NetUtils.getOutputStream(proxySock,
|
OutputStream baseStream = NetUtils.getOutputStream(proxySock,
|
||||||
datanode.socketWriteTimeout);
|
dnConf.socketWriteTimeout);
|
||||||
proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
|
proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
|
||||||
HdfsConstants.SMALL_BUFFER_SIZE));
|
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
|
|
||||||
|
@ -750,7 +746,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
// send response back
|
// send response back
|
||||||
try {
|
try {
|
||||||
sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
|
sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
||||||
}
|
}
|
||||||
|
@ -826,7 +822,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
if (reply) {
|
if (reply) {
|
||||||
if (out == null) {
|
if (out == null) {
|
||||||
out = new DataOutputStream(
|
out = new DataOutputStream(
|
||||||
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
|
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
|
||||||
|
|
|
@ -154,7 +154,7 @@ public class TestInterDatanodeProtocol {
|
||||||
//connect to a data node
|
//connect to a data node
|
||||||
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
|
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
|
||||||
InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
|
InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
|
||||||
datanodeinfo[0], conf, datanode.socketTimeout);
|
datanodeinfo[0], conf, datanode.getDnConf().socketTimeout);
|
||||||
assertTrue(datanode != null);
|
assertTrue(datanode != null);
|
||||||
|
|
||||||
//stop block scanner, so we could compare lastScanTime
|
//stop block scanner, so we could compare lastScanTime
|
||||||
|
|
Loading…
Reference in New Issue