Merge remote-tracking branch 'apache-commit/trunk' into HDFS-6581

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
arp 2014-08-28 19:06:46 -07:00
commit 7e32be8768
20 changed files with 218 additions and 94 deletions

View File

@ -125,6 +125,8 @@ Trunk (Unreleased)
HADOOP-10485. Remove dead classes in hadoop-streaming. (wheat9)
HADOOP-11013. CLASSPATH handling should be consolidated, debuggable (aw)
BUG FIXES
HADOOP-9451. Fault single-layer config if node group topology is enabled.
@ -468,6 +470,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10880. Move HTTP delegation tokens out of URL querystring to
a header. (tucu)
HADOOP-11005. Fix HTTP content type for ReconfigurationServlet.
(Lei Xu via wang)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)

View File

@ -114,6 +114,7 @@ case ${COMMAND} in
;;
archive)
CLASS=org.apache.hadoop.tools.HadoopArchives
hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
hadoop_add_classpath "${TOOL_PATH}"
;;
checknative)
@ -136,10 +137,12 @@ case ${COMMAND} in
;;
distch)
CLASS=org.apache.hadoop.tools.DistCh
hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
hadoop_add_classpath "${TOOL_PATH}"
;;
distcp)
CLASS=org.apache.hadoop.tools.DistCp
hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
hadoop_add_classpath "${TOOL_PATH}"
;;
fs)
@ -168,11 +171,11 @@ case ${COMMAND} in
esac
# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
hadoop_add_param HADOOP_OPTS Xmx "${JAVA_HEAP_MAX}"
hadoop_finalize
export CLASSPATH
hadoop_java_exec "${COMMAND}" "${CLASS}" "$@"

View File

@ -129,6 +129,11 @@ while [[ -z "${_hadoop_common_done}" ]]; do
hadoop_exit_with_usage 1
fi
;;
--debug)
shift
# shellcheck disable=SC2034
HADOOP_SHELL_SCRIPT_DEBUG=true
;;
--help|-help|-h|help|--h|--\?|-\?|\?)
hadoop_exit_with_usage 0
;;

View File

@ -21,6 +21,13 @@ function hadoop_error
echo "$*" 1>&2
}
function hadoop_debug
{
if [[ -n "${HADOOP_SHELL_SCRIPT_DEBUG}" ]]; then
echo "DEBUG: $*" 1>&2
fi
}
function hadoop_bootstrap_init
{
# NOTE: This function is not user replaceable.
@ -62,6 +69,7 @@ function hadoop_bootstrap_init
# defaults
export HADOOP_OPTS=${HADOOP_OPTS:-"-Djava.net.preferIPv4Stack=true"}
hadoop_debug "Initial HADOOP_OPTS=${HADOOP_OPTS}"
}
function hadoop_find_confdir
@ -80,6 +88,8 @@ function hadoop_find_confdir
conf_dir="etc/hadoop"
fi
export HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-${HADOOP_PREFIX}/${conf_dir}}"
hadoop_debug "HADOOP_CONF_DIR=${HADOOP_CONF_DIR}"
}
function hadoop_exec_hadoopenv
@ -105,6 +115,7 @@ function hadoop_basic_init
# CLASSPATH initially contains $HADOOP_CONF_DIR
CLASSPATH="${HADOOP_CONF_DIR}"
hadoop_debug "Initial CLASSPATH=${HADOOP_CONF_DIR}"
if [[ -z "${HADOOP_COMMON_HOME}" ]] &&
[[ -d "${HADOOP_PREFIX}/${HADOOP_COMMON_DIR}" ]]; then
@ -116,19 +127,19 @@ function hadoop_basic_init
# define HADOOP_HDFS_HOME
if [[ -z "${HADOOP_HDFS_HOME}" ]] &&
[[ -d "${HADOOP_PREFIX}/${HDFS_DIR}" ]]; then
[[ -d "${HADOOP_PREFIX}/${HDFS_DIR}" ]]; then
export HADOOP_HDFS_HOME="${HADOOP_PREFIX}"
fi
# define HADOOP_YARN_HOME
if [[ -z "${HADOOP_YARN_HOME}" ]] &&
[[ -d "${HADOOP_PREFIX}/${YARN_DIR}" ]]; then
[[ -d "${HADOOP_PREFIX}/${YARN_DIR}" ]]; then
export HADOOP_YARN_HOME="${HADOOP_PREFIX}"
fi
# define HADOOP_MAPRED_HOME
if [[ -z "${HADOOP_MAPRED_HOME}" ]] &&
[[ -d "${HADOOP_PREFIX}/${MAPRED_DIR}" ]]; then
[[ -d "${HADOOP_PREFIX}/${MAPRED_DIR}" ]]; then
export HADOOP_MAPRED_HOME="${HADOOP_PREFIX}"
fi
@ -274,6 +285,9 @@ function hadoop_add_param
if [[ ! ${!1} =~ $2 ]] ; then
# shellcheck disable=SC2086
eval $1="'${!1} $3'"
hadoop_debug "$1 accepted $3"
else
hadoop_debug "$1 declined $3"
fi
}
@ -283,8 +297,8 @@ function hadoop_add_classpath
# $1 = directory, file, wildcard, whatever to add
# $2 = before or after, which determines where in the
# classpath this object should go. default is after
# return 0 = success
# return 1 = failure (duplicate, doesn't exist, whatever)
# return 0 = success (added or duplicate)
# return 1 = failure (doesn't exist, whatever)
# However, with classpath (& JLP), we can do dedupe
# along with some sanity checking (e.g., missing directories)
@ -295,23 +309,29 @@ function hadoop_add_classpath
if [[ $1 =~ ^.*\*$ ]]; then
local mp=$(dirname "$1")
if [[ ! -d "${mp}" ]]; then
hadoop_debug "Rejected CLASSPATH: $1 (not a dir)"
return 1
fi
# no wildcard in the middle, so check existence
# (doesn't matter *what* it is)
elif [[ ! $1 =~ ^.*\*.*$ ]] && [[ ! -e "$1" ]]; then
hadoop_debug "Rejected CLASSPATH: $1 (does not exist)"
return 1
fi
if [[ -z "${CLASSPATH}" ]]; then
CLASSPATH=$1
hadoop_debug "Initial CLASSPATH=$1"
elif [[ ":${CLASSPATH}:" != *":$1:"* ]]; then
if [[ "$2" = "before" ]]; then
CLASSPATH="$1:${CLASSPATH}"
hadoop_debug "Prepend CLASSPATH: $1"
else
CLASSPATH+=:$1
hadoop_debug "Append CLASSPATH: $1"
fi
else
hadoop_debug "Dupe CLASSPATH: $1"
fi
return 0
}
@ -331,14 +351,20 @@ function hadoop_add_colonpath
if [[ -z "${!1}" ]]; then
# shellcheck disable=SC2086
eval $1="'$2'"
hadoop_debug "Initial colonpath($1): $2"
elif [[ "$3" = "before" ]]; then
# shellcheck disable=SC2086
eval $1="'$2:${!1}'"
hadoop_debug "Prepend colonpath($1): $2"
else
# shellcheck disable=SC2086
eval $1+="'$2'"
hadoop_debug "Append colonpath($1): $2"
fi
return 0
fi
hadoop_debug "Rejected colonpath($1): $2"
return 1
}
function hadoop_add_javalibpath
@ -397,6 +423,7 @@ function hadoop_add_to_classpath_hdfs
function hadoop_add_to_classpath_yarn
{
local i
#
# get all of the yarn jars+config in the path
#
@ -715,6 +742,11 @@ function hadoop_java_exec
local command=$1
local class=$2
shift 2
hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
export CLASSPATH
#shellcheck disable=SC2086
exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
}
@ -727,6 +759,11 @@ function hadoop_start_daemon
local command=$1
local class=$2
shift 2
hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
export CLASSPATH
#shellcheck disable=SC2086
exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
}
@ -808,6 +845,9 @@ function hadoop_start_secure_daemon
# bogus for-our-use-case 2086 here.
# it doesn't properly support multi-line situations
hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
exec "${jsvc}" \
"-Dproc_${daemonname}" \
-outfile "${daemonoutfile}" \

View File

@ -23,6 +23,7 @@ this="$bin/$script"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
# shellcheck disable=SC2034
HADOOP_NEW_CONFIG=true
. "$HADOOP_LIBEXEC_DIR/hadoop-config.sh"
@ -33,10 +34,10 @@ fi
CLASS='org.apache.hadoop.record.compiler.generated.Rcc'
# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
hadoop_add_param HADOOP_OPTS Xmx "$JAVA_HEAP_MAX"
hadoop_add_param HADOOP_OPTS Xmx "${JAVA_HEAP_MAX}"
hadoop_finalize
export CLASSPATH
hadoop_java_exec rcc "${CLASS}" "$@"

View File

@ -200,6 +200,7 @@ public class ReconfigurationServlet extends HttpServlet {
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
LOG.info("GET");
resp.setContentType("text/html");
PrintWriter out = resp.getWriter();
Reconfigurable reconf = getReconfigurable(req);
@ -214,6 +215,7 @@ public class ReconfigurationServlet extends HttpServlet {
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
LOG.info("POST");
resp.setContentType("text/html");
PrintWriter out = resp.getWriter();
Reconfigurable reconf = getReconfigurable(req);

View File

@ -381,7 +381,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
long blockSize,
Progressable progress)
throws IOException {
super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
fs.getBytesPerSum()));
int bytesPerSum = fs.getBytesPerSum();
this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
replication, blockSize, progress);
@ -405,10 +406,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
}
@Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
int ckoff, int cklen)
throws IOException {
datas.write(b, offset, len);
sums.write(checksum);
sums.write(checksum, ckoff, cklen);
}
@Override

View File

@ -337,7 +337,8 @@ public abstract class ChecksumFs extends FilterFs {
final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt,
final boolean createParent) throws IOException {
super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
fs.getBytesPerSum()));
// checksumOpt is passed down to the raw fs. Unless it implements
// checksum impelemts internally, checksumOpt will be ignored.
@ -370,10 +371,11 @@ public abstract class ChecksumFs extends FilterFs {
}
@Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
int ckoff, int cklen)
throws IOException {
datas.write(b, offset, len);
sums.write(checksum);
sums.write(checksum, ckoff, cklen);
}
@Override

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.DataChecksum;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This is a generic output stream for generating checksums for
* data before it is written to the underlying stream
@ -33,7 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
abstract public class FSOutputSummer extends OutputStream {
// data checksum
private Checksum sum;
private final DataChecksum sum;
// internal buffer for storing data before it is checksumed
private byte buf[];
// internal buffer for storing checksum
@ -41,18 +42,24 @@ abstract public class FSOutputSummer extends OutputStream {
// The number of valid bytes in the buffer.
private int count;
protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
// We want this value to be a multiple of 3 because the native code checksums
// 3 chunks simultaneously. The chosen value of 9 strikes a balance between
// limiting the number of JNI calls and flushing to the underlying stream
// relatively frequently.
private static final int BUFFER_NUM_CHUNKS = 9;
protected FSOutputSummer(DataChecksum sum) {
this.sum = sum;
this.buf = new byte[maxChunkSize];
this.checksum = new byte[checksumSize];
this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
this.count = 0;
}
/* write the data chunk in <code>b</code> staring at <code>offset</code> with
* a length of <code>len</code>, and its checksum
* a length of <code>len > 0</code>, and its checksum
*/
protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException;
protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
byte[] checksum, int checksumOffset, int checksumLen) throws IOException;
/**
* Check if the implementing OutputStream is closed and should no longer
@ -66,7 +73,6 @@ abstract public class FSOutputSummer extends OutputStream {
/** Write one byte */
@Override
public synchronized void write(int b) throws IOException {
sum.update(b);
buf[count++] = (byte)b;
if(count == buf.length) {
flushBuffer();
@ -111,18 +117,17 @@ abstract public class FSOutputSummer extends OutputStream {
*/
private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length) {
// local buffer is empty and user data has one chunk
// checksum and output data
// local buffer is empty and user buffer size >= local buffer size, so
// simply checksum the user buffer and send it directly to the underlying
// stream
final int length = buf.length;
sum.update(b, off, length);
writeChecksumChunk(b, off, length, false);
writeChecksumChunks(b, off, length);
return length;
}
// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
sum.update(b, off, bytesToCopy);
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
@ -136,22 +141,45 @@ abstract public class FSOutputSummer extends OutputStream {
* the underlying output stream.
*/
protected synchronized void flushBuffer() throws IOException {
flushBuffer(false);
flushBuffer(false, true);
}
/* Forces any buffered output bytes to be checksumed and written out to
* the underlying output stream. If keep is true, then the state of
* this object remains intact.
/* Forces buffered output bytes to be checksummed and written out to
* the underlying output stream. If there is a trailing partial chunk in the
* buffer,
* 1) flushPartial tells us whether to flush that chunk
* 2) if flushPartial is true, keep tells us whether to keep that chunk in the
* buffer (if flushPartial is false, it is always kept in the buffer)
*
* Returns the number of bytes that were flushed but are still left in the
* buffer (can only be non-zero if keep is true).
*/
protected synchronized void flushBuffer(boolean keep) throws IOException {
if (count != 0) {
int chunkLen = count;
protected synchronized int flushBuffer(boolean keep,
boolean flushPartial) throws IOException {
int bufLen = count;
int partialLen = bufLen % sum.getBytesPerChecksum();
int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
if (lenToFlush != 0) {
writeChecksumChunks(buf, 0, lenToFlush);
if (!flushPartial || keep) {
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
count = 0;
writeChecksumChunk(buf, 0, chunkLen, keep);
if (keep) {
count = chunkLen;
}
}
// total bytes left minus unflushed bytes left
return count - (bufLen - lenToFlush);
}
/**
* Checksums all complete data chunks and flushes them to the underlying
* stream. If there is a trailing partial chunk, it is not flushed and is
* maintained in the buffer.
*/
public void flush() throws IOException {
flushBuffer(false, false);
}
/**
@ -161,18 +189,18 @@ abstract public class FSOutputSummer extends OutputStream {
return count;
}
/** Generate checksum for the data chunk and output data chunk & checksum
* to the underlying output stream. If keep is true then keep the
* current checksum intact, do not reset it.
/** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream.
*/
private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
int tempChecksum = (int)sum.getValue();
if (!keep) {
sum.reset();
sum.calculateChunkedSums(b, off, len, checksum, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset,
sum.getChecksumSize());
}
int2byte(tempChecksum, checksum);
writeChunk(b, off, len, checksum);
}
/**
@ -196,9 +224,14 @@ abstract public class FSOutputSummer extends OutputStream {
/**
* Resets existing buffer with a new one of the specified size.
*/
protected synchronized void resetChecksumChunk(int size) {
sum.reset();
protected synchronized void setChecksumBufSize(int size) {
this.buf = new byte[size];
this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
sum.getChecksumSize()];
this.count = 0;
}
protected synchronized void resetChecksumBufSize() {
setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
}
}

View File

@ -339,6 +339,7 @@ public class DataChecksum implements Checksum {
byte[] data, int dataOff, int dataLen,
byte[] checksums, int checksumsOff, String fileName,
long basePos) throws ChecksumException {
if (type.size == 0) return;
if (NativeCrc32.isAvailable()) {
NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
@ -421,6 +422,7 @@ public class DataChecksum implements Checksum {
public void calculateChunkedSums(
byte[] data, int dataOffset, int dataLength,
byte[] sums, int sumsOffset) {
if (type.size == 0) return;
if (NativeCrc32.isAvailable()) {
NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id,

View File

@ -42,7 +42,7 @@ class NativeCrc32 {
* modified.
*
* @param bytesPerSum the chunk size (eg 512 bytes)
* @param checksumType the DataChecksum type constant
* @param checksumType the DataChecksum type constant (NULL is not supported)
* @param sums the DirectByteBuffer pointing at the beginning of the
* stored checksums
* @param data the DirectByteBuffer pointing at the beginning of the

View File

@ -434,6 +434,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6773. MiniDFSCluster should skip edit log fsync by default (Stephen
Chu via Colin Patrick McCabe)
HDFS-6865. Byte array native checksumming on client side
(James Thomas via todd)
BUG FIXES
HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for

View File

@ -80,6 +80,7 @@ shift
case ${COMMAND} in
balancer)
CLASS=org.apache.hadoop.hdfs.server.balancer.Balancer
hadoop_debug "Appending HADOOP_BALANCER_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_BALANCER_OPTS}"
;;
cacheadmin)
@ -105,19 +106,24 @@ case ${COMMAND} in
HADOOP_SECURE_PID_DIR="${HADOOP_SECURE_PID_DIR:-$HADOOP_SECURE_DN_PID_DIR}"
HADOOP_SECURE_LOG_DIR="${HADOOP_SECURE_LOG_DIR:-$HADOOP_SECURE_DN_LOG_DIR}"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_DN_SECURE_EXTRA_OPTS} ${HADOOP_DATANODE_OPTS}"
hadoop_debug "Appending HADOOP_DATANODE_OPTS onto HADOOP_OPTS"
hadoop_debug "Appending HADOOP_DN_SECURE_EXTRA_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_DATANODE_OPTS} ${HADOOP_DN_SECURE_EXTRA_OPTS}"
CLASS="org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter"
else
hadoop_debug "Appending HADOOP_DATANODE_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_DATANODE_OPTS}"
CLASS='org.apache.hadoop.hdfs.server.datanode.DataNode'
fi
;;
dfs)
CLASS=org.apache.hadoop.fs.FsShell
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
dfsadmin)
CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
fetchdt)
@ -125,6 +131,7 @@ case ${COMMAND} in
;;
fsck)
CLASS=org.apache.hadoop.hdfs.tools.DFSck
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
getconf)
@ -135,12 +142,15 @@ case ${COMMAND} in
;;
haadmin)
CLASS=org.apache.hadoop.hdfs.tools.DFSHAAdmin
CLASSPATH="${CLASSPATH}:${TOOL_PATH}"
hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
hadoop_add_classpath "${TOOL_PATH}"
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
journalnode)
daemon="true"
CLASS='org.apache.hadoop.hdfs.qjournal.server.JournalNode'
hadoop_debug "Appending HADOOP_JOURNALNODE_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_JOURNALNODE_OPTS}"
;;
jmxget)
@ -152,6 +162,7 @@ case ${COMMAND} in
namenode)
daemon="true"
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
hadoop_debug "Appending HADOOP_NAMENODE_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_NAMENODE_OPTS}"
;;
nfs3)
@ -164,9 +175,12 @@ case ${COMMAND} in
HADOOP_SECURE_PID_DIR="${HADOOP_SECURE_PID_DIR:-$HADOOP_SECURE_NFS3_PID_DIR}"
HADOOP_SECURE_LOG_DIR="${HADOOP_SECURE_LOG_DIR:-$HADOOP_SECURE_NFS3_LOG_DIR}"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_NFS3_SECURE_EXTRA_OPTS} ${HADOOP_NFS3_OPTS}"
hadoop_debug "Appending HADOOP_NFS3_OPTS onto HADOOP_OPTS"
hadoop_debug "Appending HADOOP_NFS3_SECURE_EXTRA_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_NFS3_OPTS} ${HADOOP_NFS3_SECURE_EXTRA_OPTS}"
CLASS=org.apache.hadoop.hdfs.nfs.nfs3.PrivilegedNfsGatewayStarter
else
hadoop_debug "Appending HADOOP_NFS3_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_NFS3_OPTS}"
CLASS=org.apache.hadoop.hdfs.nfs.nfs3.Nfs3
fi
@ -183,11 +197,13 @@ case ${COMMAND} in
portmap)
daemon="true"
CLASS=org.apache.hadoop.portmap.Portmap
hadoop_debug "Appending HADOOP_PORTMAP_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_PORTMAP_OPTS}"
;;
secondarynamenode)
daemon="true"
CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
hadoop_debug "Appending HADOOP_SECONDARYNAMENODE_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_SECONDARYNAMENODE_OPTS}"
;;
snapshotDiff)
@ -196,6 +212,7 @@ case ${COMMAND} in
zkfc)
daemon="true"
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
hadoop_debug "Appending HADOOP_ZKFC_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_ZKFC_OPTS}"
;;
-*)
@ -236,8 +253,6 @@ fi
hadoop_add_param HADOOP_OPTS Xmx "${JAVA_HEAP_MAX}"
hadoop_finalize
export CLASSPATH
if [[ -n "${daemon}" ]]; then
if [[ -n "${secure_service}" ]]; then
hadoop_secure_daemon_handler \

View File

@ -401,7 +401,7 @@ public class DFSOutputStream extends FSOutputSummer
// one chunk that fills up the partial chunk.
//
computePacketChunkSize(0, freeInCksum);
resetChecksumChunk(freeInCksum);
setChecksumBufSize(freeInCksum);
appendChunk = true;
} else {
// if the remaining space in the block is smaller than
@ -1566,7 +1566,7 @@ public class DFSOutputStream extends FSOutputSummer
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
super(checksum);
this.dfsClient = dfsClient;
this.src = src;
this.fileId = stat.getFileId();
@ -1720,22 +1720,21 @@ public class DFSOutputStream extends FSOutputSummer
// @see FSOutputSummer#writeChunk()
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException {
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();
checkClosed();
int cklen = checksum.length;
int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
if (checksum.length != this.checksum.getChecksumSize()) {
if (cklen != this.checksum.getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
this.checksum.getChecksumSize() +
" but found to be " + checksum.length);
" but found to be " + cklen);
}
if (currentPacket == null) {
@ -1751,7 +1750,7 @@ public class DFSOutputStream extends FSOutputSummer
}
}
currentPacket.writeChecksum(checksum, 0, cklen);
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
@ -1775,7 +1774,7 @@ public class DFSOutputStream extends FSOutputSummer
// crc chunks from now on.
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
resetChecksumChunk(bytesPerChecksum);
resetChecksumBufSize();
}
if (!appendChunk) {
@ -1856,20 +1855,13 @@ public class DFSOutputStream extends FSOutputSummer
long lastBlockLength = -1L;
boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
synchronized (this) {
/* Record current blockOffset. This might be changed inside
* flushBuffer() where a partial checksum chunk might be flushed.
* After the flush, reset the bytesCurBlock back to its previous value,
* any partial checksum chunk will be sent now and in next packet.
*/
long saveOffset = bytesCurBlock;
Packet oldCurrentPacket = currentPacket;
// flush checksum buffer, but keep checksum buffer intact
flushBuffer(true);
int numKept = flushBuffer(true, true);
// bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug(
"DFSClient flush() : saveOffset " + saveOffset +
"DFSClient flush() :" +
" bytesCurBlock " + bytesCurBlock +
" lastFlushOffset " + lastFlushOffset);
}
@ -1886,14 +1878,6 @@ public class DFSOutputStream extends FSOutputSummer
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
}
} else {
// We already flushed up to this offset.
// This means that we haven't written anything since the last flush
// (or the beginning of the file). Hence, we should not have any
// packet queued prior to this call, since the last flush set
// currentPacket = null.
assert oldCurrentPacket == null :
"Empty flush should not occur with a currentPacket";
if (isSync && bytesCurBlock > 0) {
// Nothing to send right now,
// and the block was partially written,
@ -1913,7 +1897,7 @@ public class DFSOutputStream extends FSOutputSummer
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
//
bytesCurBlock = saveOffset;
bytesCurBlock -= numKept;
toWaitFor = lastQueuedSeqno;
} // end synchronized

View File

@ -261,7 +261,9 @@ public class TestFileAppend{
start += 29;
}
stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start);
// need to make sure we completely write out all full blocks before
// the checkFile() call (see FSOutputSummer#flush)
stm.flush();
// verify that full blocks are sane
checkFile(fs, file1, 1);
stm.close();

View File

@ -394,6 +394,8 @@ public class TestBlockToken {
Path filePath = new Path(fileName);
FSDataOutputStream out = fs.create(filePath, (short) 1);
out.write(new byte[1000]);
// ensure that the first block is written out (see FSOutputSummer#flush)
out.flush();
LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
fileName, 0, 1000);
while (locatedBlocks.getLastLocatedBlock() == null) {

View File

@ -70,6 +70,9 @@ public class TestBlockUnderConstruction {
long blocksBefore = stm.getPos() / BLOCK_SIZE;
TestFileCreation.writeFile(stm, BLOCK_SIZE);
// need to make sure the full block is completely flushed to the DataNodes
// (see FSOutputSummer#flush)
stm.flush();
int blocksAfter = 0;
// wait until the block is allocated by DataStreamer
BlockLocation[] locatedBlocks;

View File

@ -141,6 +141,9 @@ public class TestDecommissioningStatus {
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
// need to make sure that we actually write out both file blocks
// (see FSOutputSummer#flush)
stm.flush();
// Do not close stream, return it
// so that it is not garbage collected
return stm;

View File

@ -64,13 +64,15 @@ shift
case ${COMMAND} in
mradmin|jobtracker|tasktracker|groups)
echo "Sorry, the ${COMMAND} command is no longer supported."
echo "You may find similar functionality with the \"yarn\" shell command."
hadoop_error "Sorry, the ${COMMAND} command is no longer supported."
hadoop_error "You may find similar functionality with the \"yarn\" shell command."
hadoop_exit_with_usage 1
;;
archive)
CLASS=org.apache.hadoop.tools.HadoopArchives
hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
hadoop_add_classpath "${TOOL_PATH}"
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
classpath)
@ -80,12 +82,15 @@ case ${COMMAND} in
;;
distcp)
CLASS=org.apache.hadoop.tools.DistCp
hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
hadoop_add_classpath "${TOOL_PATH}"
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
historyserver)
daemon="true"
CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
hadoop_debug "Appending HADOOP_JOB_HISTORYSERVER_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_JOB_HISTORYSERVER_OPTS}"
if [ -n "${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}" ]; then
JAVA_HEAP_MAX="-Xmx${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}m"
@ -97,6 +102,7 @@ case ${COMMAND} in
;;
pipes)
CLASS=org.apache.hadoop.mapred.pipes.Submitter
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
queue)
@ -104,10 +110,12 @@ case ${COMMAND} in
;;
sampler)
CLASS=org.apache.hadoop.mapred.lib.InputSampler
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
-*|*)
@ -130,8 +138,6 @@ fi
hadoop_add_param HADOOP_OPTS Xmx "${JAVA_HEAP_MAX}"
hadoop_finalize
export CLASSPATH
if [[ -n "${daemon}" ]]; then
if [[ -n "${secure_service}" ]]; then
hadoop_secure_daemon_handler "${HADOOP_DAEMON_MODE}" "${COMMAND}"\

View File

@ -72,6 +72,7 @@ shift
case "${COMMAND}" in
application|applicationattempt|container)
CLASS=org.apache.hadoop.yarn.client.cli.ApplicationCLI
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
set -- "${COMMAND}" "$@"
;;
@ -82,10 +83,12 @@ case "${COMMAND}" in
;;
daemonlog)
CLASS=org.apache.hadoop.log.LogLevel
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
;;
jar)
CLASS=org.apache.hadoop.util.RunJar
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
;;
historyserver)
@ -97,15 +100,18 @@ case "${COMMAND}" in
;;
logs)
CLASS=org.apache.hadoop.yarn.logaggregation.LogDumper
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
;;
node)
CLASS=org.apache.hadoop.yarn.client.cli.NodeCLI
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
;;
nodemanager)
daemon="true"
CLASS='org.apache.hadoop.yarn.server.nodemanager.NodeManager'
hadoop_debug "Append YARN_NODEMANAGER_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_NODEMANAGER_OPTS}"
if [[ -n "${YARN_NODEMANAGER_HEAPSIZE}" ]]; then
JAVA_HEAP_MAX="-Xmx${YARN_NODEMANAGER_HEAPSIZE}m"
@ -114,6 +120,7 @@ case "${COMMAND}" in
proxyserver)
daemon="true"
CLASS='org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer'
hadoop_debug "Append YARN_PROXYSERVER_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_PROXYSERVER_OPTS}"
if [[ -n "${YARN_PROXYSERVER_HEAPSIZE}" ]]; then
JAVA_HEAP_MAX="-Xmx${YARN_PROXYSERVER_HEAPSIZE}m"
@ -123,17 +130,20 @@ case "${COMMAND}" in
daemon="true"
CLASS='org.apache.hadoop.yarn.server.resourcemanager.ResourceManager'
YARN_OPTS="${YARN_OPTS} ${YARN_RESOURCEMANAGER_OPTS}"
hadoop_debug "Append YARN_RESOURCEMANAGER_OPTS onto YARN_OPTS"
if [[ -n "${YARN_RESOURCEMANAGER_HEAPSIZE}" ]]; then
JAVA_HEAP_MAX="-Xmx${YARN_RESOURCEMANAGER_HEAPSIZE}m"
fi
;;
rmadmin)
CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI'
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
;;
timelineserver)
daemon="true"
CLASS='org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer'
hadoop_debug "Append YARN_TIMELINESERVER_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_TIMELINESERVER_OPTS}"
if [[ -n "${YARN_TIMELINESERVER_HEAPSIZE}" ]]; then
JAVA_HEAP_MAX="-Xmx${YARN_TIMELINESERVER_HEAPSIZE}m"
@ -141,6 +151,7 @@ case "${COMMAND}" in
;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
;;
-*)
@ -153,6 +164,8 @@ esac
# set HADOOP_OPTS to YARN_OPTS so that we can use
# finalize, etc, without doing anything funky
hadoop_debug "Resetting HADOOP_OPTS=YARN_OPTS"
# shellcheck disable=SC2034
HADOOP_OPTS="${YARN_OPTS}"
daemon_outfile="${HADOOP_LOG_DIR}/hadoop-${HADOOP_IDENT_STRING}-${COMMAND}-${HOSTNAME}.out"
@ -180,8 +193,6 @@ hadoop_add_param HADOOP_OPTS yarn.root.logger "-Dyarn.root.logger=${YARN_ROOT_LO
hadoop_finalize
export CLASSPATH
if [[ -n "${daemon}" ]]; then
if [[ -n "${secure_service}" ]]; then
hadoop_secure_daemon_handler "${HADOOP_DAEMON_MODE}" "${COMMAND}" \