HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to hadoop-hdfs-client module. Contributed by Mingliang Liu.
This commit is contained in:
parent
6a5220cab3
commit
8f712dd2f0
|
@ -23,15 +23,10 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
|
@ -46,9 +41,6 @@ public class PipelineAck {
|
||||||
public final static long UNKOWN_SEQNO = -2;
|
public final static long UNKOWN_SEQNO = -2;
|
||||||
final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
|
final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
|
||||||
final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
|
final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
|
||||||
final static int NUM_OOB_TYPES = OOB_END - OOB_START + 1;
|
|
||||||
// place holder for timeout value of each OOB type
|
|
||||||
final static long[] OOB_TIMEOUT;
|
|
||||||
|
|
||||||
public enum ECN {
|
public enum ECN {
|
||||||
DISABLED(0),
|
DISABLED(0),
|
||||||
|
@ -99,16 +91,6 @@ public class PipelineAck {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static {
|
|
||||||
OOB_TIMEOUT = new long[NUM_OOB_TYPES];
|
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
|
||||||
String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
|
|
||||||
DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
|
|
||||||
for (int i = 0; i < NUM_OOB_TYPES; i++) {
|
|
||||||
OOB_TIMEOUT[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** default constructor **/
|
/** default constructor **/
|
||||||
public PipelineAck() {
|
public PipelineAck() {
|
||||||
}
|
}
|
||||||
|
@ -216,19 +198,6 @@ public class PipelineAck {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the timeout to be used for transmitting the OOB type
|
|
||||||
* @return the timeout in milliseconds
|
|
||||||
*/
|
|
||||||
public static long getOOBTimeout(Status status) throws IOException {
|
|
||||||
int index = status.getNumber() - OOB_START;
|
|
||||||
if (index >= 0 && index < NUM_OOB_TYPES) {
|
|
||||||
return OOB_TIMEOUT[index];
|
|
||||||
}
|
|
||||||
// Not an OOB.
|
|
||||||
throw new IOException("Not an OOB status: " + status);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get the Restart OOB ack status */
|
/** Get the Restart OOB ack status */
|
||||||
public static Status getRestartOOBStatus() {
|
public static Status getRestartOOBStatus() {
|
||||||
return Status.OOB_RESTART;
|
return Status.OOB_RESTART;
|
|
@ -543,6 +543,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-9021. Use a yellow elephant rather than a blue one in diagram. (wang)
|
HDFS-9021. Use a yellow elephant rather than a blue one in diagram. (wang)
|
||||||
|
|
||||||
|
HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to
|
||||||
|
hadoop-hdfs-client module. (Mingliang Liu via wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -1153,7 +1153,7 @@ class BlockReceiver implements Closeable {
|
||||||
|
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
if (sending) {
|
if (sending) {
|
||||||
wait(PipelineAck.getOOBTimeout(ackStatus));
|
wait(datanode.getOOBTimeout(ackStatus));
|
||||||
// Didn't get my turn in time. Give up.
|
// Didn't get my turn in time. Give up.
|
||||||
if (sending) {
|
if (sending) {
|
||||||
throw new IOException("Could not send OOB reponse in time: "
|
throw new IOException("Could not send OOB reponse in time: "
|
||||||
|
|
|
@ -40,6 +40,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
||||||
|
@ -362,6 +364,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
.availableProcessors();
|
.availableProcessors();
|
||||||
private static final double CONGESTION_RATIO = 1.5;
|
private static final double CONGESTION_RATIO = 1.5;
|
||||||
|
|
||||||
|
private long[] oobTimeouts; /** timeout value of each OOB type */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a dummy DataNode for testing purpose.
|
* Creates a dummy DataNode for testing purpose.
|
||||||
*/
|
*/
|
||||||
|
@ -377,6 +381,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
this.connectToDnViaHostname = false;
|
this.connectToDnViaHostname = false;
|
||||||
this.getHdfsBlockLocationsEnabled = false;
|
this.getHdfsBlockLocationsEnabled = false;
|
||||||
this.pipelineSupportECN = false;
|
this.pipelineSupportECN = false;
|
||||||
|
initOOBTimeout();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -453,6 +458,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
initOOBTimeout();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ReconfigurableBase
|
@Override // ReconfigurableBase
|
||||||
|
@ -3256,4 +3263,35 @@ public class DataNode extends ReconfigurableBase
|
||||||
checkSuperuserPrivilege();
|
checkSuperuserPrivilege();
|
||||||
spanReceiverHost.removeSpanReceiver(id);
|
spanReceiverHost.removeSpanReceiver(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get timeout value of each OOB type from configuration
|
||||||
|
*/
|
||||||
|
private void initOOBTimeout() {
|
||||||
|
final int oobStart = Status.OOB_RESTART_VALUE; // the first OOB type
|
||||||
|
final int oobEnd = Status.OOB_RESERVED3_VALUE; // the last OOB type
|
||||||
|
final int numOobTypes = oobEnd - oobStart + 1;
|
||||||
|
oobTimeouts = new long[numOobTypes];
|
||||||
|
|
||||||
|
final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
|
||||||
|
DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
|
||||||
|
for (int i = 0; i < numOobTypes; i++) {
|
||||||
|
oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the timeout to be used for transmitting the OOB type
|
||||||
|
* @return the timeout in milliseconds
|
||||||
|
*/
|
||||||
|
public long getOOBTimeout(Status status)
|
||||||
|
throws IOException {
|
||||||
|
if (status.getNumber() < Status.OOB_RESTART_VALUE ||
|
||||||
|
status.getNumber() > Status.OOB_RESERVED3_VALUE) {
|
||||||
|
// Not an OOB.
|
||||||
|
throw new IOException("Not an OOB status: " + status);
|
||||||
|
}
|
||||||
|
|
||||||
|
return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue