Merge trunk into branch.

Fixed a couple trivial conflicts due to HDFS-4363, which moved some methods
from HdfsProtoUtil to PBHelper


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1433133 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-01-14 21:12:16 +00:00
commit 12bf674e8e
168 changed files with 2469 additions and 669 deletions

View File

@ -980,12 +980,12 @@ fi
(( RESULT = RESULT + $JAVAC_RET ))
checkJavadocWarnings
(( RESULT = RESULT + $? ))
checkEclipseGeneration
(( RESULT = RESULT + $? ))
### Checkstyle not implemented yet
#checkStyle
#(( RESULT = RESULT + $? ))
buildAndInstall
checkEclipseGeneration
(( RESULT = RESULT + $? ))
checkFindbugsWarnings
(( RESULT = RESULT + $? ))
checkReleaseAuditWarnings

View File

@ -148,6 +148,8 @@ Trunk (Unreleased)
BUG FIXES
HADOOP-8419. Fixed GzipCode NPE reset for IBM JDK. (Yu Li via eyang)
HADOOP-9041. FsUrlStreamHandlerFactory could cause an infinite loop in
FileSystem initialization. (Yanbo Liang and Radim Kolar via llu)
@ -306,9 +308,14 @@ Trunk (Unreleased)
HADOOP-9131. Turn off TestLocalFileSystem#testListStatusWithColons on
Windows. (Chris Nauroth via suresh)
HADOOP-8957 AbstractFileSystem#IsValidName should be overridden for
HADOOP-8957. AbstractFileSystem#IsValidName should be overridden for
embedded file systems like ViewFs (Chris Nauroth via Sanjay Radia)
HADOOP-9139. improve killKdc.sh (Ivan A. Veselovsky via bobby)
HADOOP-9202. test-patch.sh fails during mvn eclipse:eclipse if patch adds
a new module to the build (Chris Nauroth via bobby)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -426,6 +433,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9119. Add test to FileSystemContractBaseTest to verify integrity
of overwritten files. (Steve Loughran via suresh)
HADOOP-9192. Move token related request/response messages to common.
(suresh)
OPTIMIZATIONS
HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang
@ -525,6 +535,11 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9181. Set daemon flag for HttpServer's QueuedThreadPool.
(Liang Xie via suresh)
HADOOP-9155. FsPermission should have different default value, 777 for
directory and 666 for file. (Binglin Chang via atm)
HADOOP-9183. Potential deadlock in ActiveStandbyElector. (tomwhite)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -1238,6 +1253,8 @@ Release 0.23.6 - UNRELEASED
HADOOP-9105. FsShell -moveFromLocal erroneously fails (daryn via bobby)
HADOOP-9097. Maven RAT plugin is not checking all source files (tgraves)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -445,10 +445,11 @@
<exclude>dev-support/jdiff/**</exclude>
<exclude>src/main/native/*</exclude>
<exclude>src/main/native/config/*</exclude>
<exclude>src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo</exclude>
<exclude>src/main/native/m4/*</exclude>
<exclude>src/test/empty-file</exclude>
<exclude>src/test/all-tests</exclude>
<exclude>src/test/resources/kdc/ldif/users.ldif</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -1,3 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CONFIG_H
#define CONFIG_H

View File

@ -172,7 +172,25 @@ import org.apache.hadoop.util.ShutdownHookManager;
public final class FileContext {
public static final Log LOG = LogFactory.getLog(FileContext.class);
/**
* Default permission for directory and symlink
* In previous versions, this default permission was also used to
* create files, so files created end up with ugo+x permission.
* See HADOOP-9155 for detail.
* Two new constants are added to solve this, please use
* {@link FileContext#DIR_DEFAULT_PERM} for directory, and use
* {@link FileContext#FILE_DEFAULT_PERM} for file.
* This constant is kept for compatibility.
*/
public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
/**
* Default permission for directory
*/
public static final FsPermission DIR_DEFAULT_PERM = FsPermission.getDirDefault();
/**
* Default permission for file
*/
public static final FsPermission FILE_DEFAULT_PERM = FsPermission.getFileDefault();
/**
* Priority of the FileContext shutdown hook.
@ -656,7 +674,7 @@ public final class FileContext {
CreateOpts.Perms permOpt =
(CreateOpts.Perms) CreateOpts.getOpt(CreateOpts.Perms.class, opts);
FsPermission permission = (permOpt != null) ? permOpt.getValue() :
FsPermission.getDefault();
FILE_DEFAULT_PERM;
permission = permission.applyUMask(umask);
final CreateOpts[] updatedOpts =
@ -704,7 +722,7 @@ public final class FileContext {
IOException {
final Path absDir = fixRelativePart(dir);
final FsPermission absFerms = (permission == null ?
FsPermission.getDefault() : permission).applyUMask(umask);
FsPermission.getDirDefault() : permission).applyUMask(umask);
new FSLinkResolver<Void>() {
@Override
public Void next(final AbstractFileSystem fs, final Path p)
@ -2157,7 +2175,7 @@ public final class FileContext {
FileStatus fs = FileContext.this.getFileStatus(qSrc);
if (fs.isDirectory()) {
checkDependencies(qSrc, qDst);
mkdir(qDst, FsPermission.getDefault(), true);
mkdir(qDst, FsPermission.getDirDefault(), true);
FileStatus[] contents = listStatus(qSrc);
for (FileStatus content : contents) {
copy(makeQualified(content.getPath()), makeQualified(new Path(qDst,

View File

@ -79,8 +79,15 @@ public class FileStatus implements Writable, Comparable {
this.blocksize = blocksize;
this.modification_time = modification_time;
this.access_time = access_time;
this.permission = (permission == null) ?
FsPermission.getDefault() : permission;
if (permission != null) {
this.permission = permission;
} else if (isdir) {
this.permission = FsPermission.getDirDefault();
} else if (symlink!=null) {
this.permission = FsPermission.getDefault();
} else {
this.permission = FsPermission.getFileDefault();
}
this.owner = (owner == null) ? "" : owner;
this.group = (group == null) ? "" : group;
this.symlink = symlink;
@ -217,7 +224,7 @@ public class FileStatus implements Writable, Comparable {
*/
protected void setPermission(FsPermission permission) {
this.permission = (permission == null) ?
FsPermission.getDefault() : permission;
FsPermission.getFileDefault() : permission;
}
/**

View File

@ -850,7 +850,7 @@ public abstract class FileSystem extends Configured implements Closeable {
long blockSize,
Progressable progress
) throws IOException {
return this.create(f, FsPermission.getDefault().applyUMask(
return this.create(f, FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(getConf())), overwrite, bufferSize,
replication, blockSize, progress);
}
@ -1030,7 +1030,7 @@ public abstract class FileSystem extends Configured implements Closeable {
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return this.createNonRecursive(f, FsPermission.getDefault(),
return this.createNonRecursive(f, FsPermission.getFileDefault(),
overwrite, bufferSize, replication, blockSize, progress);
}
@ -1866,7 +1866,7 @@ public abstract class FileSystem extends Configured implements Closeable {
* Call {@link #mkdirs(Path, FsPermission)} with default permission.
*/
public boolean mkdirs(Path f) throws IOException {
return mkdirs(f, FsPermission.getDefault());
return mkdirs(f, FsPermission.getDirDefault());
}
/**

View File

@ -224,7 +224,7 @@ public class FTPFileSystem extends FileSystem {
}
Path parent = absolute.getParent();
if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
if (parent == null || !mkdirs(client, parent, FsPermission.getDirDefault())) {
parent = (parent == null) ? new Path("/") : parent;
disconnect(client);
throw new IOException("create(): Mkdirs failed to create: " + parent);
@ -484,7 +484,7 @@ public class FTPFileSystem extends FileSystem {
if (!exists(client, absolute)) {
Path parent = absolute.getParent();
created = (parent == null || mkdirs(client, parent, FsPermission
.getDefault()));
.getDirDefault()));
if (created) {
String parentDir = parent.toUri().getPath();
client.changeWorkingDirectory(parentDir);

View File

@ -85,7 +85,7 @@ public class RawLocalFs extends DelegateToFileSystem {
"system: "+target.toString());
}
if (createParent) {
mkdir(link.getParent(), FsPermission.getDefault(), true);
mkdir(link.getParent(), FsPermission.getDirDefault(), true);
}
// NB: Use createSymbolicLink in java.nio.file.Path once available
try {

View File

@ -275,11 +275,34 @@ public class FsPermission implements Writable {
conf.setInt(DEPRECATED_UMASK_LABEL, umask.toShort());
}
/** Get the default permission. */
/**
* Get the default permission for directory and symlink.
* In previous versions, this default permission was also used to
* create files, so files created end up with ugo+x permission.
* See HADOOP-9155 for detail.
* Two new methods are added to solve this, please use
* {@link FsPermission#getDirDefault()} for directory, and use
* {@link FsPermission#getFileDefault()} for file.
* This method is kept for compatibility.
*/
public static FsPermission getDefault() {
return new FsPermission((short)00777);
}
/**
* Get the default permission for directory.
*/
public static FsPermission getDirDefault() {
return new FsPermission((short)00777);
}
/**
* Get the default permission for file.
*/
public static FsPermission getFileDefault() {
return new FsPermission((short)00666);
}
/**
* Create a FsPermission from a Unix symbolic permission string
* @param unixSymbolicPermission e.g. "-rw-rw-rw-"

View File

@ -613,7 +613,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
// Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
// may trigger the Connected event immediately. So, if we register the
// watcher after constructing ZooKeeper, we may miss that event. Instead,
// we construct the watcher first, and have it queue any events it receives
// we construct the watcher first, and have it block any events it receives
// before we can set its ZooKeeper reference.
WatcherWithClientRef watcher = new WatcherWithClientRef();
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
@ -1002,17 +1002,15 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
/**
* If any events arrive before the reference to ZooKeeper is set,
* they get queued up and later forwarded when the reference is
* available.
* Latch used to wait until the reference to ZooKeeper is set.
*/
private final List<WatchedEvent> queuedEvents = Lists.newLinkedList();
private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
private WatcherWithClientRef() {
}
private WatcherWithClientRef(ZooKeeper zk) {
this.zk = zk;
setZooKeeperRef(zk);
}
/**
@ -1029,9 +1027,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
+ connectionTimeoutMs + " milliseconds");
synchronized (this) {
zk.close();
}
throw KeeperException.create(Code.CONNECTIONLOSS);
}
} catch (InterruptedException e) {
@ -1041,29 +1037,18 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
}
}
private synchronized void setZooKeeperRef(ZooKeeper zk) {
private void setZooKeeperRef(ZooKeeper zk) {
Preconditions.checkState(this.zk == null,
"zk already set -- must be set exactly once");
this.zk = zk;
for (WatchedEvent e : queuedEvents) {
forwardEvent(e);
}
queuedEvents.clear();
hasSetZooKeeper.countDown();
}
@Override
public synchronized void process(WatchedEvent event) {
if (zk != null) {
forwardEvent(event);
} else {
queuedEvents.add(event);
}
}
private void forwardEvent(WatchedEvent event) {
public void process(WatchedEvent event) {
hasReceivedEvent.countDown();
try {
hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS);
ActiveStandbyElector.this.processWatchEvent(
zk, event);
} catch (Throwable t) {

View File

@ -40,6 +40,11 @@ public class GzipCodec extends DefaultCodec {
protected static class GzipOutputStream extends CompressorStream {
private static class ResetableGZIPOutputStream extends GZIPOutputStream {
private static final int TRAILER_SIZE = 8;
public static final String JVMVendor= System.getProperty("java.vendor");
public static final String JVMVersion= System.getProperty("java.version");
private static final boolean HAS_BROKEN_FINISH =
(JVMVendor.contains("IBM") && JVMVersion.contains("1.6.0"));
public ResetableGZIPOutputStream(OutputStream out) throws IOException {
super(out);
@ -48,6 +53,61 @@ public class GzipCodec extends DefaultCodec {
public void resetState() throws IOException {
def.reset();
}
/**
* Override this method for HADOOP-8419.
* Override because IBM implementation calls def.end() which
* causes problem when reseting the stream for reuse.
*
*/
@Override
public void finish() throws IOException {
if (HAS_BROKEN_FINISH) {
if (!def.finished()) {
def.finish();
while (!def.finished()) {
int i = def.deflate(this.buf, 0, this.buf.length);
if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
writeTrailer(this.buf, i);
i += TRAILER_SIZE;
out.write(this.buf, 0, i);
return;
}
if (i > 0) {
out.write(this.buf, 0, i);
}
}
byte[] arrayOfByte = new byte[TRAILER_SIZE];
writeTrailer(arrayOfByte, 0);
out.write(arrayOfByte);
}
} else {
super.finish();
}
}
/** re-implement for HADOOP-8419 because the relative method in jdk is invisible */
private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
throws IOException {
writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
}
/** re-implement for HADOOP-8419 because the relative method in jdk is invisible */
private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
throws IOException {
writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
}
/** re-implement for HADOOP-8419 because the relative method in jdk is invisible */
private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
throws IOException {
paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
}
}
public GzipOutputStream(OutputStream out) throws IOException {

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Generic ID generator
* used for generating various types of number sequences.
*/
@InterfaceAudience.Private
public interface IdGenerator {
/** Increment and then return the next value. */
public long nextValue();
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
* This class is thread safe.
*/
@InterfaceAudience.Private
public abstract class SequentialNumber {
public abstract class SequentialNumber implements IdGenerator {
private final AtomicLong currentValue;
/** Create a new instance with the given initial value. */

View File

@ -32,3 +32,26 @@ message TokenProto {
required string service = 4;
}
message GetDelegationTokenRequestProto {
required string renewer = 1;
}
message GetDelegationTokenResponseProto {
optional hadoop.common.TokenProto token = 1;
}
message RenewDelegationTokenRequestProto {
required hadoop.common.TokenProto token = 1;
}
message RenewDelegationTokenResponseProto {
required uint64 newExpiryTime = 1;
}
message CancelDelegationTokenRequestProto {
required hadoop.common.TokenProto token = 1;
}
message CancelDelegationTokenResponseProto { // void response
}

View File

@ -1 +1,14 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.security.AnnotatedSecurityInfo

View File

@ -95,7 +95,7 @@ public abstract class FileContextPermissionBase {
String filename = "foo";
Path f = getTestRootPath(fc, filename);
createFile(fc, filename);
doFilePermissionCheck(FileContext.DEFAULT_PERM.applyUMask(fc.getUMask()),
doFilePermissionCheck(FileContext.FILE_DEFAULT_PERM.applyUMask(fc.getUMask()),
fc.getFileStatus(f).getPermission());
}

View File

@ -1,3 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;

View File

@ -121,7 +121,7 @@ public class TestFileStatus {
FileStatus fileStatus = new FileStatus(LENGTH, isdir,
REPLICATION, BLKSIZE, MTIME, PATH);
validateAccessors(fileStatus, LENGTH, isdir, REPLICATION, BLKSIZE, MTIME,
0, FsPermission.getDefault(), "", "", null, PATH);
0, FsPermission.getDirDefault(), "", "", null, PATH);
}
/**
@ -131,7 +131,7 @@ public class TestFileStatus {
public void constructorBlank() throws IOException {
FileStatus fileStatus = new FileStatus();
validateAccessors(fileStatus, 0, false, 0, 0, 0,
0, FsPermission.getDefault(), "", "", null, null);
0, FsPermission.getFileDefault(), "", "", null, null);
}
/**

View File

@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.permission.FsPermission;
public class TestLocalFSFileContextMainOperations extends FileContextMainOperationsBaseTest {
@ -47,4 +49,14 @@ public class TestLocalFSFileContextMainOperations extends FileContextMainOperati
FileContext fc1 = FileContext.getLocalFSFileContext();
Assert.assertTrue(fc1 != fc);
}
@Test
public void testDefaultFilePermission() throws IOException {
Path file = FileContextTestHelper.getTestRootPath(fc,
"testDefaultFilePermission");
FileContextTestHelper.createFile(fc, file);
FsPermission expect = FileContext.FILE_DEFAULT_PERM.applyUMask(fc.getUMask());
Assert.assertEquals(expect, fc.getFileStatus(file)
.getPermission());
}
}

View File

@ -73,7 +73,7 @@ public class TestLocalFileSystemPermission extends TestCase {
try {
FsPermission initialPermission = getPermission(localfs, f);
System.out.println(filename + ": " + initialPermission);
assertEquals(FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf)), initialPermission);
assertEquals(FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), initialPermission);
}
catch(Exception e) {
System.out.println(StringUtils.stringifyException(e));

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RandomDatum;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.util.ReflectionUtils;
import junit.framework.TestCase;
public class TestCompressionStreamReuse extends TestCase {
private static final Log LOG = LogFactory
.getLog(TestCompressionStreamReuse.class);
private Configuration conf = new Configuration();
private int count = 10000;
private int seed = new Random().nextInt();
public void testBZip2Codec() throws IOException {
resetStateTest(conf, seed, count,
"org.apache.hadoop.io.compress.BZip2Codec");
}
public void testGzipCompressStreamReuse() throws IOException {
resetStateTest(conf, seed, count,
"org.apache.hadoop.io.compress.GzipCodec");
}
public void testGzipCompressStreamReuseWithParam() throws IOException {
Configuration conf = new Configuration(this.conf);
ZlibFactory
.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
ZlibFactory.setCompressionStrategy(conf,
CompressionStrategy.HUFFMAN_ONLY);
resetStateTest(conf, seed, count,
"org.apache.hadoop.io.compress.GzipCodec");
}
private static void resetStateTest(Configuration conf, int seed, int count,
String codecClass) throws IOException {
// Create the codec
CompressionCodec codec = null;
try {
codec = (CompressionCodec) ReflectionUtils.newInstance(conf
.getClassByName(codecClass), conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Illegal codec!");
}
LOG.info("Created a Codec object of type: " + codecClass);
// Generate data
DataOutputBuffer data = new DataOutputBuffer();
RandomDatum.Generator generator = new RandomDatum.Generator(seed);
for (int i = 0; i < count; ++i) {
generator.next();
RandomDatum key = generator.getKey();
RandomDatum value = generator.getValue();
key.write(data);
value.write(data);
}
LOG.info("Generated " + count + " records");
// Compress data
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
DataOutputStream deflateOut = new DataOutputStream(
new BufferedOutputStream(compressedDataBuffer));
CompressionOutputStream deflateFilter = codec
.createOutputStream(deflateOut);
deflateFilter.write(data.getData(), 0, data.getLength());
deflateFilter.finish();
deflateFilter.flush();
LOG.info("Finished compressing data");
// reset deflator
deflateFilter.resetState();
LOG.info("Finished reseting deflator");
// re-generate data
data.reset();
generator = new RandomDatum.Generator(seed);
for (int i = 0; i < count; ++i) {
generator.next();
RandomDatum key = generator.getKey();
RandomDatum value = generator.getValue();
key.write(data);
value.write(data);
}
DataInputBuffer originalData = new DataInputBuffer();
DataInputStream originalIn = new DataInputStream(
new BufferedInputStream(originalData));
originalData.reset(data.getData(), 0, data.getLength());
// re-compress data
compressedDataBuffer.reset();
deflateOut = new DataOutputStream(new BufferedOutputStream(
compressedDataBuffer));
deflateFilter = codec.createOutputStream(deflateOut);
deflateFilter.write(data.getData(), 0, data.getLength());
deflateFilter.finish();
deflateFilter.flush();
LOG.info("Finished re-compressing data");
// De-compress data
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
CompressionInputStream inflateFilter = codec
.createInputStream(deCompressedDataBuffer);
DataInputStream inflateIn = new DataInputStream(
new BufferedInputStream(inflateFilter));
// Check
for (int i = 0; i < count; ++i) {
RandomDatum k1 = new RandomDatum();
RandomDatum v1 = new RandomDatum();
k1.readFields(originalIn);
v1.readFields(originalIn);
RandomDatum k2 = new RandomDatum();
RandomDatum v2 = new RandomDatum();
k2.readFields(inflateIn);
v2.readFields(inflateIn);
assertTrue(
"original and compressed-then-decompressed-output not equal",
k1.equals(k2) && v1.equals(v2));
}
LOG.info("SUCCESS! Completed checking " + count + " records");
}
}

View File

@ -1,2 +1,15 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.ipc.TestSaslRPC$TestTokenIdentifier
org.apache.hadoop.security.token.delegation.TestDelegationToken$TestDelegationTokenIdentifier

View File

@ -1,3 +1,19 @@
#!/bin/sh
ps -ef | grep apacheds | grep -v grep | cut -f4 -d ' ' |xargs kill -9
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ps -ef | grep apacheds | grep -v grep | awk '{printf $2"\n"}' | xargs -t --no-run-if-empty kill -9

View File

@ -49,9 +49,6 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<includes>
<include>pom.xml</include>
</includes>
</configuration>
</plugin>
</plugins>

View File

@ -66,9 +66,6 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<includes>
<include>pom.xml</include>
</includes>
</configuration>
</plugin>
</plugins>

View File

@ -359,6 +359,8 @@
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/classutils.txt</exclude>
<exclude>src/main/conf/httpfs-signature.secret</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -285,9 +285,6 @@ Trunk (Unreleased)
HDFS-4310. fix test org.apache.hadoop.hdfs.server.datanode.
TestStartSecureDataNode (Ivan A. Veselovsky via atm)
HDFS-4274. BlockPoolSliceScanner does not close verification log during
shutdown. (Chris Nauroth via suresh)
HDFS-4275. MiniDFSCluster-based tests fail on Windows due to failure
to delete test namenode directory. (Chris Nauroth via suresh)
@ -297,6 +294,8 @@ Trunk (Unreleased)
HDFS-4261. Fix bugs in Balaner causing infinite loop and
TestBalancerWithNodeGroup timeing out. (Junping Du via szetszwo)
HDFS-4382. Fix typo MAX_NOT_CHANGED_INTERATIONS. (Ted Yu via suresh)
Release 2.0.3-alpha - Unreleased
INCOMPATIBLE CHANGES
@ -307,6 +306,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4362. GetDelegationTokenResponseProto does not handle null token.
(suresh)
HDFS-4367. GetDataEncryptionKeyResponseProto does not handle null
response. (suresh)
NEW FEATURES
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
@ -467,8 +469,21 @@ Release 2.0.3-alpha - Unreleased
HDFS-4035. LightWeightGSet and LightWeightHashSet increment a
volatile without synchronization. (eli)
HDFS-4032. Specify the charset explicitly rather than rely on the
default. (eli)
HDFS-4363. Combine PBHelper and HdfsProtoUtil and remove redundant
methods. (suresh)
HDFS-4377. Some trivial DN comment cleanup. (eli)
HDFS-4381. Document fsimage format details in FSImageFormat class javadoc.
(Jing Zhao via suresh)
OPTIMIZATIONS
HDFS-3429. DataNode reads checksums even if client does not need them (todd)
BUG FIXES
HDFS-3919. MiniDFSCluster:waitClusterUp can hang forever.
@ -676,6 +691,20 @@ Release 2.0.3-alpha - Unreleased
HDFS-3970. Fix bug causing rollback of HDFS upgrade to result in bad
VERSION file. (Vinay and Andrew Wang via atm)
HDFS-4306. PBHelper.convertLocatedBlock miss convert BlockToken. (Binglin
Chang via atm)
HDFS-4384. test_libhdfs_threaded gets SEGV if JNIEnv cannot be
initialized. (Colin Patrick McCabe via eli)
HDFS-4328. TestLargeBlock#testLargeBlockSize is timing out. (Chris Nauroth
via atm)
HDFS-4274. BlockPoolSliceScanner does not close verification log during
shutdown. (Chris Nauroth via suresh)
HDFS-1245. Pluggable block id generation. (shv)
BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
@ -2175,6 +2204,8 @@ Release 0.23.6 - UNRELEASED
HDFS-4248. Renaming directories may incorrectly remove the paths in leases
under the tree. (daryn via szetszwo)
HDFS-4385. Maven RAT plugin is not checking all source files (tgraves)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -242,3 +242,30 @@ For the org.apache.hadoop.util.bloom.* classes:
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
For src/main/native/util/tree.h:
/*-
* Copyright 2002 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

View File

@ -516,6 +516,8 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<exclude>src/test/resources/data*</exclude>
<exclude>src/test/resources/editsStored*</exclude>
<exclude>src/test/resources/empty-file</exclude>
<exclude>src/main/native/util/tree.h</exclude>
<exclude>src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataXceiverAspects.aj</exclude>
<exclude>src/main/webapps/datanode/robots.txt</exclude>
<exclude>src/main/docs/releasenotes.html</exclude>
<exclude>src/contrib/**</exclude>

View File

@ -1,3 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CONFIG_H
#define CONFIG_H

View File

@ -29,9 +29,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -177,7 +177,7 @@ public class BlockReaderFactory {
DataInputStream in =
new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in));
PBHelper.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket();
switch (resp.getStatus()) {
case SUCCESS:

View File

@ -79,7 +79,6 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@ -115,7 +114,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@ -128,6 +126,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -380,7 +379,7 @@ public class DFSClient implements java.io.Closeable {
/**
* Same as this(nameNodeUri, conf, null);
* @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
* @see #DFSClient(URI, Configuration, FileSystem.Statistics)
*/
public DFSClient(URI nameNodeUri, Configuration conf
) throws IOException {
@ -389,7 +388,7 @@ public class DFSClient implements java.io.Closeable {
/**
* Same as this(nameNodeUri, null, conf, stats);
* @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
* @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics)
*/
public DFSClient(URI nameNodeUri, Configuration conf,
FileSystem.Statistics stats)
@ -1153,8 +1152,8 @@ public class DFSClient implements java.io.Closeable {
/**
* Call {@link #create(String, FsPermission, EnumSet, short, long,
* Progressable, int)} with default <code>permission</code>
* {@link FsPermission#getDefault()}.
* Progressable, int, ChecksumOpt)} with default <code>permission</code>
* {@link FsPermission#getFileDefault()}.
*
* @param src File name
* @param overwrite overwrite an existing file if true
@ -1172,7 +1171,7 @@ public class DFSClient implements java.io.Closeable {
Progressable progress,
int buffersize)
throws IOException {
return create(src, FsPermission.getDefault(),
return create(src, FsPermission.getFileDefault(),
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
buffersize, null);
@ -1203,7 +1202,7 @@ public class DFSClient implements java.io.Closeable {
*
* @param src File name
* @param permission The permission of the directory being created.
* If null, use default permission {@link FsPermission#getDefault()}
* If null, use default permission {@link FsPermission#getFileDefault()}
* @param flag indicates create a new file or create/overwrite an
* existing file or append to an existing file
* @param createParent create missing parent directory if true
@ -1229,7 +1228,7 @@ public class DFSClient implements java.io.Closeable {
ChecksumOpt checksumOpt) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
permission = FsPermission.getFileDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
@ -1264,7 +1263,7 @@ public class DFSClient implements java.io.Closeable {
/**
* Same as {{@link #create(String, FsPermission, EnumSet, short, long,
* Progressable, int)} except that the permission
* Progressable, int, ChecksumOpt)} except that the permission
* is absolute (ie has already been masked with umask.
*/
public DFSOutputStream primitiveCreate(String src,
@ -1449,7 +1448,7 @@ public class DFSClient implements java.io.Closeable {
}
/**
* Delete file or directory.
* See {@link ClientProtocol#delete(String)}.
* See {@link ClientProtocol#delete(String, boolean)}.
*/
@Deprecated
public boolean delete(String src) throws IOException {
@ -1674,7 +1673,7 @@ public class DFSClient implements java.io.Closeable {
new Sender(out).blockChecksum(block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (reply.getStatus() != Status.SUCCESS) {
if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
@ -1721,8 +1720,8 @@ public class DFSClient implements java.io.Closeable {
md5.write(md5out);
// read crc-type
final DataChecksum.Type ct = HdfsProtoUtil.
fromProto(checksumData.getCrcType());
final DataChecksum.Type ct = PBHelper.convert(checksumData
.getCrcType());
if (i == 0) { // first block
crcType = ct;
} else if (crcType != DataChecksum.Type.MIXED
@ -1884,7 +1883,7 @@ public class DFSClient implements java.io.Closeable {
* @param isChecked
* If true, then check only active namenode's safemode status, else
* check first namenode's status.
* @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeActio,boolean)
* @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
*/
public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{
return namenode.setSafeMode(action, isChecked);

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@ -66,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@ -883,7 +883,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
//ack
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (SUCCESS != response.getStatus()) {
throw new IOException("Failed to add a datanode");
}
@ -1073,7 +1073,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
HdfsProtoUtil.vintPrefixed(blockReplyStream));
PBHelper.vintPrefixed(blockReplyStream));
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -222,12 +223,7 @@ public class DFSUtil {
* Converts a string to a byte array using UTF8 encoding.
*/
public static byte[] string2Bytes(String str) {
try {
return str.getBytes("UTF8");
} catch(UnsupportedEncodingException e) {
assert false : "UTF8 encoding is not supported ";
}
return null;
return str.getBytes(Charsets.UTF_8);
}
/**
@ -239,19 +235,14 @@ public class DFSUtil {
if (pathComponents.length == 1 && pathComponents[0].length == 0) {
return Path.SEPARATOR;
}
try {
StringBuilder result = new StringBuilder();
for (int i = 0; i < pathComponents.length; i++) {
result.append(new String(pathComponents[i], "UTF-8"));
result.append(new String(pathComponents[i], Charsets.UTF_8));
if (i < pathComponents.length - 1) {
result.append(Path.SEPARATOR_CHAR);
}
}
return result.toString();
} catch (UnsupportedEncodingException ex) {
assert false : "UTF8 encoding is not supported ";
}
return null;
}
/** Convert an object representing a path to a string. */

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@ -41,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.io.IOUtils;
@ -379,7 +378,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum);
//
// Get bytes in block, set streams
@ -389,7 +389,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
new BufferedInputStream(peer.getInputStream(), bufferSize));
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
PBHelper.vintPrefixed(in));
RemoteBlockReader2.checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@ -43,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatus
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.security.token.Token;
@ -371,7 +370,8 @@ public class RemoteBlockReader2 implements BlockReader {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum);
//
// Get bytes in block
@ -379,7 +379,7 @@ public class RemoteBlockReader2 implements BlockReader {
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
PBHelper.vintPrefixed(in));
checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();

View File

@ -67,7 +67,10 @@ public class HdfsFileStatus {
this.modification_time = modification_time;
this.access_time = access_time;
this.permission = (permission == null) ?
FsPermission.getDefault() : permission;
((isdir || symlink!=null) ?
FsPermission.getDefault() :
FsPermission.getFileDefault()) :
permission;
this.owner = (owner == null) ? "" : owner;
this.group = (group == null) ? "" : group;
this.symlink = symlink;

View File

@ -1,180 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
/**
* Utilities for converting to and from protocol buffers used in the
* HDFS wire protocol, as well as some generic utilities useful
* for dealing with protocol buffers.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class HdfsProtoUtil {
//// Block Token ////
public static TokenProto toProto(Token<?> blockToken) {
return TokenProto.newBuilder()
.setIdentifier(ByteString.copyFrom(blockToken.getIdentifier()))
.setPassword(ByteString.copyFrom(blockToken.getPassword()))
.setKind(blockToken.getKind().toString())
.setService(blockToken.getService().toString())
.build();
}
public static Token<BlockTokenIdentifier> fromProto(TokenProto proto) {
return new Token<BlockTokenIdentifier>(proto.getIdentifier().toByteArray(),
proto.getPassword().toByteArray(),
new Text(proto.getKind()),
new Text(proto.getService()));
}
//// Extended Block ////
public static HdfsProtos.ExtendedBlockProto toProto(ExtendedBlock block) {
return HdfsProtos.ExtendedBlockProto.newBuilder()
.setBlockId(block.getBlockId())
.setPoolId(block.getBlockPoolId())
.setNumBytes(block.getNumBytes())
.setGenerationStamp(block.getGenerationStamp())
.build();
}
public static ExtendedBlock fromProto(HdfsProtos.ExtendedBlockProto proto) {
return new ExtendedBlock(
proto.getPoolId(), proto.getBlockId(),
proto.getNumBytes(), proto.getGenerationStamp());
}
//// DatanodeID ////
private static HdfsProtos.DatanodeIDProto toProto(
DatanodeID dni) {
return HdfsProtos.DatanodeIDProto.newBuilder()
.setIpAddr(dni.getIpAddr())
.setHostName(dni.getHostName())
.setStorageID(dni.getStorageID())
.setXferPort(dni.getXferPort())
.setInfoPort(dni.getInfoPort())
.setIpcPort(dni.getIpcPort())
.build();
}
private static DatanodeID fromProto(HdfsProtos.DatanodeIDProto idProto) {
return new DatanodeID(
idProto.getIpAddr(),
idProto.getHostName(),
idProto.getStorageID(),
idProto.getXferPort(),
idProto.getInfoPort(),
idProto.getIpcPort());
}
//// DatanodeInfo ////
public static HdfsProtos.DatanodeInfoProto toProto(DatanodeInfo dni) {
return HdfsProtos.DatanodeInfoProto.newBuilder()
.setId(toProto((DatanodeID)dni))
.setCapacity(dni.getCapacity())
.setDfsUsed(dni.getDfsUsed())
.setRemaining(dni.getRemaining())
.setBlockPoolUsed(dni.getBlockPoolUsed())
.setLastUpdate(dni.getLastUpdate())
.setXceiverCount(dni.getXceiverCount())
.setLocation(dni.getNetworkLocation())
.setAdminState(HdfsProtos.DatanodeInfoProto.AdminState.valueOf(
dni.getAdminState().name()))
.build();
}
public static DatanodeInfo fromProto(HdfsProtos.DatanodeInfoProto dniProto) {
DatanodeInfo dniObj = new DatanodeInfo(fromProto(dniProto.getId()),
dniProto.getLocation());
dniObj.setCapacity(dniProto.getCapacity());
dniObj.setDfsUsed(dniProto.getDfsUsed());
dniObj.setRemaining(dniProto.getRemaining());
dniObj.setBlockPoolUsed(dniProto.getBlockPoolUsed());
dniObj.setLastUpdate(dniProto.getLastUpdate());
dniObj.setXceiverCount(dniProto.getXceiverCount());
dniObj.setAdminState(DatanodeInfo.AdminStates.valueOf(
dniProto.getAdminState().name()));
return dniObj;
}
public static ArrayList<? extends HdfsProtos.DatanodeInfoProto> toProtos(
DatanodeInfo[] dnInfos, int startIdx) {
ArrayList<HdfsProtos.DatanodeInfoProto> protos =
Lists.newArrayListWithCapacity(dnInfos.length);
for (int i = startIdx; i < dnInfos.length; i++) {
protos.add(toProto(dnInfos[i]));
}
return protos;
}
public static DatanodeInfo[] fromProtos(
List<HdfsProtos.DatanodeInfoProto> targetsList) {
DatanodeInfo[] ret = new DatanodeInfo[targetsList.size()];
int i = 0;
for (HdfsProtos.DatanodeInfoProto proto : targetsList) {
ret[i++] = fromProto(proto);
}
return ret;
}
public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
if (firstByte == -1) {
throw new EOFException("Premature EOF: no length prefix available");
}
int size = CodedInputStream.readRawVarint32(firstByte, input);
assert size >= 0;
return new ExactSizeInputStream(input, size);
}
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslOutputStream;
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
@ -399,7 +400,7 @@ public class DataTransferEncryptor {
DataEncryptionKey encryptionKey) {
return encryptionKey.keyId + NAME_DELIMITER +
encryptionKey.blockPoolId + NAME_DELIMITER +
new String(Base64.encodeBase64(encryptionKey.nonce, false));
new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
}
/**
@ -427,7 +428,7 @@ public class DataTransferEncryptor {
}
private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
return new String(Base64.encodeBase64(encryptionKey, false)).toCharArray();
return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
}
/**

View File

@ -21,12 +21,12 @@ package org.apache.hadoop.hdfs.protocol.datatransfer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -41,18 +41,16 @@ import org.apache.hadoop.util.DataChecksum;
public abstract class DataTransferProtoUtil {
static BlockConstructionStage fromProto(
OpWriteBlockProto.BlockConstructionStage stage) {
return BlockConstructionStage.valueOf(BlockConstructionStage.class,
stage.name());
return BlockConstructionStage.valueOf(stage.name());
}
static OpWriteBlockProto.BlockConstructionStage toProto(
BlockConstructionStage stage) {
return OpWriteBlockProto.BlockConstructionStage.valueOf(
stage.name());
return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name());
}
public static ChecksumProto toProto(DataChecksum checksum) {
ChecksumTypeProto type = HdfsProtoUtil.toProto(checksum.getChecksumType());
ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType());
// ChecksumType#valueOf never returns null
return ChecksumProto.newBuilder()
.setBytesPerChecksum(checksum.getBytesPerChecksum())
@ -64,8 +62,7 @@ public abstract class DataTransferProtoUtil {
if (proto == null) return null;
int bytesPerChecksum = proto.getBytesPerChecksum();
DataChecksum.Type type = HdfsProtoUtil.fromProto(proto.getType());
DataChecksum.Type type = PBHelper.convert(proto.getType());
return DataChecksum.newDataChecksum(type, bytesPerChecksum);
}
@ -82,8 +79,8 @@ public abstract class DataTransferProtoUtil {
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken) {
return BaseHeaderProto.newBuilder()
.setBlock(HdfsProtoUtil.toProto(blk))
.setToken(HdfsProtoUtil.toProto(blockToken))
.setBlock(PBHelper.convert(blk))
.setToken(PBHelper.convert(blockToken))
.build();
}
}

View File

@ -55,12 +55,15 @@ public interface DataTransferProtocol {
* @param clientName client's name.
* @param blockOffset offset of the block.
* @param length maximum number of bytes for this read.
* @param sendChecksum if false, the DN should skip reading and sending
* checksums
*/
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException;
final long length,
final boolean sendChecksum) throws IOException;
/**
* Write a block to a datanode pipeline.

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.IOException;
import java.io.InputStream;

View File

@ -17,9 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
import java.io.DataInputStream;
@ -34,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockPr
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
/** Receiver */
@InterfaceAudience.Private
@ -89,21 +88,22 @@ public abstract class Receiver implements DataTransferProtocol {
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen());
proto.getLen(),
proto.getSendChecksums());
}
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
fromProtos(proto.getTargetsList()),
fromProto(proto.getSource()),
PBHelper.convert(proto.getTargetsList()),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
@ -115,42 +115,42 @@ public abstract class Receiver implements DataTransferProtocol {
private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
fromProtos(proto.getTargetsList()));
PBHelper.convert(proto.getTargetsList()));
}
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
final OpRequestShortCircuitAccessProto proto =
OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
requestShortCircuitFds(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()),
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getMaxVersion());
}
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
replaceBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()),
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
fromProto(proto.getSource()));
PBHelper.convert(proto.getSource()));
}
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
copyBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()));
}
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
blockChecksum(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()));
}
}

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
import java.io.DataOutput;
@ -38,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockPr
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -64,6 +63,10 @@ public class Sender implements DataTransferProtocol {
private static void send(final DataOutputStream out, final Op opcode,
final Message proto) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
+ ": " + proto);
}
op(out, opcode);
proto.writeDelimitedTo(out);
out.flush();
@ -74,12 +77,14 @@ public class Sender implements DataTransferProtocol {
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException {
final long length,
final boolean sendChecksum) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
.build();
send(out, Op.READ_BLOCK, proto);
@ -106,7 +111,7 @@ public class Sender implements DataTransferProtocol {
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.addAllTargets(toProtos(targets, 1))
.addAllTargets(PBHelper.convert(targets, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
@ -115,7 +120,7 @@ public class Sender implements DataTransferProtocol {
.setRequestedChecksum(checksumProto);
if (source != null) {
proto.setSource(toProto(source));
proto.setSource(PBHelper.convertDatanodeInfo(source));
}
send(out, Op.WRITE_BLOCK, proto.build());
@ -130,7 +135,7 @@ public class Sender implements DataTransferProtocol {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken))
.addAllTargets(toProtos(targets, 0))
.addAllTargets(PBHelper.convert(targets))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
@ -155,7 +160,7 @@ public class Sender implements DataTransferProtocol {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setDelHint(delHint)
.setSource(toProto(source))
.setSource(PBHelper.convertDatanodeInfo(source))
.build();
send(out, Op.REPLACE_BLOCK, proto);

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@ -131,7 +130,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.RpcController;
@ -494,10 +492,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetDatanodeReportRequestProto req)
throws ServiceException {
try {
DatanodeInfoProto[] result = PBHelper.convert(server
List<? extends DatanodeInfoProto> result = PBHelper.convert(server
.getDatanodeReport(PBHelper.convert(req.getType())));
return GetDatanodeReportResponseProto.newBuilder()
.addAllDi(Arrays.asList(result)).build();
.addAllDi(result).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@ -841,10 +839,13 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetDataEncryptionKeyRequestProto request)
throws ServiceException {
try {
GetDataEncryptionKeyResponseProto.Builder builder =
GetDataEncryptionKeyResponseProto.newBuilder();
DataEncryptionKey encryptionKey = server.getDataEncryptionKey();
return GetDataEncryptionKeyResponseProto.newBuilder()
.setDataEncryptionKey(PBHelper.convert(encryptionKey))
.build();
if (encryptionKey != null) {
builder.setDataEncryptionKey(PBHelper.convert(encryptionKey));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenResponseProto;
@ -111,7 +112,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
@ -282,7 +282,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
if (previous != null)
req.setPrevious(PBHelper.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
try {
return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
} catch (ServiceException e) {
@ -300,8 +300,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.newBuilder()
.setSrc(src)
.setBlk(PBHelper.convert(blk))
.addAllExistings(Arrays.asList(PBHelper.convert(existings)))
.addAllExcludes(Arrays.asList(PBHelper.convert(excludes)))
.addAllExistings(PBHelper.convert(existings))
.addAllExcludes(PBHelper.convert(excludes))
.setNumAdditionalNodes(numAdditionalNodes)
.setClientName(clientName)
.build();
@ -819,8 +819,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto
.newBuilder().build();
try {
return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req)
.getDataEncryptionKey());
GetDataEncryptionKeyResponseProto rsp =
rpcProxy.getDataEncryptionKey(null, req);
return rsp.hasDataEncryptionKey() ?
PBHelper.convert(rsp.getDataEncryptionKey()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@ -40,10 +43,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
@ -127,15 +130,20 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
/**
* Utilities for converting protobuf classes to and from implementation classes.
* Utilities for converting protobuf classes to and from implementation classes
* and other helper utilities to help in dealing with protobuf.
*
* Note that when converting from an internal type to protobuf type, the
* converter never return null for protobuf type. The check for internal type
@ -219,7 +227,8 @@ public class PBHelper {
// Arrays of DatanodeId
public static DatanodeIDProto[] convert(DatanodeID[] did) {
if (did == null) return null;
if (did == null)
return null;
final int len = did.length;
DatanodeIDProto[] result = new DatanodeIDProto[len];
for (int i = 0; i < len; ++i) {
@ -483,13 +492,25 @@ public class PBHelper {
return result;
}
static public DatanodeInfoProto[] convert(DatanodeInfo[] di) {
if (di == null) return null;
DatanodeInfoProto[] result = new DatanodeInfoProto[di.length];
for (int i = 0; i < di.length; i++) {
result[i] = PBHelper.convertDatanodeInfo(di[i]);
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos) {
return convert(dnInfos, 0);
}
return result;
/**
* Copy from {@code dnInfos} to a target of list of same size starting at
* {@code startIdx}.
*/
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos, int startIdx) {
if (dnInfos == null)
return null;
ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
.newArrayListWithCapacity(dnInfos.length);
for (int i = startIdx; i < dnInfos.length; i++) {
protos.add(convert(dnInfos[i]));
}
return protos;
}
public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
@ -694,7 +715,7 @@ public class PBHelper {
DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
for (int i = 0; i < targets.length; i++) {
ret[i] = DatanodeInfosProto.newBuilder()
.addAllDatanodes(Arrays.asList(PBHelper.convert(targets[i]))).build();
.addAllDatanodes(PBHelper.convert(targets[i])).build();
}
return Arrays.asList(ret);
}
@ -863,25 +884,14 @@ public class PBHelper {
// Located Block Arrays and Lists
public static LocatedBlockProto[] convertLocatedBlock(LocatedBlock[] lb) {
if (lb == null) return null;
final int len = lb.length;
LocatedBlockProto[] result = new LocatedBlockProto[len];
for (int i = 0; i < len; ++i) {
result[i] = PBHelper.convert(lb[i]);
}
return result;
return convertLocatedBlock2(Arrays.asList(lb)).toArray(
new LocatedBlockProto[lb.length]);
}
public static LocatedBlock[] convertLocatedBlock(LocatedBlockProto[] lb) {
if (lb == null) return null;
final int len = lb.length;
LocatedBlock[] result = new LocatedBlock[len];
for (int i = 0; i < len; ++i) {
result[i] = new LocatedBlock(
PBHelper.convert(lb[i].getB()),
PBHelper.convert(lb[i].getLocsList()),
lb[i].getOffset(), lb[i].getCorrupt());
}
return result;
return convertLocatedBlock(Arrays.asList(lb)).toArray(
new LocatedBlock[lb.length]);
}
public static List<LocatedBlock> convertLocatedBlock(
@ -963,7 +973,7 @@ public class PBHelper {
fs.getFileBufferSize(),
fs.getEncryptDataTransfer(),
fs.getTrashInterval(),
HdfsProtoUtil.fromProto(fs.getChecksumType()));
PBHelper.convert(fs.getChecksumType()));
}
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@ -976,7 +986,7 @@ public class PBHelper {
.setFileBufferSize(fs.getFileBufferSize())
.setEncryptDataTransfer(fs.getEncryptDataTransfer())
.setTrashInterval(fs.getTrashInterval())
.setChecksumType(HdfsProtoUtil.toProto(fs.getChecksumType()))
.setChecksumType(PBHelper.convert(fs.getChecksumType()))
.build();
}
@ -1314,4 +1324,24 @@ public class PBHelper {
.setLayoutVersion(j.getLayoutVersion())
.setNamespaceID(j.getNamespaceId()).build();
}
public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
if (firstByte == -1) {
throw new EOFException("Premature EOF: no length prefix available");
}
int size = CodedInputStream.readRawVarint32(firstByte, input);
assert size >= 0;
return new ExactSizeInputStream(input, size);
}
}

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.SecurityUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
@ -934,7 +935,7 @@ class Journal implements Closeable {
fos.write('\n');
// Write human-readable data after the protobuf. This is only
// to assist in debugging -- it's not parsed at all.
OutputStreamWriter writer = new OutputStreamWriter(fos);
OutputStreamWriter writer = new OutputStreamWriter(fos, Charsets.UTF_8);
writer.write(String.valueOf(newData));
writer.write('\n');

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -189,7 +190,7 @@ public class Balancer {
* balancing purpose at a datanode
*/
public static final int MAX_NUM_CONCURRENT_MOVES = 5;
public static final int MAX_NO_PENDING_BLOCK_INTERATIONS = 5;
private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
private static final String USAGE = "Usage: java "
+ Balancer.class.getSimpleName()
@ -781,7 +782,7 @@ public class Balancer {
noPendingBlockIteration++;
// in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations.
if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_INTERATIONS) {
if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
scheduledSize = 0;
}
}

View File

@ -52,7 +52,7 @@ import org.apache.hadoop.util.Daemon;
class NameNodeConnector {
private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final int MAX_NOT_CHANGED_INTERATIONS = 5;
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
final URI nameNodeUri;
final String blockpoolID;
@ -127,7 +127,7 @@ class NameNodeConnector {
notChangedIterations = 0;
} else {
notChangedIterations++;
if (notChangedIterations >= MAX_NOT_CHANGED_INTERATIONS) {
if (notChangedIterations >= MAX_NOT_CHANGED_ITERATIONS) {
System.out.println("No block has been moved for "
+ notChangedIterations + " iterations. Exiting...");
return false;

View File

@ -171,20 +171,19 @@ public class BlockManager {
*/
private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
//
// Keeps a TreeSet for every named node. Each treeset contains
// a list of the blocks that are "extra" at that location. We'll
// eventually remove these extras.
// Mapping: StorageID -> TreeSet<Block>
//
/**
* Maps a StorageID to the set of blocks that are "extra" for this
* DataNode. We'll eventually remove these extras.
*/
public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
new TreeMap<String, LightWeightLinkedSet<Block>>();
//
// Store set of Blocks that need to be replicated 1 or more times.
// We also store pending replication-orders.
//
/**
* Store set of Blocks that need to be replicated 1 or more times.
* We also store pending replication-orders.
*/
public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
@VisibleForTesting
final PendingReplicationBlocks pendingReplications;

View File

@ -72,6 +72,8 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
import com.google.common.base.Charsets;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
@ -233,7 +235,7 @@ public class JspHelper {
readOffset += numRead;
}
blockReader.close(null, null);
out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
}
public static void addTableHeader(JspWriter out) throws IOException {

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.util.VersionInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Charsets;
/**
@ -658,7 +660,7 @@ public abstract class Storage extends StorageInfo {
FileLock res = null;
try {
res = file.getChannel().tryLock();
file.write(jvmName.getBytes());
file.write(jvmName.getBytes(Charsets.UTF_8));
LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName);
} catch(OverlappingFileLockException oe) {
LOG.error("It appears that another namenode " + file.readLine()

View File

@ -388,8 +388,8 @@ class BlockPoolSliceScanner {
try {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, true, datanode,
null);
blockSender = new BlockSender(block, 0, -1, false, true, true,
datanode, null);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
/**
* Reads a block from the disk and sends it to a recipient.
*
@ -158,12 +160,14 @@ class BlockSender implements java.io.Closeable {
* @param length length of data to read
* @param corruptChecksumOk
* @param verifyChecksum verify checksum while reading the data
* @param sendChecksum send checksum to client.
* @param datanode datanode from which the block is being read
* @param clientTraceFmt format string used to print client trace logs
* @throws IOException
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
boolean sendChecksum,
DataNode datanode, String clientTraceFmt)
throws IOException {
try {
@ -175,6 +179,13 @@ class BlockSender implements java.io.Closeable {
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
this.datanode = datanode;
if (verifyChecksum) {
// To simplify implementation, callers may not specify verification
// without sending.
Preconditions.checkArgument(sendChecksum,
"If verifying checksum, currently must also send it.");
}
final Replica replica;
final long replicaVisibleLength;
synchronized(datanode.data) {
@ -213,7 +224,8 @@ class BlockSender implements java.io.Closeable {
* False, True: will verify checksum
* False, False: throws IOException file not found
*/
DataChecksum csum;
DataChecksum csum = null;
if (verifyChecksum || sendChecksum) {
final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
@ -234,8 +246,15 @@ class BlockSender implements java.io.Closeable {
csum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
// This only decides the buffer size. Use BUFFER_SIZE?
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 16 * 1024);
}
}
if (csum == null) {
// The number of bytes per checksum here determines the alignment
// of reads: we always start reading at a checksum chunk boundary,
// even if the checksum type is NULL. So, choosing too big of a value
// would risk sending too much unnecessary data. 512 (1 disk sector)
// is likely to result in minimal extra IO.
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
}
/*
@ -648,7 +667,7 @@ class BlockSender implements java.io.Closeable {
ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
while (endOffset > offset) {
while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
@ -656,6 +675,8 @@ class BlockSender implements java.io.Closeable {
totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
// If this thread was interrupted, then it did not send the full block.
if (!Thread.currentThread().isInterrupted()) {
try {
// send an empty packet to mark the end of the block
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
@ -666,6 +687,7 @@ class BlockSender implements java.io.Closeable {
}
sentEntireByteRange = true;
}
} finally {
if (clientTraceFmt != null) {
final long endTime = System.nanoTime();

View File

@ -99,7 +99,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
@ -116,6 +115,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@ -1022,29 +1022,27 @@ public class DataNode extends Configured
dnId.setStorageID(createNewStorageId(dnId.getXferPort()));
}
static String createNewStorageId(int port) {
/* Return
* "DS-randInt-ipaddr-currentTimeMillis"
* It is considered extermely rare for all these numbers to match
* on a different machine accidentally for the following
* a) SecureRandom(INT_MAX) is pretty much random (1 in 2 billion), and
* b) Good chance ip address would be different, and
* c) Even on the same machine, Datanode is designed to use different ports.
* d) Good chance that these are started at different times.
* For a confict to occur all the 4 above have to match!.
* The format of this string can be changed anytime in future without
* affecting its functionality.
/**
* @return a unique storage ID of form "DS-randInt-ipaddr-port-timestamp"
*/
static String createNewStorageId(int port) {
// It is unlikely that we will create a non-unique storage ID
// for the following reasons:
// a) SecureRandom is a cryptographically strong random number generator
// b) IP addresses will likely differ on different hosts
// c) DataNode xfer ports will differ on the same host
// d) StorageIDs will likely be generated at different times (in ms)
// A conflict requires that all four conditions are violated.
// NB: The format of this string can be changed in the future without
// requiring that old SotrageIDs be updated.
String ip = "unknownIP";
try {
ip = DNS.getDefaultIP("default");
} catch (UnknownHostException ignored) {
LOG.warn("Could not find ip address of \"default\" inteface.");
LOG.warn("Could not find an IP address for the \"default\" inteface.");
}
int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
return "DS-" + rand + "-" + ip + "-" + port + "-"
+ Time.now();
return "DS-" + rand + "-" + ip + "-" + port + "-" + Time.now();
}
/** Ensure the authentication method is kerberos */
@ -1544,7 +1542,7 @@ public class DataNode extends Configured
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, DataNode.this, null);
false, false, true, DataNode.this, null);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
@ -1569,7 +1567,7 @@ public class DataNode extends Configured
// read ack
if (isClient) {
DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in));
PBHelper.vintPrefixed(in));
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
}

View File

@ -62,7 +62,7 @@ import org.apache.hadoop.util.DiskChecker;
*/
@InterfaceAudience.Private
public class DataStorage extends Storage {
// Constants
public final static String BLOCK_SUBDIR_PREFIX = "subdir";
final static String BLOCK_FILE_PREFIX = "blk_";
final static String COPY_FILE_PREFIX = "dncp_";
@ -71,13 +71,13 @@ public class DataStorage extends Storage {
public final static String STORAGE_DIR_FINALIZED = "finalized";
public final static String STORAGE_DIR_TMP = "tmp";
/** Access to this variable is guarded by "this" */
/** Unique storage ID. {@see DataNode#createNewStorageId(int)} for details */
private String storageID;
// flag to ensure initialzing storage occurs only once
private boolean initilized = false;
// Flag to ensure we only initialize storage once
private boolean initialized = false;
// BlockPoolStorage is map of <Block pool Id, BlockPoolStorage>
// Maps block pool IDs to block pool storage
private Map<String, BlockPoolSliceStorage> bpStorageMap
= Collections.synchronizedMap(new HashMap<String, BlockPoolSliceStorage>());
@ -130,7 +130,7 @@ public class DataStorage extends Storage {
synchronized void recoverTransitionRead(DataNode datanode,
NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
throws IOException {
if (initilized) {
if (initialized) {
// DN storage has been initialized, no need to do anything
return;
}
@ -200,7 +200,7 @@ public class DataStorage extends Storage {
this.writeAll();
// 4. mark DN storage is initilized
this.initilized = true;
this.initialized = true;
}
/**

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@ -60,6 +59,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatus
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -147,7 +147,7 @@ class DataXceiver extends Receiver implements Runnable {
/** Return the datanode object. */
DataNode getDataNode() {return datanode;}
private OutputStream getOutputStream() throws IOException {
private OutputStream getOutputStream() {
return socketOut;
}
@ -305,7 +305,8 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException {
final long length,
final boolean sendChecksum) throws IOException {
previousOpClientName = clientName;
OutputStream baseStream = getOutputStream();
@ -330,7 +331,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
true, false, datanode, clientTraceFmt);
true, false, sendChecksum, datanode, clientTraceFmt);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
@ -348,7 +349,7 @@ class DataXceiver extends Receiver implements Runnable {
// to respond with a Status enum.
try {
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in));
PBHelper.vintPrefixed(in));
if (!stat.hasStatus()) {
LOG.warn("Client " + peer.getRemoteAddressString() +
" did not send a valid status code after reading. " +
@ -510,7 +511,7 @@ class DataXceiver extends Receiver implements Runnable {
// read connect ack (only for clients, not for replication req)
if (isClient) {
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
@ -671,7 +672,7 @@ class DataXceiver extends Receiver implements Runnable {
.setBytesPerCrc(bytesPerCRC)
.setCrcPerBlock(crcPerBlock)
.setMd5(ByteString.copyFrom(md5.getDigest()))
.setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()))
.setCrcType(PBHelper.convert(checksum.getChecksumType()))
)
.build()
.writeDelimitedTo(out);
@ -720,7 +721,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, datanode,
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
null);
// set up response stream
@ -832,7 +833,7 @@ class DataXceiver extends Receiver implements Runnable {
// receive the response from the proxy
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
HdfsProtoUtil.vintPrefixed(proxyReply));
PBHelper.vintPrefixed(proxyReply));
if (copyResponse.getStatus() != SUCCESS) {
if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {

View File

@ -19,16 +19,20 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import com.google.common.base.Charsets;
class RollingLogsImpl implements RollingLogs {
private static final String CURR_SUFFIX = ".curr";
private static final String PREV_SUFFIX = ".prev";
@ -40,7 +44,7 @@ class RollingLogsImpl implements RollingLogs {
private final File curr;
private final File prev;
private PrintStream out; //require synchronized access
private PrintWriter out; //require synchronized access
private Appender appender = new Appender() {
@Override
@ -82,7 +86,8 @@ class RollingLogsImpl implements RollingLogs {
RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
curr = new File(dir, filePrefix + CURR_SUFFIX);
prev = new File(dir, filePrefix + PREV_SUFFIX);
out = new PrintStream(new FileOutputStream(curr, true));
out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
curr, true), Charsets.UTF_8));
}
@Override
@ -108,7 +113,8 @@ class RollingLogsImpl implements RollingLogs {
synchronized(this) {
appender.close();
final boolean renamed = curr.renameTo(prev);
out = new PrintStream(new FileOutputStream(curr, true));
out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
curr, true), Charsets.UTF_8));
if (!renamed) {
throw new IOException("Failed to rename " + curr + " to " + prev);
}
@ -163,7 +169,8 @@ class RollingLogsImpl implements RollingLogs {
reader = null;
}
reader = new BufferedReader(new FileReader(file));
reader = new BufferedReader(new InputStreamReader(new FileInputStream(
file), Charsets.UTF_8));
return true;
}

View File

@ -1,3 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;

View File

@ -48,6 +48,8 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.znerd.xmlenc.XMLOutputter;
import com.google.common.base.Charsets;
/**
* This class generates the data that is needed to be displayed on cluster web
* console.
@ -873,7 +875,7 @@ class ClusterJspHelper {
URLConnection connection = url.openConnection();
BufferedReader in = new BufferedReader(
new InputStreamReader(
connection.getInputStream()));
connection.getInputStream(), Charsets.UTF_8));
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.append(inputLine);

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.IdGenerator;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@ -92,6 +93,7 @@ public class FSImage implements Closeable {
final private Configuration conf;
protected NNStorageRetentionManager archivalManager;
protected IdGenerator blockIdGenerator;
/**
* Construct an FSImage
@ -137,6 +139,9 @@ public class FSImage implements Closeable {
Preconditions.checkState(fileCount == 1,
"FSImage.format should be called with an uninitialized namesystem, has " +
fileCount + " files");
// BlockIdGenerator is defined during formatting
// currently there is only one BlockIdGenerator
blockIdGenerator = createBlockIdGenerator(fsn);
NamespaceInfo ns = NNStorage.newNamespaceInfo();
ns.clusterID = clusterId;
@ -253,6 +258,7 @@ public class FSImage implements Closeable {
doRollback();
break;
case REGULAR:
default:
// just load the image
}
@ -737,6 +743,9 @@ public class FSImage implements Closeable {
FSImageFormat.Loader loader = new FSImageFormat.Loader(
conf, target);
loader.load(curFile);
// BlockIdGenerator is determined after loading image
// currently there is only one BlockIdGenerator
blockIdGenerator = createBlockIdGenerator(target);
target.setBlockPoolId(this.getBlockPoolID());
// Check that the image digest we loaded matches up with what
@ -1165,4 +1174,12 @@ public class FSImage implements Closeable {
public synchronized long getMostRecentCheckpointTxId() {
return storage.getMostRecentCheckpointTxId();
}
public long getUniqueBlockId() {
return blockIdGenerator.nextValue();
}
public IdGenerator createBlockIdGenerator(FSNamesystem fsn) {
return new RandomBlockIdGenerator(fsn);
}
}

View File

@ -49,7 +49,57 @@ import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
/**
* Contains inner classes for reading or writing the on-disk format for FSImages.
* Contains inner classes for reading or writing the on-disk format for
* FSImages.
*
* In particular, the format of the FSImage looks like:
* <pre>
* FSImage {
* LayoutVersion: int, NamespaceID: int, NumberItemsInFSDirectoryTree: long,
* NamesystemGenerationStamp: long, TransactionID: long
* {FSDirectoryTree, FilesUnderConstruction, SecretManagerState} (can be compressed)
* }
*
* FSDirectoryTree (if {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is supported) {
* INodeInfo of root, NumberOfChildren of root: int
* [list of INodeInfo of root's children],
* [list of INodeDirectoryInfo of root's directory children]
* }
*
* FSDirectoryTree (if {@link Feature#FSIMAGE_NAME_OPTIMIZATION} not supported){
* [list of INodeInfo of INodes in topological order]
* }
*
* INodeInfo {
* {
* LocalName: short + byte[]
* } when {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is supported
* or
* {
* FullPath: byte[]
* } when {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is not supported
* ReplicationFactor: short, ModificationTime: long,
* AccessTime: long, PreferredBlockSize: long,
* NumberOfBlocks: int (-1 for INodeDirectory, -2 for INodeSymLink),
* {
* NsQuota: long, DsQuota: long, FsPermission: short, PermissionStatus
* } for INodeDirectory
* or
* {
* SymlinkString, FsPermission: short, PermissionStatus
* } for INodeSymlink
* or
* {
* [list of BlockInfo], FsPermission: short, PermissionStatus
* } for INodeFile
* }
*
* INodeDirectoryInfo {
* FullPath of the directory: short + byte[],
* NumberOfChildren: int, [list of INodeInfo of children INode]
* [list of INodeDirectoryInfo of the directory children]
* }
* </pre>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving

View File

@ -78,8 +78,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
@ -204,6 +205,7 @@ import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -1089,8 +1091,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
checkSuperuserPrivilege();
File file = new File(System.getProperty("hadoop.log.dir"), filename);
PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
true)));
PrintWriter out = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8)));
metaSave(out);
out.flush();
out.close();
@ -2536,10 +2538,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private Block allocateBlock(String src, INodesInPath inodesInPath,
DatanodeDescriptor targets[]) throws IOException {
assert hasWriteLock();
Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0);
while(isValidBlock(b)) {
b.setBlockId(DFSUtil.getRandom().nextLong());
}
Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0);
// Increment the generation stamp for every new block.
b.setGenerationStamp(nextGenerationStamp());
b = dir.addBlock(src, inodesInPath, b, targets);
@ -4551,13 +4550,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
/**
* Returns whether the given block is one pointed-to by a file.
*/
private boolean isValidBlock(Block b) {
return (blockManager.getBlockCollection(b) != null);
}
PermissionStatus createFsOwnerPermissions(FsPermission permission) {
return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.util.IdGenerator;
/**
* Generator of random block IDs.
*/
@InterfaceAudience.Private
public class RandomBlockIdGenerator implements IdGenerator {
private final BlockManager blockManager;
RandomBlockIdGenerator(FSNamesystem namesystem) {
this.blockManager = namesystem.getBlockManager();
}
@Override // NumberGenerator
public long nextValue() {
Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0);
while(isValidBlock(b)) {
b.setBlockId(DFSUtil.getRandom().nextLong());
}
return b.getBlockId();
}
/**
* Returns whether the given block is one pointed-to by a file.
*/
private boolean isValidBlock(Block b) {
return (blockManager.getBlockCollection(b) != null);
}
}

View File

@ -17,7 +17,8 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.PrintStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.security.PrivilegedExceptionAction;
import javax.servlet.ServletContext;
@ -32,6 +33,8 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Charsets;
/**
* Renew delegation tokens over http for use in hftp.
*/
@ -73,7 +76,8 @@ public class RenewDelegationTokenServlet extends DfsServlet {
return nn.getRpcServer().renewDelegationToken(token);
}
});
PrintStream os = new PrintStream(resp.getOutputStream());
final PrintWriter os = new PrintWriter(new OutputStreamWriter(
resp.getOutputStream(), Charsets.UTF_8));
os.println(result);
os.close();
} catch(Exception e) {

View File

@ -20,7 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode.web.resources;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
@ -102,6 +103,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import com.google.common.base.Charsets;
import com.sun.jersey.spi.container.ResourceFilters;
/** Web-hdfs NameNode implementation. */
@ -713,7 +715,8 @@ public class NamenodeWebHdfsMethods {
return new StreamingOutput() {
@Override
public void write(final OutputStream outstream) throws IOException {
final PrintStream out = new PrintStream(outstream);
final PrintWriter out = new PrintWriter(new OutputStreamWriter(
outstream, Charsets.UTF_8));
out.println("{\"" + FileStatus.class.getSimpleName() + "es\":{\""
+ FileStatus.class.getSimpleName() + "\":[");
@ -736,6 +739,7 @@ public class NamenodeWebHdfsMethods {
out.println();
out.println("]}}");
out.flush();
}
};
}

View File

@ -55,6 +55,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.GenericOptionsParser;
import com.google.common.base.Charsets;
/**
* Fetch a DelegationToken from the current Namenode and store it in the
* specified file.
@ -269,8 +271,8 @@ public class DelegationTokenFetcher {
throw new IOException("Error renewing token: " +
connection.getResponseMessage());
}
in = new BufferedReader(new InputStreamReader
(connection.getInputStream()));
in = new BufferedReader(
new InputStreamReader(connection.getInputStream(), Charsets.UTF_8));
long result = Long.parseLong(in.readLine());
in.close();
return result;

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Stack;
import org.apache.hadoop.classification.InterfaceAudience;
@ -39,6 +40,8 @@ import org.xml.sax.XMLReader;
import org.xml.sax.helpers.DefaultHandler;
import org.xml.sax.helpers.XMLReaderFactory;
import com.google.common.base.Charsets;
/**
* OfflineEditsXmlLoader walks an EditsVisitor over an OEV XML file
*/
@ -48,7 +51,7 @@ class OfflineEditsXmlLoader
extends DefaultHandler implements OfflineEditsLoader {
private final boolean fixTxIds;
private final OfflineEditsVisitor visitor;
private final FileReader fileReader;
private final InputStreamReader fileReader;
private ParseState state;
private Stanza stanza;
private Stack<Stanza> stanzaStack;
@ -70,7 +73,8 @@ class OfflineEditsXmlLoader
public OfflineEditsXmlLoader(OfflineEditsVisitor visitor,
File inputFile, OfflineEditsViewer.Flags flags) throws FileNotFoundException {
this.visitor = visitor;
this.fileReader = new FileReader(inputFile);
this.fileReader =
new InputStreamReader(new FileInputStream(inputFile), Charsets.UTF_8);
this.fixTxIds = flags.getFixTxIds();
}

View File

@ -19,7 +19,8 @@ package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Map;
import java.util.HashMap;
@ -29,6 +30,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import com.google.common.base.Charsets;
/**
* StatisticsEditsVisitor implements text version of EditsVisitor
* that aggregates counts of op codes processed
@ -37,7 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class StatisticsEditsVisitor implements OfflineEditsVisitor {
final private PrintStream out;
final private PrintWriter out;
private int version = -1;
private final Map<FSEditLogOpCodes, Long> opCodeCount =
@ -52,7 +55,7 @@ public class StatisticsEditsVisitor implements OfflineEditsVisitor {
* @param printToScreen Mirror output to screen?
*/
public StatisticsEditsVisitor(OutputStream out) throws IOException {
this.out = new PrintStream(out);
this.out = new PrintWriter(new OutputStreamWriter(out, Charsets.UTF_8));
}
/** Start the visitor */

View File

@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import com.google.common.base.Charsets;
/**
* TextWriterImageProcessor mixes in the ability for ImageVisitor
@ -34,7 +38,7 @@ import java.io.IOException;
abstract class TextWriterImageVisitor extends ImageVisitor {
private boolean printToScreen = false;
private boolean okToWrite = false;
final private FileWriter fw;
final private OutputStreamWriter fw;
/**
* Create a processor that writes to the file named.
@ -56,7 +60,7 @@ abstract class TextWriterImageVisitor extends ImageVisitor {
throws IOException {
super();
this.printToScreen = printToScreen;
fw = new FileWriter(filename);
fw = new OutputStreamWriter(new FileOutputStream(filename), Charsets.UTF_8);
okToWrite = true;
}

View File

@ -96,7 +96,12 @@ public class DataTransferThrottler {
// Wait for next period so that curReserve can be increased.
try {
wait( curPeriodEnd - now );
} catch (InterruptedException ignored) {}
} catch (InterruptedException e) {
// Abort throttle and reset interrupted status to make sure other
// interrupt handling higher in the call stack executes.
Thread.currentThread().interrupt();
break;
}
} else if ( now < (curPeriodStart + periodExtension)) {
curPeriodStart = curPeriodEnd;
curReserve += bytesPerPeriod;

View File

@ -20,9 +20,9 @@ package org.apache.hadoop.hdfs.util;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.regex.Matcher;
@ -34,6 +34,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Charsets;
/**
* Static functions for dealing with files of the same format
* that the Unix "md5sum" utility writes.
@ -78,7 +80,8 @@ public abstract class MD5FileUtils {
}
BufferedReader reader =
new BufferedReader(new FileReader(md5File));
new BufferedReader(new InputStreamReader(new FileInputStream(
md5File), Charsets.UTF_8));
try {
md5Line = reader.readLine();
if (md5Line == null) { md5Line = ""; }
@ -138,7 +141,7 @@ public abstract class MD5FileUtils {
String md5Line = digestString + " *" + dataFile.getName() + "\n";
AtomicFileOutputStream afos = new AtomicFileOutputStream(md5File);
afos.write(md5Line.getBytes());
afos.write(md5Line.getBytes(Charsets.UTF_8));
afos.close();
LOG.debug("Saved MD5 " + digest + " to " + md5File);
}

View File

@ -19,14 +19,18 @@ package org.apache.hadoop.hdfs.util;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Charsets;
/**
* Class that represents a file on disk which persistently stores
* a single <code>long</code> value. The file is updated atomically
@ -74,7 +78,7 @@ public class PersistentLongFile {
public static void writeFile(File file, long val) throws IOException {
AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
try {
fos.write(String.valueOf(val).getBytes());
fos.write(String.valueOf(val).getBytes(Charsets.UTF_8));
fos.write('\n');
fos.close();
fos = null;
@ -88,7 +92,9 @@ public class PersistentLongFile {
public static long readFile(File file, long defaultVal) throws IOException {
long val = defaultVal;
if (file.exists()) {
BufferedReader br = new BufferedReader(new FileReader(file));
BufferedReader br =
new BufferedReader(new InputStreamReader(new FileInputStream(
file), Charsets.UTF_8));
try {
val = Long.valueOf(br.readLine());
br.close();

View File

@ -105,6 +105,8 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelect
import org.apache.hadoop.util.Progressable;
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Charsets;
/** A FileSystem for HDFS over the web. */
public class WebHdfsFileSystem extends FileSystem
implements DelegationTokenRenewer.Renewable {
@ -281,7 +283,7 @@ public class WebHdfsFileSystem extends FileSystem
+ "\" (parsed=\"" + parsed + "\")");
}
}
return (Map<?, ?>)JSON.parse(new InputStreamReader(in));
return (Map<?, ?>)JSON.parse(new InputStreamReader(in, Charsets.UTF_8));
}
private static Map<?, ?> validateResponse(final HttpOpParam.Op op,

View File

@ -52,7 +52,7 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
if (!env) {
fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n");
goto error;
return NULL;
}
cl = calloc(1, sizeof(struct NativeMiniDfsCluster));
if (!cl) {

View File

@ -443,7 +443,7 @@ message GetDataEncryptionKeyRequestProto { // no parameters
}
message GetDataEncryptionKeyResponseProto {
required DataEncryptionKeyProto dataEncryptionKey = 1;
optional DataEncryptionKeyProto dataEncryptionKey = 1;
}
service ClientNamenodeProtocol {

View File

@ -52,6 +52,7 @@ message OpReadBlockProto {
required ClientOperationHeaderProto header = 1;
required uint64 offset = 2;
required uint64 len = 3;
optional bool sendChecksums = 4 [default = true];
}

View File

@ -1,2 +1,15 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier

View File

@ -1,3 +1,16 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.hdfs.DFSClient$Renewer
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
org.apache.hadoop.hdfs.HftpFileSystem$TokenManager

View File

@ -444,21 +444,21 @@ public class TestDataTransferProtocol {
recvBuf.reset();
blk.setBlockId(blkid-1);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen);
0L, fileLen, true);
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset -1L
sendBuf.reset();
blk.setBlockId(blkid);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-1L, fileLen);
-1L, fileLen, true);
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
// bad block start offset
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
fileLen, fileLen);
fileLen, fileLen, true);
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@ -475,7 +475,7 @@ public class TestDataTransferProtocol {
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil));
0L, -1L-random.nextInt(oneMil), true);
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@ -488,14 +488,14 @@ public class TestDataTransferProtocol {
recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen+1);
0L, fileLen+1, true);
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen);
0L, fileLen, true);
readFile(fileSys, file, fileLen);
} finally {
cluster.shutdown();

View File

@ -158,7 +158,7 @@ public class TestLargeBlock {
* Test for block size of 2GB + 512B
* @throws IOException in case of errors
*/
@Test
@Test(timeout = 120000)
public void testLargeBlockSize() throws IOException {
final long blockSize = 2L * 1024L * 1024L * 1024L + 512L; // 2GB + 512B
runTest(blockSize);

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;

View File

@ -52,6 +52,7 @@ public class TestParallelReadUtil {
static final int FILE_SIZE_K = 256;
static Random rand = null;
static final int DEFAULT_REPLICATION_FACTOR = 2;
protected boolean verifyChecksums = true;
static {
// The client-trace log ends up causing a lot of blocking threads
@ -323,7 +324,8 @@ public class TestParallelReadUtil {
testInfo.filepath = new Path("/TestParallelRead.dat." + i);
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
testInfo.dis = dfsClient.open(testInfo.filepath.toString());
testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
dfsClient.dfsClientConf.ioBufferSize, verifyChecksums);
for (int j = 0; j < nWorkerEach; ++j) {
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);
@ -413,4 +415,10 @@ public class TestParallelReadUtil {
public void testParallelReadMixed() throws IOException {
runTestWorkload(new MixedWorkloadHelper());
}
@Test
public void testParallelNoChecksums() throws IOException {
verifyChecksums = false;
runTestWorkload(new MixedWorkloadHelper());
}
}

View File

@ -24,11 +24,14 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.log4j.Level;
import org.junit.Test;
/**
@ -194,11 +197,19 @@ public class TestPread {
*/
@Test
public void testPreadDFS() throws IOException {
dfsPreadTest(false); //normal pread
dfsPreadTest(true); //trigger read code path without transferTo.
dfsPreadTest(false, true); //normal pread
dfsPreadTest(true, true); //trigger read code path without transferTo.
}
private void dfsPreadTest(boolean disableTransferTo) throws IOException {
@Test
public void testPreadDFSNoChecksum() throws IOException {
((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
dfsPreadTest(false, false);
dfsPreadTest(true, false);
}
private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
@ -210,6 +221,7 @@ public class TestPread {
}
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fileSys = cluster.getFileSystem();
fileSys.setVerifyChecksum(verifyChecksum);
try {
Path file1 = new Path("preadtest.dat");
writeFile(fileSys, file1);

View File

@ -1,42 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestHdfsProtoUtil {
@Test
public void testChecksumTypeProto() {
assertEquals(DataChecksum.Type.NULL,
HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
assertEquals(DataChecksum.Type.CRC32,
HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
assertEquals(DataChecksum.Type.CRC32C,
HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.NULL),
HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32C),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@ -70,6 +71,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Test;
import com.google.common.base.Joiner;
@ -404,25 +406,69 @@ public class TestPBHelper {
assertEquals(expected.getService(), actual.getService());
}
@Test
public void testConvertLocatedBlock() {
DatanodeInfo [] dnInfos = {
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1", AdminStates.DECOMMISSION_INPROGRESS),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2", AdminStates.DECOMMISSIONED),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", AdminStates.NORMAL)
private void compare(LocatedBlock expected, LocatedBlock actual) {
assertEquals(expected.getBlock(), actual.getBlock());
compare(expected.getBlockToken(), actual.getBlockToken());
assertEquals(expected.getStartOffset(), actual.getStartOffset());
assertEquals(expected.isCorrupt(), actual.isCorrupt());
DatanodeInfo [] ei = expected.getLocations();
DatanodeInfo [] ai = actual.getLocations();
assertEquals(ei.length, ai.length);
for (int i = 0; i < ei.length ; i++) {
compare(ei[i], ai[i]);
}
}
private LocatedBlock createLocatedBlock() {
DatanodeInfo[] dnInfos = {
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1",
AdminStates.DECOMMISSION_INPROGRESS),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
AdminStates.DECOMMISSIONED),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
AdminStates.NORMAL)
};
LocatedBlock lb = new LocatedBlock(
new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);
lb.setBlockToken(new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service")));
return lb;
}
@Test
public void testConvertLocatedBlock() {
LocatedBlock lb = createLocatedBlock();
LocatedBlockProto lbProto = PBHelper.convert(lb);
LocatedBlock lb2 = PBHelper.convert(lbProto);
assertEquals(lb.getBlock(), lb2.getBlock());
compare(lb.getBlockToken(), lb2.getBlockToken());
assertEquals(lb.getStartOffset(), lb2.getStartOffset());
assertEquals(lb.isCorrupt(), lb2.isCorrupt());
DatanodeInfo [] dnInfos2 = lb2.getLocations();
assertEquals(dnInfos.length, dnInfos2.length);
for (int i = 0; i < dnInfos.length ; i++) {
compare(dnInfos[i], dnInfos2[i]);
compare(lb,lb2);
}
@Test
public void testConvertLocatedBlockList() {
ArrayList<LocatedBlock> lbl = new ArrayList<LocatedBlock>();
for (int i=0;i<3;i++) {
lbl.add(createLocatedBlock());
}
List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlock2(lbl);
List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlock(lbpl);
assertEquals(lbl.size(), lbl2.size());
for (int i=0;i<lbl.size();i++) {
compare(lbl.get(i), lbl2.get(2));
}
}
@Test
public void testConvertLocatedBlockArray() {
LocatedBlock [] lbl = new LocatedBlock[3];
for (int i=0;i<3;i++) {
lbl[i] = createLocatedBlock();
}
LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlock(lbl);
LocatedBlock [] lbl2 = PBHelper.convertLocatedBlock(lbpl);
assertEquals(lbl.length, lbl2.length);
for (int i=0;i<lbl.length;i++) {
compare(lbl[i], lbl2[i]);
}
}
@ -471,4 +517,20 @@ public class TestPBHelper {
}
}
}
@Test
public void testChecksumTypeProto() {
assertEquals(DataChecksum.Type.NULL,
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
assertEquals(DataChecksum.Type.CRC32,
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
assertEquals(DataChecksum.Type.CRC32C,
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
assertEquals(PBHelper.convert(DataChecksum.Type.NULL),
HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
assertEquals(PBHelper.convert(DataChecksum.Type.CRC32),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
}
}

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.junit.Test;
import com.google.common.base.Charsets;
/**
*
@ -45,7 +47,7 @@ public class TestPathComponents {
String pathString = str;
byte[][] oldPathComponents = INode.getPathComponents(pathString);
byte[][] newPathComponents =
DFSUtil.bytes2byteArray(pathString.getBytes("UTF-8"),
DFSUtil.bytes2byteArray(pathString.getBytes(Charsets.UTF_8),
(byte) Path.SEPARATOR_CHAR);
if (oldPathComponents[0] == null) {
assertTrue(oldPathComponents[0] == newPathComponents[0]);

View File

@ -1,3 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.IOException;

View File

@ -1 +1,14 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.tools.FakeRenewer

View File

@ -48,9 +48,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<includes>
<include>pom.xml</include>
</includes>
</configuration>
</plugin>
</plugins>

View File

@ -204,6 +204,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4920. Use security token protobuf definition from hadoop common.
(Suresh Srinivas via vinodkv)
MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus
calls. (sandyr via tucu)
OPTIMIZATIONS
BUG FIXES
@ -685,6 +688,14 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally
exits (Jason Lowe via tgraves)
MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
Chen via jlowe)
MAPREDUCE-4921. JobClient should acquire HS token with RM principal
(daryn via bobby)
MAPREDUCE-4934. Maven RAT plugin is not checking all source files (tgraves)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1,5 +1,18 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->

View File

@ -579,7 +579,7 @@ public class MRAppMaster extends CompositeService {
*/
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext.getApplicationAttemptId(),
appContext.getClock(), getCommitter());
appContext.getClock(), getCommitter(), isNewApiCommitter());
}
/** Create and initialize (but don't start) a single job.

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -100,6 +101,7 @@ public class RecoveryService extends CompositeService implements Recovery {
private final ApplicationAttemptId applicationAttemptId;
private final OutputCommitter committer;
private final boolean newApiCommitter;
private final Dispatcher dispatcher;
private final ControlledClock clock;
@ -113,10 +115,11 @@ public class RecoveryService extends CompositeService implements Recovery {
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId,
Clock clock, OutputCommitter committer) {
Clock clock, OutputCommitter committer, boolean newApiCommitter) {
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
this.newApiCommitter = newApiCommitter;
this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@ -360,8 +363,17 @@ public class RecoveryService extends CompositeService implements Recovery {
switch (state) {
case SUCCEEDED:
//recover the task output
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
// check the committer type and construct corresponding context
TaskAttemptContext taskContext = null;
if(newApiCommitter) {
taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
} else {
taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
TypeConverter.fromYarn(aId));
}
try {
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);

View File

@ -1 +1,14 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo

View File

@ -626,6 +626,115 @@ public class TestRecovery {
validateOutput();
}
@Test
public void testRecoveryWithOldCommiter() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", false);
conf.setBoolean("mapred.reducer.new-api", false);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
//stop the app before the job completes.
app.stop();
//rerun
//in rerun the map will be recovered from previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", false);
conf.setBoolean("mapred.reducer.new-api", false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
reduceTask1 = it.next();
Task reduceTask2 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask2, TaskState.RUNNING);
TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
.iterator().next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd reduce task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce2Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,

Some files were not shown because too many files have changed in this diff Show More