Merge r1412283 through r1414454 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1414456 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-11-27 23:02:00 +00:00
commit 1ce3274dd5
55 changed files with 1658 additions and 769 deletions

View File

@ -138,6 +138,9 @@ Trunk (Unreleased)
HADOOP-9075. FileContext#FSLinkResolver should be made static.
(Arpit Agarwal via suresh)
HADOOP-9093. Move all the Exception in PathExceptions to o.a.h.fs package.
(suresh)
BUG FIXES
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
@ -447,6 +450,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems
should register/deregister to/from. (Karthik Kambatla via tomwhite)
HADOOP-9064. Augment DelegationTokenRenewer API to cancel the tokens on
calls to removeRenewAction. (kkambatl via tucu)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -1152,6 +1158,12 @@ Release 0.23.6 - UNRELEASED
HADOOP-9072. Hadoop-Common-0.23-Build Fails to build in Jenkins
(Robert Parker via tgraves)
HADOOP-8992. Enhance unit-test coverage of class HarFileSystem (Ivan A.
Veselovsky via bobby)
HADOOP-9038. unit-tests for AllocatorPerContext.PathIterator (Ivan A.
Veselovsky via bobby)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -291,5 +291,13 @@
<Field name="previousSnapshot" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
The method uses a generic type T that extends two other types
T1 and T2. Findbugs complains of a cast from T1 to T2.
-->
<Match>
<Class name="org.apache.hadoop.fs.DelegationTokenRenewer" />
<Method name="removeRenewAction" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
</FindBugsFilter>
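For context on the new suppression: removeRenewAction declares a type parameter bounded by two types, and javac erases such a parameter to its first bound, inserting a checked cast wherever the value is used as the second bound; FindBugs reports that compiler-generated cast as BC_UNCONFIRMED_CAST. A minimal sketch of the shape that triggers it, using stand-in types rather than the actual Hadoop classes:

// Sketch only: T erases to Closeable (the first bound), so the call to
// renew() compiles to a cast from Closeable to Renewable that FindBugs
// cannot confirm statically.
import java.io.Closeable;
import java.io.IOException;

interface Renewable {
  void renew();
}

class RenewerSketch {
  public <T extends Closeable & Renewable> void removeRenewAction(T fs)
      throws IOException {
    fs.renew();   // implicit checkcast to Renewable inserted by the compiler
    fs.close();   // no cast needed: the erased type is Closeable
  }
}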

View File

@ -24,6 +24,8 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -35,6 +37,9 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private
public class DelegationTokenRenewer
extends Thread {
private static final Log LOG = LogFactory
.getLog(DelegationTokenRenewer.class);
/** The renewable interface used by the renewer. */
public interface Renewable {
/** @return the renew token. */
@ -168,11 +173,24 @@ public class DelegationTokenRenewer
}
}
/** Remove the associated renew action from the queue */
/**
* Remove the associated renew action from the queue
*
* @throws IOException
*/
public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
final T fs) {
final T fs) throws IOException {
for (RenewAction<?> action : queue) {
if (action.weakFs.get() == fs) {
try {
fs.getRenewToken().cancel(fs.getConf());
} catch (InterruptedException ie) {
LOG.error("Interrupted while canceling token for " + fs.getUri()
+ "filesystem");
if (LOG.isDebugEnabled()) {
LOG.debug("Exception in removeRenewAction: ", ie);
}
}
queue.remove(action);
return;
}
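With this change (HADOOP-9064), removing a renew action also cancels the underlying delegation token, so removeRenewAction now throws IOException and callers must handle it. A hedged sketch of a caller, assuming a file system class that both extends FileSystem and implements DelegationTokenRenewer.Renewable (as the method's type bound requires); class and variable names here are placeholders, not Hadoop code:

import java.io.IOException;

import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
import org.apache.hadoop.fs.FileSystem;

class RenewerClientSketch {
  // Deregister a renewable file system; as of HADOOP-9064 this also
  // cancels fs.getRenewToken(), which is why IOException can surface here.
  static <T extends FileSystem & Renewable> void deregister(
      DelegationTokenRenewer renewer, T fs) {
    try {
      renewer.removeRenewAction(fs);
    } catch (IOException e) {
      System.err.println("Could not cancel token for " + fs.getUri() + ": " + e);
    }
  }
}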

View File

@ -584,13 +584,6 @@ public class HarFileSystem extends FilterFileSystem {
public String getName() {
return name;
}
public List<String> getChildren() {
return children;
}
public String getFileName() {
return name;
}
public String getPartName() {
return partName;
}
@ -662,15 +655,6 @@ public class HarFileSystem extends FilterFileSystem {
hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
}
/*
* create throws an exception in Har filesystem.
* The archive once created cannot be changed.
*/
public FSDataOutputStream create(Path f, int bufferSize)
throws IOException {
throw new IOException("Har: Create not allowed");
}
@Override
public FSDataOutputStream create(Path f,
FsPermission permission,
@ -1106,4 +1090,11 @@ public class HarFileSystem extends FilterFileSystem {
}
}
}
/*
* testing purposes only:
*/
HarMetaData getMetadata() {
return metadata;
}
}

View File

@ -481,12 +481,15 @@ public class LocalDirAllocator {
@Override
public Path next() {
Path result = next;
final Path result = next;
try {
advance();
} catch (IOException ie) {
throw new RuntimeException("Can't check existence of " + next, ie);
}
if (result == null) {
throw new NoSuchElementException();
}
return result;
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/** EACCES */
public class PathAccessDeniedException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathAccessDeniedException(String path) {
super(path, "Permission denied");
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/**
* Exception corresponding to File Exists - EEXISTS
*/
public class PathExistsException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathExistsException(String path) {
super(path, "File exists");
}
protected PathExistsException(String path, String error) {
super(path, error);
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
/**
* Exceptions based on standard posix/linux style exceptions for path related
* errors. Returns an exception with the format "path: standard error string".
*
* This exception corresponds to Input/output error (EIO)
*/
public class PathIOException extends IOException {
static final long serialVersionUID = 0L;
private static final String EIO = "Input/output error";
// NOTE: this really should be a Path, but a Path is buggy and won't
// return the exact string used to construct the path, and it mangles
// uris with no authority
private String operation;
private String path;
private String targetPath;
/**
* Constructs a generic I/O error exception
* @param path for the exception
*/
public PathIOException(String path) {
this(path, EIO, null);
}
/**
* Appends the text of a Throwable to the default error message
* @param path for the exception
* @param cause a throwable to extract the error message
*/
public PathIOException(String path, Throwable cause) {
this(path, EIO, cause);
}
/**
* Avoid using this method. Use a subclass of PathIOException if
* possible.
* @param path for the exception
* @param error custom string to use as the error text
*/
public PathIOException(String path, String error) {
this(path, error, null);
}
protected PathIOException(String path, String error, Throwable cause) {
super(error, cause);
this.path = path;
}
/** Format:
* cmd: {operation} `path' {to `target'}: error string
*/
@Override
public String getMessage() {
StringBuilder message = new StringBuilder();
if (operation != null) {
message.append(operation + " ");
}
message.append(formatPath(path));
if (targetPath != null) {
message.append(" to " + formatPath(targetPath));
}
message.append(": " + super.getMessage());
if (getCause() != null) {
message.append(": " + getCause().getMessage());
}
return message.toString();
}
/** @return Path that generated the exception */
public Path getPath() { return new Path(path); }
/** @return Path if the operation involved copying or moving, else null */
public Path getTargetPath() {
return (targetPath != null) ? new Path(targetPath) : null;
}
/**
* Optional operation that will preface the path
* @param operation a string
*/
public void setOperation(String operation) {
this.operation = operation;
}
/**
* Optional path if the exception involved two paths, ex. a copy operation
* @param targetPath the target of the operation
*/
public void setTargetPath(String targetPath) {
this.targetPath = targetPath;
}
private String formatPath(String path) {
return "`" + path + "'";
}
}
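A brief usage sketch of the message format documented above ("{operation} `path' {to `target'}: error string"); the paths here are purely illustrative:

import org.apache.hadoop.fs.PathIOException;

class PathIOExceptionDemo {
  public static void main(String[] args) {
    PathIOException e = new PathIOException("/user/alice/data");
    e.setOperation("mv");
    e.setTargetPath("/user/alice/archive");
    // Prints: mv `/user/alice/data' to `/user/alice/archive': Input/output error
    System.out.println(e.getMessage());
  }
}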

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/** EISDIR */
public class PathIsDirectoryException extends PathExistsException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathIsDirectoryException(String path) {
super(path, "Is a directory");
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/** ENOTDIR */
public class PathIsNotDirectoryException extends PathExistsException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathIsNotDirectoryException(String path) {
super(path, "Is not a directory");
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/** Generated by rm commands */
public class PathIsNotEmptyDirectoryException extends PathExistsException {
/** @param path for the exception */
public PathIsNotEmptyDirectoryException(String path) {
super(path, "Directory is not empty");
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/**
* Exception corresponding to No such file or directory - ENOENT
*/
public class PathNotFoundException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathNotFoundException(String path) {
super(path, "No such file or directory");
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/** ENOTSUP */
public class PathOperationException extends PathExistsException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathOperationException(String path) {
super(path, "Operation not supported");
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/**
* Exception corresponding to Operation Not Permitted - EPERM
*/
public class PathPermissionException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathPermissionException(String path) {
super(path, "Operation not permitted");
}
}

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.util.StringUtils;
/**

View File

@ -27,12 +27,12 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
import org.apache.hadoop.fs.shell.PathExceptions.PathOperationException;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.PathOperationException;
import org.apache.hadoop.io.IOUtils;
/**

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.IOUtils;
/** Various commands for copy files */

View File

@ -24,11 +24,11 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotEmptyDirectoryException;
/**
* Classes that delete paths

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;

View File

@ -24,10 +24,10 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
/**
* Create the given dir

View File

@ -23,8 +23,8 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
/** Various commands for moving files */
@InterfaceAudience.Private

View File

@ -32,10 +32,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
/**
* Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).

View File

@ -1,203 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.shell;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
/**
* Standardized posix/linux style exceptions for path related errors.
* Returns an IOException with the format "path: standard error string".
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings("serial")
public class PathExceptions {
/** EIO */
public static class PathIOException extends IOException {
static final long serialVersionUID = 0L;
private static final String EIO = "Input/output error";
// NOTE: this really should be a Path, but a Path is buggy and won't
// return the exact string used to construct the path, and it mangles
// uris with no authority
private String operation;
private String path;
private String targetPath;
/**
* Constructor a generic I/O error exception
* @param path for the exception
*/
public PathIOException(String path) {
this(path, EIO, null);
}
/**
* Appends the text of a Throwable to the default error message
* @param path for the exception
* @param cause a throwable to extract the error message
*/
public PathIOException(String path, Throwable cause) {
this(path, EIO, cause);
}
/**
* Avoid using this method. Use a subclass of PathIOException if
* possible.
* @param path for the exception
* @param error custom string to use an the error text
*/
public PathIOException(String path, String error) {
this(path, error, null);
}
protected PathIOException(String path, String error, Throwable cause) {
super(error, cause);
this.path = path;
}
/** Format:
* cmd: {operation} `path' {to `target'}: error string
*/
@Override
public String getMessage() {
StringBuilder message = new StringBuilder();
if (operation != null) {
message.append(operation + " ");
}
message.append(formatPath(path));
if (targetPath != null) {
message.append(" to " + formatPath(targetPath));
}
message.append(": " + super.getMessage());
if (getCause() != null) {
message.append(": " + getCause().getMessage());
}
return message.toString();
}
/** @return Path that generated the exception */
public Path getPath() { return new Path(path); }
/** @return Path if the operation involved copying or moving, else null */
public Path getTargetPath() {
return (targetPath != null) ? new Path(targetPath) : null;
}
/**
* Optional operation that will preface the path
* @param operation a string
*/
public void setOperation(String operation) {
this.operation = operation;
}
/**
* Optional path if the exception involved two paths, ex. a copy operation
* @param targetPath the of the operation
*/
public void setTargetPath(String targetPath) {
this.targetPath = targetPath;
}
private String formatPath(String path) {
return "`" + path + "'";
}
}
/** ENOENT */
public static class PathNotFoundException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathNotFoundException(String path) {
super(path, "No such file or directory");
}
}
/** EEXISTS */
public static class PathExistsException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathExistsException(String path) {
super(path, "File exists");
}
protected PathExistsException(String path, String error) {
super(path, error);
}
}
/** EISDIR */
public static class PathIsDirectoryException extends PathExistsException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathIsDirectoryException(String path) {
super(path, "Is a directory");
}
}
/** ENOTDIR */
public static class PathIsNotDirectoryException extends PathExistsException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathIsNotDirectoryException(String path) {
super(path, "Is not a directory");
}
}
/** Generated by rm commands */
public static class PathIsNotEmptyDirectoryException extends PathExistsException {
/** @param path for the exception */
public PathIsNotEmptyDirectoryException(String path) {
super(path, "Directory is not empty");
}
}
/** EACCES */
public static class PathAccessDeniedException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathAccessDeniedException(String path) {
super(path, "Permission denied");
}
}
/** EPERM */
public static class PathPermissionException extends PathIOException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathPermissionException(String path) {
super(path, "Operation not permitted");
}
}
/** ENOTSUP */
public static class PathOperationException extends PathExistsException {
static final long serialVersionUID = 0L;
/** @param path for the exception */
public PathOperationException(String path) {
super(path, "Operation not supported");
}
}
}

View File

@ -25,7 +25,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
import org.apache.hadoop.fs.PathIOException;
/**
* Modifies the replication factor

View File

@ -23,7 +23,7 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
/**
* Snapshot related operations

View File

@ -25,7 +25,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.IOUtils;
/**

View File

@ -23,7 +23,6 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
/**
* Perform shell-like file tests

View File

@ -23,9 +23,9 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
/**
* Unix touch like commands

View File

@ -23,6 +23,7 @@ public class TestDelegationTokenRenewer {
@SuppressWarnings("rawtypes")
static class TestToken extends Token {
public volatile int renewCount = 0;
public volatile boolean cancelled = false;
@Override
public long renew(Configuration conf) {
@ -33,6 +34,11 @@ public class TestDelegationTokenRenewer {
}
return renewCount;
}
@Override
public void cancel(Configuration conf) {
cancelled = true;
}
}
static class TestFileSystem extends FileSystem implements
@ -123,27 +129,12 @@ public class TestDelegationTokenRenewer {
}
@Test
public void testAddRenewAction() throws IOException, InterruptedException {
public void testAddRemoveRenewAction() throws IOException,
InterruptedException {
TestFileSystem tfs = new TestFileSystem();
renewer.addRenewAction(tfs);
for (int i = 0; i < 10; i++) {
Thread.sleep(RENEW_CYCLE);
if (tfs.testToken.renewCount > 0) {
return;
}
}
assertTrue("Token not renewed even after 10 seconds",
(tfs.testToken.renewCount > 0));
}
@Test
public void testRemoveRenewAction() throws IOException, InterruptedException {
TestFileSystem tfs = new TestFileSystem();
renewer.addRenewAction(tfs);
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 60; i++) {
Thread.sleep(RENEW_CYCLE);
if (tfs.testToken.renewCount > 0) {
renewer.removeRenewAction(tfs);
@ -151,9 +142,9 @@ public class TestDelegationTokenRenewer {
}
}
assertTrue("Token not renewed even once",
assertTrue("Token not renewed even after 1 minute",
(tfs.testToken.renewCount > 0));
assertTrue("Token not removed",
(tfs.testToken.renewCount < MAX_RENEWALS));
assertTrue("Token not removed", (tfs.testToken.renewCount < MAX_RENEWALS));
assertTrue("Token not cancelled", tfs.testToken.cancelled);
}
}

View File

@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* This test class checks basic operations with {@link HarFileSystem} including
* various initialization cases, getters, and modification methods.
*
* NB: to run this test from an IDE make sure the folder
* "hadoop-common-project/hadoop-common/src/main/resources/" is added as a
* source path. This will allow the system to pick up the "core-default.xml" and
* "META-INF/services/..." resources from the class-path in the runtime.
*/
public class TestHarFileSystemBasics {
private static final String ROOT_PATH = System.getProperty("test.build.data",
"build/test/data");
private static final Path rootPath = new Path(
new File(ROOT_PATH).getAbsolutePath() + "/localfs");
// NB: .har suffix is necessary
private static final Path harPath = new Path(rootPath, "path1/path2/my.har");
private FileSystem localFileSystem;
private HarFileSystem harFileSystem;
private Configuration conf;
/*
* creates and returns fully initialized HarFileSystem
*/
private HarFileSystem createHarFileSystem(final Configuration conf)
throws Exception {
localFileSystem = FileSystem.getLocal(conf);
localFileSystem.initialize(new URI("file:///"), conf);
localFileSystem.mkdirs(rootPath);
localFileSystem.mkdirs(harPath);
final Path indexPath = new Path(harPath, "_index");
final Path masterIndexPath = new Path(harPath, "_masterindex");
localFileSystem.createNewFile(indexPath);
assertTrue(localFileSystem.exists(indexPath));
localFileSystem.createNewFile(masterIndexPath);
assertTrue(localFileSystem.exists(masterIndexPath));
writeVersionToMasterIndexImpl(HarFileSystem.VERSION);
final HarFileSystem harFileSystem = new HarFileSystem(localFileSystem);
final URI uri = new URI("har://" + harPath.toString());
harFileSystem.initialize(uri, conf);
return harFileSystem;
}
private void writeVersionToMasterIndexImpl(int version) throws IOException {
final Path masterIndexPath = new Path(harPath, "_masterindex");
// write Har version into the master index:
final FSDataOutputStream fsdos = localFileSystem.create(masterIndexPath);
try {
String versionString = version + "\n";
fsdos.write(versionString.getBytes("UTF-8"));
fsdos.flush();
} finally {
fsdos.close();
}
}
@Before
public void before() throws Exception {
final File rootDirIoFile = new File(rootPath.toUri().getPath());
rootDirIoFile.mkdirs();
if (!rootDirIoFile.exists()) {
throw new IOException("Failed to create temp directory ["
+ rootDirIoFile.getAbsolutePath() + "]");
}
// create Har to test:
conf = new Configuration();
harFileSystem = createHarFileSystem(conf);
}
@After
public void after() throws Exception {
// close Har FS:
final FileSystem harFS = harFileSystem;
if (harFS != null) {
harFS.close();
harFileSystem = null;
}
// cleanup: delete all the temporary files:
final File rootDirIoFile = new File(rootPath.toUri().getPath());
if (rootDirIoFile.exists()) {
FileUtil.fullyDelete(rootDirIoFile);
}
if (rootDirIoFile.exists()) {
throw new IOException("Failed to delete temp directory ["
+ rootDirIoFile.getAbsolutePath() + "]");
}
}
// ======== Positive tests:
@Test
public void testPositiveHarFileSystemBasics() throws Exception {
// check Har version:
assertEquals(HarFileSystem.VERSION, harFileSystem.getHarVersion());
// check Har URI:
final URI harUri = harFileSystem.getUri();
assertEquals(harPath.toUri().getPath(), harUri.getPath());
assertEquals("har", harUri.getScheme());
// check Har home path:
final Path homePath = harFileSystem.getHomeDirectory();
assertEquals(harPath.toUri().getPath(), homePath.toUri().getPath());
// check working directory:
final Path workDirPath0 = harFileSystem.getWorkingDirectory();
assertEquals(homePath, workDirPath0);
// check that it's impossible to reset the working directory
// (#setWorkingDirectory should have no effect):
harFileSystem.setWorkingDirectory(new Path("/foo/bar"));
assertEquals(workDirPath0, harFileSystem.getWorkingDirectory());
}
@Test
public void testPositiveNewHarFsOnTheSameUnderlyingFs() throws Exception {
// Init 2nd har file system on the same underlying FS, so the
// metadata gets reused:
final HarFileSystem hfs = new HarFileSystem(localFileSystem);
final URI uri = new URI("har://" + harPath.toString());
hfs.initialize(uri, new Configuration());
// the metadata should be reused from cache:
assertTrue(hfs.getMetadata() == harFileSystem.getMetadata());
}
@Test
public void testPositiveInitWithoutUnderlyingFS() throws Exception {
// Init HarFS with no constructor arg, so that the underlying FS object
// is created on demand or got from cache in #initialize() method.
final HarFileSystem hfs = new HarFileSystem();
final URI uri = new URI("har://" + harPath.toString());
hfs.initialize(uri, new Configuration());
}
// ========== Negative:
@Test
public void testNegativeInitWithoutIndex() throws Exception {
// delete the index file:
final Path indexPath = new Path(harPath, "_index");
localFileSystem.delete(indexPath, false);
// now init the HarFs:
final HarFileSystem hfs = new HarFileSystem(localFileSystem);
final URI uri = new URI("har://" + harPath.toString());
try {
hfs.initialize(uri, new Configuration());
Assert.fail("Exception expected.");
} catch (IOException ioe) {
// ok, expected.
}
}
@Test
public void testNegativeGetHarVersionOnNotInitializedFS() throws Exception {
final HarFileSystem hfs = new HarFileSystem(localFileSystem);
try {
int version = hfs.getHarVersion();
Assert.fail("Exception expected, but got a Har version " + version + ".");
} catch (IOException ioe) {
// ok, expected.
}
}
@Test
public void testNegativeInitWithAnUnsupportedVersion() throws Exception {
// NB: should wait at least 1 second to ensure the timestamp of the master
// index will change upon the writing, because Linux seems to update the
// file modification time with 1 second accuracy:
Thread.sleep(1000);
// write an unsupported version:
writeVersionToMasterIndexImpl(7777);
// init the Har:
final HarFileSystem hfs = new HarFileSystem(localFileSystem);
// the metadata should *not* be reused from cache:
assertFalse(hfs.getMetadata() == harFileSystem.getMetadata());
final URI uri = new URI("har://" + harPath.toString());
try {
hfs.initialize(uri, new Configuration());
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
}
@Test
public void testNegativeHarFsModifications() throws Exception {
// all the modification methods of HarFS must lead to IOE.
final Path fooPath = new Path(rootPath, "foo/bar");
localFileSystem.createNewFile(fooPath);
try {
harFileSystem.create(fooPath, new FsPermission("+rwx"), true, 1024,
(short) 88, 1024, null);
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
try {
harFileSystem.setReplication(fooPath, (short) 55);
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
try {
harFileSystem.delete(fooPath, true);
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
try {
harFileSystem.mkdirs(fooPath, new FsPermission("+rwx"));
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
final Path indexPath = new Path(harPath, "_index");
try {
harFileSystem.copyFromLocalFile(false, indexPath, fooPath);
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
try {
harFileSystem.startLocalOutput(fooPath, indexPath);
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
try {
harFileSystem.completeLocalOutput(fooPath, indexPath);
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
try {
harFileSystem.setOwner(fooPath, "user", "group");
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
try {
harFileSystem.setPermission(fooPath, new FsPermission("+x"));
Assert.fail("IOException expected.");
} catch (IOException ioe) {
// ok, expected.
}
}
}

View File

@ -22,6 +22,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
@ -32,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
/** This test LocalDirAllocator works correctly;
* Every test case uses different buffer dirs to
@ -318,7 +321,7 @@ public class TestLocalDirAllocator {
*/
@Test
public void testNoSideEffects() throws IOException {
if (isWindows) return;
assumeTrue(!isWindows);
String dir = buildBufferDir(ROOT, 0);
try {
conf.set(CONTEXT, dir);
@ -339,8 +342,7 @@ public class TestLocalDirAllocator {
*/
@Test
public void testGetLocalPathToRead() throws IOException {
if (isWindows)
return;
assumeTrue(!isWindows);
String dir = buildBufferDir(ROOT, 0);
try {
conf.set(CONTEXT, dir);
@ -354,7 +356,60 @@ public class TestLocalDirAllocator {
Shell.execCommand(new String[] { "chmod", "u+w", BUFFER_DIR_ROOT });
rmBufferDirs();
}
}
/**
* Test that {@link LocalDirAllocator#getAllLocalPathsToRead(String, Configuration)}
* returns correct filenames and the "file" scheme.
*
* @throws IOException
*/
@Test
public void testGetAllLocalPathsToRead() throws IOException {
assumeTrue(!isWindows);
String dir0 = buildBufferDir(ROOT, 0);
String dir1 = buildBufferDir(ROOT, 1);
try {
conf.set(CONTEXT, dir0 + "," + dir1);
assertTrue(localFs.mkdirs(new Path(dir0)));
assertTrue(localFs.mkdirs(new Path(dir1)));
localFs.create(new Path(dir0 + Path.SEPARATOR + FILENAME));
localFs.create(new Path(dir1 + Path.SEPARATOR + FILENAME));
// check both the paths are returned as paths to read:
final Iterable<Path> pathIterable = dirAllocator.getAllLocalPathsToRead(FILENAME, conf);
int count = 0;
for (final Path p: pathIterable) {
count++;
assertEquals(FILENAME, p.getName());
assertEquals("file", p.getFileSystem(conf).getUri().getScheme());
}
assertEquals(2, count);
// test #next() while no element to iterate any more:
try {
Path p = pathIterable.iterator().next();
assertFalse("NoSuchElementException must be thrown, but returned ["+p
+"] instead.", true); // exception expected
} catch (NoSuchElementException nsee) {
// okay
}
// test modification not allowed:
final Iterable<Path> pathIterable2 = dirAllocator.getAllLocalPathsToRead(FILENAME, conf);
final Iterator<Path> it = pathIterable2.iterator();
try {
it.remove();
assertFalse(true); // exception expected
} catch (UnsupportedOperationException uoe) {
// okay
}
} finally {
Shell.execCommand(new String[] { "chmod", "u+w", BUFFER_DIR_ROOT });
rmBufferDirs();
}
}
@Test

View File

@ -23,7 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
import org.apache.hadoop.fs.PathIOException;
import org.junit.Test;
public class TestPathExceptions {

View File

@ -167,6 +167,12 @@ Trunk (Unreleased)
HDFS-4215. Remove locking from addToParent(..) since it is used in image
loading, and add INode.isFile(). (szetszwo)
HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
(suresh)
HDFS-4209. Clean up the addNode/addChild/addChildNoQuotaCheck methods in
FSDirectory and INodeDirectory. (szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@ -233,7 +233,7 @@ public class DFSUtil {
/**
* Given a list of path components returns a path as a UTF8 String
*/
public static String byteArray2String(byte[][] pathComponents) {
public static String byteArray2PathString(byte[][] pathComponents) {
if (pathComponents.length == 0)
return "";
if (pathComponents.length == 1 && pathComponents[0].length == 0) {
@ -254,6 +254,14 @@ public class DFSUtil {
return null;
}
/** Convert an object representing a path to a string. */
public static String path2String(final Object path) {
return path == null? null
: path instanceof String? (String)path
: path instanceof byte[][]? byteArray2PathString((byte[][])path)
: path.toString();
}
/**
* Splits the array of bytes into array of arrays of bytes
* on byte separator
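A short, hedged sketch of how the new path2String helper dispatches on the argument type; the byte[][] branch delegates to byteArray2PathString, whose joining behaviour is not fully shown in this hunk, so the expected output for that case is an assumption:

import org.apache.hadoop.hdfs.DFSUtil;

class Path2StringDemo {
  public static void main(String[] args) throws Exception {
    byte[][] components = { "".getBytes("UTF-8"),
                            "foo".getBytes("UTF-8"),
                            "bar".getBytes("UTF-8") };
    System.out.println(DFSUtil.path2String(null));        // null
    System.out.println(DFSUtil.path2String("/foo/bar"));  // /foo/bar
    // Assumed to print /foo/bar (components joined by the path separator).
    System.out.println(DFSUtil.path2String(components));
  }
}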

View File

@ -638,10 +638,7 @@ class BlockReceiver implements Closeable {
responder.start(); // start thread to processes responses
}
/*
* Receive until the last packet.
*/
while (receivePacket() >= 0) {}
while (receivePacket() >= 0) { /* Receive until the last packet */ }
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
@ -724,7 +721,7 @@ class BlockReceiver implements Closeable {
static private long checksum2long(byte[] checksum) {
long crc = 0L;
for(int i=0; i<checksum.length; i++) {
crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
crc |= (0xffL&checksum[i])<<((checksum.length-i-1)*8);
}
return crc;
}
@ -783,24 +780,23 @@ class BlockReceiver implements Closeable {
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
}
private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
/**
* Processes responses from downstream datanodes in the pipeline
* and sends back replies to the originator.
*/
class PacketResponder implements Runnable, Closeable {
/** queue for packets waiting for ack */
/** queue for packets waiting for ack - synchronization using monitor lock */
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
/** the thread that spawns this responder */
private final Thread receiverThread = Thread.currentThread();
/** is this responder running? */
/** is this responder running? - synchronization using monitor lock */
private volatile boolean running = true;
/** input from the next downstream datanode */
private final DataInputStream downstreamIn;
/** output to upstream datanode/client */
private final DataOutputStream upstreamOut;
/** The type of this responder */
private final PacketResponderType type;
/** for log and error messages */
@ -812,8 +808,7 @@ class BlockReceiver implements Closeable {
}
PacketResponder(final DataOutputStream upstreamOut,
final DataInputStream downstreamIn,
final DatanodeInfo[] downstreams) {
final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) {
this.downstreamIn = downstreamIn;
this.upstreamOut = upstreamOut;
@ -830,23 +825,41 @@ class BlockReceiver implements Closeable {
this.myString = b.toString();
}
private boolean isRunning() {
return running && datanode.shouldRun;
}
/**
* Enqueue the seqno that is still to be acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
* @param offsetInBlock
*/
synchronized void enqueue(final long seqno,
final boolean lastPacketInBlock, final long offsetInBlock) {
if (running) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime());
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
ackQueue.addLast(p);
notifyAll();
void enqueue(final long seqno, final boolean lastPacketInBlock,
final long offsetInBlock) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime());
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
synchronized(this) {
if (running) {
ackQueue.addLast(p);
notifyAll();
}
}
}
/** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
synchronized Packet waitForAckHead(long seqno) throws InterruptedException {
while (isRunning() && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
wait();
}
return isRunning() ? ackQueue.getFirst() : null;
}
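The enqueue/waitForAckHead pair above is a standard monitor-based handoff: the Packet is built outside the lock, the ack queue is only touched while holding the responder's monitor, and notifyAll/wait move packets from the receiver thread to the responder thread. A stripped-down sketch of that pattern with hypothetical names (not the actual BlockReceiver code):

import java.util.LinkedList;

class AckHandoffSketch {
  private final LinkedList<Long> ackQueue = new LinkedList<Long>();
  private volatile boolean running = true;

  // Producer side: publish under the monitor, then wake any waiter.
  void enqueue(long seqno) {
    synchronized (this) {
      if (running) {
        ackQueue.addLast(seqno);
        notifyAll();
      }
    }
  }

  // Consumer side: block until an element is available or we stop running.
  synchronized Long waitForHead() throws InterruptedException {
    while (running && ackQueue.isEmpty()) {
      wait();   // releases the monitor while blocked
    }
    return running ? ackQueue.getFirst() : null;
  }

  synchronized void stop() {
    running = false;
    notifyAll();
  }
}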
/**
@ -854,7 +867,7 @@ class BlockReceiver implements Closeable {
*/
@Override
public synchronized void close() {
while (running && ackQueue.size() != 0 && datanode.shouldRun) {
while (isRunning() && ackQueue.size() != 0) {
try {
wait();
} catch (InterruptedException e) {
@ -877,147 +890,97 @@ class BlockReceiver implements Closeable {
public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
if (type != PacketResponderType.LAST_IN_PIPELINE
&& !mirrorError) {
// read an ack from downstream datanode
ack.readFields(downstreamIn);
ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
seqno = ack.getSeqno();
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
// read an ack from downstream datanode
ack.readFields(downstreamIn);
ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
wait();
seqno = ack.getSeqno();
}
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
pkt = waitForAckHead(seqno);
if (!isRunning()) {
break;
}
expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected=" + expected
+ ", received=" + seqno);
}
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream
// nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos
- ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos
+ "ns.");
}
if (!running || !datanode.shouldRun) {
break;
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected="
+ expected + ", received=" + seqno);
}
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
}
} else {
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
}
lastPacketInBlock = pkt.lastPacketInBlock;
} else {
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
}
} catch (InterruptedException ine) {
lastPacketInBlock = pkt.lastPacketInBlock;
}
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} else {
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
LOG.info(myString, ioe);
}
}
if (Thread.interrupted() || isInterrupted) {
/* The receiver thread cancelled this thread.
* We could also check any other status updates from the
* receiver thread (e.g. if it is ok to write to replyOut).
* It is prudent to not send any more status back to the client
* because this datanode has a problem. The upstream datanode
* will detect that this datanode is bad, and rightly so.
*/
LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
BlockReceiver.this.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
inAddr, myAddr, block.getNumBytes(),
"HDFS_WRITE", clientname, offset,
dnR.getStorageID(), block, endTime-startTime));
} else {
LOG.info("Received " + block + " size "
+ block.getNumBytes() + " from " + inAddr);
}
}
// construct my ack message
Status[] replies = null;
if (mirrorError) { // ack read error
replies = new Status[2];
replies[0] = Status.SUCCESS;
replies[1] = Status.ERROR;
} else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
: ack.getNumOfReplies();
replies = new Status[1+ackLen];
replies[0] = Status.SUCCESS;
for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
LOG.info(myString, ioe);
}
PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
if (replyAck.isSuccess() &&
pkt.offsetInBlock > replicaInfo.getBytesAcked())
replicaInfo.setBytesAcked(pkt.offsetInBlock);
}
// send my ack back to upstream datanode
replyAck.write(upstreamOut);
upstreamOut.flush();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ", replyAck=" + replyAck);
}
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
// update bytes acked
}
if (Thread.interrupted() || isInterrupted) {
/*
* The receiver thread cancelled this thread. We could also check
* any other status updates from the receiver thread (e.g. if it is
* ok to write to replyOut). It is prudent to not send any more
* status back to the client because this datanode has a problem.
* The upstream datanode will detect that this datanode is bad, and
* rightly so.
*/
LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}
if (lastPacketInBlock) {
// Finalize the block and close the block file
finalizeBlock(startTime);
}
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0));
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
}
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
@ -1043,6 +1006,66 @@ class BlockReceiver implements Closeable {
LOG.info(myString + " terminating");
}
/**
* Finalize the block and close the block file
* @param startTime time when BlockReceiver started receiving the block
*/
private void finalizeBlock(long startTime) throws IOException {
BlockReceiver.this.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
: 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
dnR.getStorageID(), block, endTime - startTime));
} else {
LOG.info("Received " + block + " size " + block.getNumBytes()
+ " from " + inAddr);
}
}
/**
* @param ack Ack received from downstream
* @param seqno sequence number of ack to be sent upstream
* @param totalAckTimeNanos total ack time including all the downstream
* nodes
* @param offsetInBlock offset in block for the data in packet
*/
private void sendAckUpstream(PipelineAck ack, long seqno,
long totalAckTimeNanos, long offsetInBlock) throws IOException {
Status[] replies = null;
if (mirrorError) { // ack read error
replies = MIRROR_ERROR_STATUS;
} else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
.getNumOfReplies();
replies = new Status[1 + ackLen];
replies[0] = Status.SUCCESS;
for (int i = 0; i < ackLen; i++) {
replies[i + 1] = ack.getReply(i);
}
}
PipelineAck replyAck = new PipelineAck(seqno, replies,
totalAckTimeNanos);
if (replyAck.isSuccess()
&& offsetInBlock > replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(offsetInBlock);
}
// send my ack back to upstream datanode
replyAck.write(upstreamOut);
upstreamOut.flush();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ", replyAck=" + replyAck);
}
}
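For orientation, a minimal sketch (editor's illustration with hypothetical values, not part of the patch) of how a mid-pipeline responder composes the reply array that sendAckUpstream forwards: its own status goes first, followed by the statuses reported by the downstream nodes.
// Hypothetical values; Status and PipelineAck are the same types used in this diff.
long seqno = 42;                                    // assumed packet sequence number
long totalAckTimeNanos = 1000000L;                  // assumed downstream ack time
Status[] downstream = { Status.SUCCESS, Status.SUCCESS }; // acks from two downstream nodes
Status[] replies = new Status[1 + downstream.length];
replies[0] = Status.SUCCESS;                        // this datanode's own status first
System.arraycopy(downstream, 0, replies, 1, downstream.length);
PipelineAck replyAck = new PipelineAck(seqno, replies, totalAckTimeNanos);
assert replyAck.isSuccess();                        // true only when every reply is SUCCESS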
/**
* Remove a packet from the head of the ack queue
*
View File
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@ -88,7 +89,6 @@ public class FSDirectory implements Closeable {
FSImage fsImage;
private final FSNamesystem namesystem;
private volatile boolean ready = false;
private static final long UNKNOWN_DISK_SPACE = -1;
private final int maxComponentLength;
private final int maxDirItems;
private final int lsLimit; // max list limit
@ -263,13 +263,14 @@ public class FSDirectory implements Closeable {
permissions,replication,
preferredBlockSize, modTime, clientName,
clientMachine, clientNode);
boolean added = false;
writeLock();
try {
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
added = addINode(path, newNode);
} finally {
writeUnlock();
}
if (newNode == null) {
if (!added) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
return null;
}
@ -289,7 +290,7 @@ public class FSDirectory implements Closeable {
boolean underConstruction,
String clientName,
String clientMachine) {
INode newNode;
final INode newNode;
assert hasWriteLock();
if (underConstruction) {
newNode = new INodeFileUnderConstruction(
@ -302,16 +303,17 @@ public class FSDirectory implements Closeable {
}
try {
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
if (addINode(path, newNode)) {
return newNode;
}
} catch (IOException e) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"DIR* FSDirectory.unprotectedAddFile: exception when add " + path
+ " to the file system", e);
}
return null;
}
return newNode;
return null;
}
/**
@ -559,12 +561,12 @@ public class FSDirectory implements Closeable {
// Ensure dst has quota to accommodate rename
verifyQuotaForRename(srcInodes, dstInodes);
INode dstChild = null;
boolean added = false;
INode srcChild = null;
String srcChildName = null;
try {
// remove src
srcChild = removeChild(srcInodesInPath, srcInodes.length-1);
srcChild = removeLastINode(srcInodesInPath);
if (srcChild == null) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ "failed to rename " + src + " to " + dst
@ -575,9 +577,8 @@ public class FSDirectory implements Closeable {
srcChild.setLocalName(dstComponents[dstInodes.length-1]);
// add src to the destination
dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length-1,
srcChild, UNKNOWN_DISK_SPACE);
if (dstChild != null) {
added = addLastINodeNoQuotaCheck(dstInodesInPath, srcChild);
if (added) {
srcChild = null;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
@ -589,11 +590,10 @@ public class FSDirectory implements Closeable {
return true;
}
} finally {
if (dstChild == null && srcChild != null) {
if (!added && srcChild != null) {
// put it back
srcChild.setLocalName(srcChildName);
addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, srcChild,
UNKNOWN_DISK_SPACE);
addLastINodeNoQuotaCheck(srcInodesInPath, srcChild);
}
}
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@ -724,7 +724,7 @@ public class FSDirectory implements Closeable {
// Ensure dst has quota to accommodate rename
verifyQuotaForRename(srcInodes, dstInodes);
INode removedSrc = removeChild(srcInodesInPath, srcInodes.length - 1);
INode removedSrc = removeLastINode(srcInodesInPath);
if (removedSrc == null) {
error = "Failed to rename " + src + " to " + dst
+ " because the source can not be removed";
@ -737,18 +737,13 @@ public class FSDirectory implements Closeable {
INode removedDst = null;
try {
if (dstInode != null) { // dst exists remove it
removedDst = removeChild(dstInodesInPath, dstInodes.length - 1);
removedDst = removeLastINode(dstInodesInPath);
dstChildName = removedDst.getLocalName();
}
INode dstChild = null;
removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
// add src as dst to complete rename
dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1,
removedSrc, UNKNOWN_DISK_SPACE);
int filesDeleted = 0;
if (dstChild != null) {
if (addLastINodeNoQuotaCheck(dstInodesInPath, removedSrc)) {
removedSrc = null;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
@ -759,6 +754,7 @@ public class FSDirectory implements Closeable {
dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
// Collect the blocks and remove the lease for previous dst
int filesDeleted = 0;
if (removedDst != null) {
INode rmdst = removedDst;
removedDst = null;
@ -772,14 +768,12 @@ public class FSDirectory implements Closeable {
if (removedSrc != null) {
// Rename failed - restore src
removedSrc.setLocalName(srcChildName);
addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, removedSrc,
UNKNOWN_DISK_SPACE);
addLastINodeNoQuotaCheck(srcInodesInPath, removedSrc);
}
if (removedDst != null) {
// Rename failed - restore dst
removedDst.setLocalName(dstChildName);
addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1, removedDst,
UNKNOWN_DISK_SPACE);
addLastINodeNoQuotaCheck(dstInodesInPath, removedDst);
}
}
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@ -1129,14 +1123,13 @@ public class FSDirectory implements Closeable {
final INode[] inodes = inodesInPath.getINodes();
INode targetNode = inodes[inodes.length-1];
int pos = inodes.length - 1;
// Remove the node from the namespace
targetNode = removeChild(inodesInPath, pos);
targetNode = removeLastINode(inodesInPath);
if (targetNode == null) {
return 0;
}
// set the parent's modification time
inodes[pos-1].setModificationTime(mtime);
inodes[inodes.length - 2].setModificationTime(mtime);
int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
@ -1183,7 +1176,7 @@ public class FSDirectory implements Closeable {
}
//add the new node
rootDir.addNode(path, newnode);
rootDir.addINode(path, newnode);
}
/**
@ -1332,22 +1325,6 @@ public class FSDirectory implements Closeable {
}
}
/**
* Get the parent node of path.
*
* @param path the path to explore
* @return its parent node
*/
INodeDirectory getParent(byte[][] path)
throws FileNotFoundException, UnresolvedLinkException {
readLock();
try {
return rootDir.getParent(path);
} finally {
readUnlock();
}
}
/**
* Check whether the filepath could be created
* @throws SnapshotAccessControlException if path is in RO snapshot
@ -1405,20 +1382,17 @@ public class FSDirectory implements Closeable {
* @param nsDelta the delta change of namespace
* @param dsDelta the delta change of diskspace
* @throws QuotaExceededException if the new count violates any quota limit
* @throws FileNotFound if path does not exist.
* @throws FileNotFoundException if path does not exist.
*/
void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
throws QuotaExceededException,
FileNotFoundException,
UnresolvedLinkException {
throws QuotaExceededException, FileNotFoundException, UnresolvedLinkException {
writeLock();
try {
final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false);
final INode[] inodes = inodesInPath.getINodes();
int len = inodes.length;
if (inodes[len - 1] == null) {
throw new FileNotFoundException(path +
" does not exist under rootDir.");
throw new FileNotFoundException("Path not found: " + path);
}
updateCount(inodesInPath, len-1, nsDelta, dsDelta, true);
} finally {
@ -1647,15 +1621,17 @@ public class FSDirectory implements Closeable {
long timestamp) throws QuotaExceededException {
assert hasWriteLock();
final INodeDirectory dir = new INodeDirectory(name, permission, timestamp);
final INode inode = addChild(inodesInPath, pos, dir, -1, true);
inodesInPath.setINode(pos, inode);
if (addChild(inodesInPath, pos, dir, true)) {
inodesInPath.setINode(pos, dir);
}
}
/** Add a node child to the namespace. The full path name of the node is src.
* childDiskspace should be -1, if unknown.
/**
* Add the given child to the namespace.
* @param src The full path name of the child node.
* @throws QuotaExceededException if it violates the quota limit
*/
private <T extends INode> T addNode(String src, T child, long childDiskspace
private boolean addINode(String src, INode child
) throws QuotaExceededException, UnresolvedLinkException {
byte[][] components = INode.getPathComponents(src);
byte[] path = components[components.length-1];
@ -1665,8 +1641,7 @@ public class FSDirectory implements Closeable {
try {
INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
components.length, false);
return addChild(inodesInPath, inodesInPath.getINodes().length-1, child,
childDiskspace, true);
return addLastINode(inodesInPath, child, true);
} finally {
writeUnlock();
}
@ -1790,14 +1765,24 @@ public class FSDirectory implements Closeable {
}
}
/**
* The same as {@link #addChild(INodesInPath, int, INode, boolean)}
* with pos = length - 1.
*/
private boolean addLastINode(INodesInPath inodesInPath,
INode inode, boolean checkQuota) throws QuotaExceededException {
final int pos = inodesInPath.getINodes().length - 1;
return addChild(inodesInPath, pos, inode, checkQuota);
}
/** Add a node child to the inodes at index pos.
* Its ancestors are stored at [0, pos-1].
* @return the added node.
* @return false if the child with this name already exists;
* otherwise, return true.
* @throws QuotaExceededException if it violates the quota limit
*/
private <T extends INode> T addChild(INodesInPath inodesInPath, int pos,
T child, long childDiskspace,
boolean checkQuota) throws QuotaExceededException {
private boolean addChild(INodesInPath inodesInPath, int pos,
INode child, boolean checkQuota) throws QuotaExceededException {
final INode[] inodes = inodesInPath.getINodes();
// The filesystem limits are not really quotas, so this check may appear
// odd. It's because a rename operation deletes the src, tries to add
@ -1811,38 +1796,34 @@ public class FSDirectory implements Closeable {
INode.DirCounts counts = new INode.DirCounts();
child.spaceConsumedInTree(counts);
if (childDiskspace < 0) {
childDiskspace = counts.getDsCount();
}
updateCount(inodesInPath, pos, counts.getNsCount(), childDiskspace, checkQuota);
updateCount(inodesInPath, pos, counts.getNsCount(), counts.getDsCount(), checkQuota);
if (inodes[pos-1] == null) {
throw new NullPointerException("Panic: parent does not exist");
}
final T addedNode = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
if (addedNode == null) {
updateCount(inodesInPath, pos, -counts.getNsCount(), -childDiskspace, true);
final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
if (!added) {
updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
}
return addedNode;
return added;
}
private <T extends INode> T addChildNoQuotaCheck(INodesInPath inodesInPath,
int pos, T child, long childDiskspace) {
T inode = null;
private boolean addLastINodeNoQuotaCheck(INodesInPath inodesInPath, INode i) {
try {
inode = addChild(inodesInPath, pos, child, childDiskspace, false);
return addLastINode(inodesInPath, i, false);
} catch (QuotaExceededException e) {
NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
}
return inode;
return false;
}
/** Remove an inode at index pos from the namespace.
* Its ancestors are stored at [0, pos-1].
/**
* Remove the last inode in the path from the namespace.
* Count of each ancestor with quota is also updated.
* Return the removed node; null if the removal fails.
* @return the removed node; null if the removal fails.
*/
private INode removeChild(final INodesInPath inodesInPath, int pos) {
private INode removeLastINode(final INodesInPath inodesInPath) {
final INode[] inodes = inodesInPath.getINodes();
final int pos = inodes.length - 1;
INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
if (removedNode != null) {
INode.DirCounts counts = new INode.DirCounts();
@ -1961,15 +1942,17 @@ public class FSDirectory implements Closeable {
* See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
* Sets quota for a directory.
* @return INodeDirectory if any of the quotas have changed; null otherwise.
* @throws FileNotFoundException if the path does not exist or is a file
* @throws FileNotFoundException if the path does not exist.
* @throws PathIsNotDirectoryException if the path is not a directory.
* @throws QuotaExceededException if the directory tree size is
* greater than the given quota
* @throws UnresolvedLinkException if a symlink is encountered in src.
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException {
throws FileNotFoundException, PathIsNotDirectoryException,
QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
assert hasWriteLock();
// sanity check
if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET &&
@ -1984,15 +1967,10 @@ public class FSDirectory implements Closeable {
String srcs = normalizePath(src);
final INode[] inodes = rootDir.getMutableINodesInPath(srcs, true)
.getINodes();
INode targetNode = inodes[inodes.length-1];
if (targetNode == null) {
throw new FileNotFoundException("Directory does not exist: " + srcs);
} else if (!targetNode.isDirectory()) {
throw new FileNotFoundException("Cannot set quota on a file: " + srcs);
} else if (targetNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
INodeDirectory dirNode = INodeDirectory.valueOf(inodes[inodes.length-1], srcs);
if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
throw new IllegalArgumentException("Cannot clear namespace quota on root.");
} else { // a directory inode
INodeDirectory dirNode = (INodeDirectory)targetNode;
long oldNsQuota = dirNode.getNsQuota();
long oldDsQuota = dirNode.getDsQuota();
if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
@ -2026,14 +2004,14 @@ public class FSDirectory implements Closeable {
}
/**
* See {@link ClientProtocol#setQuota(String, long, long)} for the
* contract.
* See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
* @throws SnapshotAccessControlException if path is in RO snapshot
* @see #unprotectedSetQuota(String, long, long)
*/
void setQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException {
void setQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, PathIsNotDirectoryException,
QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
writeLock();
try {
INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
@ -2242,7 +2220,7 @@ public class FSDirectory implements Closeable {
throws UnresolvedLinkException, QuotaExceededException {
assert hasWriteLock();
final INodeSymlink symlink = new INodeSymlink(target, mtime, atime, perm);
return addNode(path, symlink, UNKNOWN_DISK_SPACE);
return addINode(path, symlink)? symlink: null;
}
/**
View File
@ -288,7 +288,7 @@ class FSImageFormat {
}
// check if the new inode belongs to the same parent
if(!isParent(pathComponents, parentPath)) {
parentINode = fsDir.getParent(pathComponents);
parentINode = fsDir.rootDir.getParent(pathComponents);
parentPath = getParent(pathComponents);
}
@ -305,7 +305,7 @@ class FSImageFormat {
*/
void addToParent(INodeDirectory parent, INode child) {
// NOTE: This does not update space counts for parents
if (parent.addChild(child, false) == null) {
if (!parent.addChild(child, false)) {
return;
}
namesystem.dir.cacheName(child);
View File
@ -18,13 +18,13 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
@ -43,13 +43,14 @@ import com.google.common.annotations.VisibleForTesting;
*/
public class INodeDirectory extends INode {
/** Cast INode to INodeDirectory. */
public static INodeDirectory valueOf(INode inode, String path
) throws IOException {
public static INodeDirectory valueOf(INode inode, Object path
) throws FileNotFoundException, PathIsNotDirectoryException {
if (inode == null) {
throw new IOException("Directory does not exist: " + path);
throw new FileNotFoundException("Directory does not exist: "
+ DFSUtil.path2String(path));
}
if (!inode.isDirectory()) {
throw new IOException("Path is not a directory: " + path);
throw new PathIsNotDirectoryException(DFSUtil.path2String(path));
}
return (INodeDirectory)inode;
}
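A short sketch (editor's illustration; the inodes array and srcs string are assumed to come from an enclosing FSDirectory method, as in unprotectedSetQuota elsewhere in this patch) of the caller pattern the typed exceptions enable:
try {
  final INodeDirectory dir = INodeDirectory.valueOf(inodes[inodes.length - 1], srcs);
  // ... operate on the directory inode ...
} catch (FileNotFoundException fnfe) {
  // the path does not exist
} catch (PathIsNotDirectoryException pnde) {
  // the path exists but is not a directory
}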
@ -357,16 +358,17 @@ public class INodeDirectory extends INode {
* @param setModTime set modification time for the parent node
* not needed when replaying the addition and
* the parent already has the proper mod time
* @return null if the child with this name already exists;
* node, otherwise
* @return false if the child with this name already exists;
* otherwise, return true.
*/
public <T extends INode> T addChild(final T node, boolean setModTime) {
public boolean addChild(final INode node, final boolean setModTime) {
if (children == null) {
children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
}
final int low = searchChildren(node);
if(low >= 0)
return null;
if (low >= 0) {
return false;
}
node.parent = this;
children.add(-low - 1, node);
// update modification time of the parent directory
@ -375,7 +377,7 @@ public class INodeDirectory extends INode {
if (node.getGroupName() == null) {
node.setGroup(getGroupName());
}
return node;
return true;
}
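A brief sketch (editor's illustration; parent and newChild are assumed placeholders) of the boolean idiom that replaces the old null check on the returned node:
if (!parent.addChild(newChild, true)) {
  // a child with the same name already exists and nothing was inserted,
  // so callers such as FSDirectory.addChild roll back their quota update
}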
/**
@ -384,53 +386,32 @@ public class INodeDirectory extends INode {
*
* @param path file path
* @param newNode INode to be added
* @return null if the node already exists; inserted INode, otherwise
* @return false if the node already exists; otherwise, return true.
* @throws FileNotFoundException if parent does not exist or is not a directory.
* @throws UnresolvedLinkException if any path component is a symbolic link
*/
<T extends INode> T addNode(String path, T newNode
) throws FileNotFoundException, UnresolvedLinkException {
boolean addINode(String path, INode newNode
) throws FileNotFoundException, PathIsNotDirectoryException,
UnresolvedLinkException {
byte[][] pathComponents = getPathComponents(path);
return addToParent(pathComponents, newNode, true) == null? null: newNode;
}
INodeDirectory getParent(byte[][] pathComponents
) throws FileNotFoundException, UnresolvedLinkException {
if (pathComponents.length < 2) // add root
return null;
// Gets the parent INode
INodesInPath inodes = getExistingPathINodes(pathComponents, 2, false);
INode inode = inodes.inodes[0];
if (inode == null) {
throw new FileNotFoundException("Parent path does not exist: "+
DFSUtil.byteArray2String(pathComponents));
}
if (!inode.isDirectory()) {
throw new FileNotFoundException("Parent path is not a directory: "+
DFSUtil.byteArray2String(pathComponents));
}
return (INodeDirectory)inode;
}
/**
* Add new inode
* Optimized version of addNode()
*
* @return parent INode if new inode is inserted
* or null if it already exists.
* @throws FileNotFoundException if parent does not exist or
* is not a directory.
*/
INodeDirectory addToParent(byte[][] pathComponents, INode newNode,
boolean propagateModTime) throws FileNotFoundException, UnresolvedLinkException {
if (pathComponents.length < 2) { // add root
return null;
return false;
}
newNode.setLocalName(pathComponents[pathComponents.length - 1]);
// insert into the parent children list
INodeDirectory parent = getParent(pathComponents);
return parent.addChild(newNode, propagateModTime) == null? null: parent;
return parent.addChild(newNode, true);
}
INodeDirectory getParent(byte[][] pathComponents
) throws FileNotFoundException, PathIsNotDirectoryException,
UnresolvedLinkException {
if (pathComponents.length < 2) // add root
return null;
// Gets the parent INode
INodesInPath inodes = getExistingPathINodes(pathComponents, 2, false);
return INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
}
@Override
View File
@ -33,7 +33,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@InterfaceAudience.Private
public class INodeFile extends INode implements BlockCollection {
/** Cast INode to INodeFile. */
public static INodeFile valueOf(INode inode, String path) throws IOException {
public static INodeFile valueOf(INode inode, String path
) throws FileNotFoundException {
if (inode == null) {
throw new FileNotFoundException("File does not exist: " + path);
}
View File
@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
@ -36,10 +37,10 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
public class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
/** Cast INode to INodeFileUnderConstruction. */
public static INodeFileUnderConstruction valueOf(INode inode, String path
) throws IOException {
) throws FileNotFoundException {
final INodeFile file = INodeFile.valueOf(inode, path);
if (!file.isUnderConstruction()) {
throw new IOException("File is not under construction: " + path);
throw new FileNotFoundException("File is not under construction: " + path);
}
return (INodeFileUnderConstruction)file;
}
View File
@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@ -239,8 +240,8 @@ public class TestINodeFile {
try {
INodeDirectory.valueOf(from, path);
fail();
} catch(IOException ioe) {
assertTrue(ioe.getMessage().contains("Directory does not exist"));
} catch(FileNotFoundException e) {
assertTrue(e.getMessage().contains("Directory does not exist"));
}
}
@ -264,8 +265,7 @@ public class TestINodeFile {
try {
INodeDirectory.valueOf(from, path);
fail();
} catch(IOException ioe) {
assertTrue(ioe.getMessage().contains("Path is not a directory"));
} catch(PathIsNotDirectoryException e) {
}
}
@ -286,8 +286,7 @@ public class TestINodeFile {
try {
INodeDirectory.valueOf(from, path);
fail();
} catch(IOException ioe) {
assertTrue(ioe.getMessage().contains("Path is not a directory"));
} catch(PathIsNotDirectoryException e) {
}
}
View File
@ -15488,7 +15488,7 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Cannot set quota on a file: /test/file1</expected-output>
<expected-output>setQuota: `/test/file1': Is not a directory</expected-output>
</comparator>
</comparators>
</test>
View File
@ -586,6 +586,9 @@ Release 0.23.6 - UNRELEASED
IMPROVEMENTS
MAPREDUCE-4811. JobHistoryServer should show when it was started in WebUI
About page (Ravi Prakash via jlowe)
OPTIMIZATIONS
BUG FIXES
@ -593,6 +596,8 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4802. Takes a long time to load the task list on the AM for
large jobs (Ravi Prakash via bobby)
MAPREDUCE-4764. repair TestBinaryTokenFile (Ivan A. Veselovsky via bobby)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
public static final long historyServerTimeStamp = System.currentTimeMillis();
private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
private HistoryContext historyContext;
private HistoryClientService clientService;

View File
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.view.InfoBlock;
@ -47,7 +49,9 @@ public class HsAboutPage extends HsView {
@Override protected Class<? extends SubView> content() {
HistoryInfo info = new HistoryInfo();
info("History Server").
_("BuildVersion", info.getHadoopBuildVersion() + " on " + info.getHadoopVersionBuiltOn());
_("BuildVersion", info.getHadoopBuildVersion()
+ " on " + info.getHadoopVersionBuiltOn()).
_("History Server started on", Times.format(info.getStartedOn()));
return InfoBlock.class;
}
}
View File
@ -22,17 +22,20 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.util.VersionInfo;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class HistoryInfo {
protected long startedOn;
protected String hadoopVersion;
protected String hadoopBuildVersion;
protected String hadoopVersionBuiltOn;
public HistoryInfo() {
this.startedOn = JobHistoryServer.historyServerTimeStamp;
this.hadoopVersion = VersionInfo.getVersion();
this.hadoopBuildVersion = VersionInfo.getBuildVersion();
this.hadoopVersionBuiltOn = VersionInfo.getDate();
@ -50,4 +53,8 @@ public class HistoryInfo {
return this.hadoopVersionBuiltOn;
}
public long getStartedOn() {
return this.startedOn;
}
}
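For reference, a sketch of the enriched history-server info payload (illustrative values; the top-level wrapper name is an assumption, while the four field names come from the HistoryInfo bean above):
// {
//   "historyInfo" : {
//     "startedOn" : 1354055520000,
//     "hadoopVersion" : "2.0.3-SNAPSHOT",
//     "hadoopBuildVersion" : "2.0.3-SNAPSHOT from trunk",
//     "hadoopVersionBuiltOn" : "2012-11-27"
//   }
// }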
View File
@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.Clock;
@ -344,21 +345,24 @@ public class TestHsWebServices extends JerseyTest {
}
public void verifyHsInfoGeneric(String hadoopVersionBuiltOn,
String hadoopBuildVersion, String hadoopVersion) {
String hadoopBuildVersion, String hadoopVersion, long startedon) {
WebServicesTestUtils.checkStringMatch("hadoopVersionBuiltOn",
VersionInfo.getDate(), hadoopVersionBuiltOn);
WebServicesTestUtils.checkStringMatch("hadoopBuildVersion",
VersionInfo.getBuildVersion(), hadoopBuildVersion);
WebServicesTestUtils.checkStringMatch("hadoopVersion",
VersionInfo.getVersion(), hadoopVersion);
assertEquals("startedOn doesn't match: ",
JobHistoryServer.historyServerTimeStamp, startedon);
}
public void verifyHSInfo(JSONObject info, TestAppContext ctx)
throws JSONException {
assertEquals("incorrect number of elements", 3, info.length());
assertEquals("incorrect number of elements", 4, info.length());
verifyHsInfoGeneric(info.getString("hadoopVersionBuiltOn"),
info.getString("hadoopBuildVersion"), info.getString("hadoopVersion"));
info.getString("hadoopBuildVersion"), info.getString("hadoopVersion"),
info.getLong("startedOn"));
}
public void verifyHSInfoXML(String xml, TestAppContext ctx)
@ -376,7 +380,8 @@ public class TestHsWebServices extends JerseyTest {
verifyHsInfoGeneric(
WebServicesTestUtils.getXmlString(element, "hadoopVersionBuiltOn"),
WebServicesTestUtils.getXmlString(element, "hadoopBuildVersion"),
WebServicesTestUtils.getXmlString(element, "hadoopVersion"));
WebServicesTestUtils.getXmlString(element, "hadoopVersion"),
WebServicesTestUtils.getXmlLong(element, "startedOn"));
}
}
View File
@ -35,26 +35,28 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@SuppressWarnings("deprecation")
@Ignore
public class TestBinaryTokenFile {
private static final String KEY_SECURITY_TOKEN_FILE_NAME = "key-security-token-file";
private static final String DELEGATION_TOKEN_KEY = "Hdfs";
// my sleep class
static class MySleepMapper extends SleepJob.SleepMapper {
/**
@ -63,29 +65,65 @@ public class TestBinaryTokenFile {
@Override
public void map(IntWritable key, IntWritable value, Context context)
throws IOException, InterruptedException {
// get token storage and a key
Credentials ts = context.getCredentials();
Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
// get context token storage:
final Credentials contextCredentials = context.getCredentials();
if(dts.size() != 2) { // one job token and one delegation token
throw new RuntimeException("tokens are not available"); // fail the test
final Collection<Token<? extends TokenIdentifier>> contextTokenCollection = contextCredentials.getAllTokens();
for (Token<? extends TokenIdentifier> t : contextTokenCollection) {
System.out.println("Context token: [" + t + "]");
}
if (contextTokenCollection.size() != 2) { // one job token and one delegation token
// fail the test:
throw new RuntimeException("Exactly 2 tokens are expected in the contextTokenCollection: " +
"one job token and one delegation token, but was found " + contextTokenCollection.size() + " tokens.");
}
Token<? extends TokenIdentifier> dt = ts.getToken(new Text("Hdfs"));
final Token<? extends TokenIdentifier> dt = contextCredentials.getToken(new Text(DELEGATION_TOKEN_KEY));
if (dt == null) {
throw new RuntimeException("Token for key ["+DELEGATION_TOKEN_KEY+"] not found in the job context.");
}
//Verify that dt is same as the token in the file
String tokenFile = context.getConfiguration().get(
"mapreduce.job.credentials.binary");
Credentials cred = new Credentials();
cred.readTokenStorageStream(new DataInputStream(new FileInputStream(
String tokenFile0 = context.getConfiguration().get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
if (tokenFile0 != null) {
throw new RuntimeException("Token file key ["+MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY+"] found in the configuration. It should have been removed from the configuration.");
}
final String tokenFile = context.getConfiguration().get(KEY_SECURITY_TOKEN_FILE_NAME);
if (tokenFile == null) {
throw new RuntimeException("Token file key ["+KEY_SECURITY_TOKEN_FILE_NAME+"] not found in the job configuration.");
}
final Credentials binaryCredentials = new Credentials();
binaryCredentials.readTokenStorageStream(new DataInputStream(new FileInputStream(
tokenFile)));
for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
if (!dt.equals(t)) {
throw new RuntimeException(
"Delegation token in job is not same as the token passed in file."
+ " tokenInFile=" + t + ", dt=" + dt);
}
final Collection<Token<? extends TokenIdentifier>> binaryTokenCollection = binaryCredentials.getAllTokens();
if (binaryTokenCollection.size() != 1) {
throw new RuntimeException("The token collection read from file ["+tokenFile+"] must have size = 1.");
}
final Token<? extends TokenIdentifier> binTok = binaryTokenCollection.iterator().next();
System.out.println("The token read from binary file: t = [" + binTok + "]");
// Verify that dt is same as the token in the file:
if (!dt.equals(binTok)) {
throw new RuntimeException(
"Delegation token in job is not same as the token passed in file:"
+ " tokenInFile=[" + binTok + "], dt=[" + dt + "].");
}
// Now test the user tokens.
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
// Print all the UGI tokens for diagnostic purposes:
final Collection<Token<? extends TokenIdentifier>> ugiTokenCollection = ugi.getTokens();
for (Token<? extends TokenIdentifier> t: ugiTokenCollection) {
System.out.println("UGI token: [" + t + "]");
}
final Token<? extends TokenIdentifier> ugiToken
= ugi.getCredentials().getToken(new Text(DELEGATION_TOKEN_KEY));
if (ugiToken == null) {
throw new RuntimeException("Token for key ["+DELEGATION_TOKEN_KEY+"] not found among the UGI tokens.");
}
if (!ugiToken.equals(binTok)) {
throw new RuntimeException(
"UGI token is not same as the token passed in binary file:"
+ " tokenInBinFile=[" + binTok + "], ugiTok=[" + ugiToken + "].");
}
super.map(key, value, context);
@ -118,13 +156,20 @@ public class TestBinaryTokenFile {
TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
job.getConfiguration());
for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
cred2.addToken(new Text("Hdfs"), t);
cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
}
DataOutputStream os = new DataOutputStream(new FileOutputStream(
binaryTokenFileName.toString()));
cred2.writeTokenStorageToStream(os);
os.close();
job.getConfiguration().set("mapreduce.job.credentials.binary",
try {
cred2.writeTokenStorageToStream(os);
} finally {
os.close();
}
job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
binaryTokenFileName.toString());
// NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key now gets deleted from config,
// so it's not accessible in the job's config. So, we use another key to pass the file name into the job configuration:
job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME,
binaryTokenFileName.toString());
} catch (IOException e) {
Assert.fail("Exception " + e);
@ -132,39 +177,53 @@ public class TestBinaryTokenFile {
}
}
private static MiniMRCluster mrCluster;
private static MiniMRYarnCluster mrCluster;
private static MiniDFSCluster dfsCluster;
private static final Path TEST_DIR =
new Path(System.getProperty("test.build.data","/tmp"));
private static final Path binaryTokenFileName = new Path(TEST_DIR, "tokenFile.binary");
private static int numSlaves = 1;
private static JobConf jConf;
private static final int numSlaves = 1; // num of data nodes
private static final int noOfNMs = 1;
private static Path p1;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = new Configuration();
dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
jConf = new JobConf(conf);
mrCluster = new MiniMRCluster(0, 0, numSlaves,
dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null,
jConf);
NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();
FileSystem fs = dfsCluster.getFileSystem();
final Configuration conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.set(YarnConfiguration.RM_PRINCIPAL, "jt_id/" + SecurityUtil.HOSTNAME_PATTERN + "@APACHE.ORG");
final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
builder.checkExitOnShutdown(true);
builder.numDataNodes(numSlaves);
builder.format(true);
builder.racks(null);
dfsCluster = builder.build();
mrCluster = new MiniMRYarnCluster(TestBinaryTokenFile.class.getName(), noOfNMs);
mrCluster.init(conf);
mrCluster.start();
NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();
FileSystem fs = dfsCluster.getFileSystem();
p1 = new Path("file1");
p1 = fs.makeQualified(p1);
}
@AfterClass
public static void tearDown() throws Exception {
if(mrCluster != null)
mrCluster.shutdown();
mrCluster = null;
if(dfsCluster != null)
if(mrCluster != null) {
mrCluster.stop();
mrCluster = null;
}
if(dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
dfsCluster = null;
}
}
/**
@ -173,31 +232,24 @@ public class TestBinaryTokenFile {
*/
@Test
public void testBinaryTokenFile() throws IOException {
System.out.println("running dist job");
// make sure JT starts
jConf = mrCluster.createJobConf();
Configuration conf = mrCluster.getConfig();
// provide namenodes names for the job to get the delegation tokens for
String nnUri = dfsCluster.getURI(0).toString();
jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
// job tracker principla id..
jConf.set(JTConfig.JT_USER_NAME, "jt_id");
final String nnUri = dfsCluster.getURI(0).toString();
conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
// using argument to pass the file name
String[] args = {
final String[] args = {
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
int res = -1;
try {
res = ToolRunner.run(jConf, new MySleepJob(), args);
res = ToolRunner.run(conf, new MySleepJob(), args);
} catch (Exception e) {
System.out.println("Job failed with" + e.getLocalizedMessage());
System.out.println("Job failed with " + e.getLocalizedMessage());
e.printStackTrace(System.out);
fail("Job failed");
}
assertEquals("dist job res is not 0", res, 0);
assertEquals("dist job res is not 0:", 0, res);
}
}
View File
@ -27,8 +27,6 @@ import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
import junit.framework.TestCase;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@ -38,111 +36,117 @@ import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* test {@link HadoopArchives}
*/
public class TestHadoopArchives extends TestCase {
public class TestHadoopArchives {
public static final String HADOOP_ARCHIVES_JAR = JarFinder.getJar(HadoopArchives.class);
public static final String HADOOP_ARCHIVES_JAR = JarFinder
.getJar(HadoopArchives.class);
{
((Log4JLogger)LogFactory.getLog(org.apache.hadoop.security.Groups.class)
).getLogger().setLevel(Level.ERROR);
((Log4JLogger)org.apache.hadoop.ipc.Server.LOG
).getLogger().setLevel(Level.ERROR);
((Log4JLogger)org.apache.hadoop.util.AsyncDiskService.LOG
).getLogger().setLevel(Level.ERROR);
((Log4JLogger) LogFactory.getLog(org.apache.hadoop.security.Groups.class))
.getLogger().setLevel(Level.ERROR);
}
private static final String inputDir = "input";
private Path inputPath;
private MiniDFSCluster dfscluster;
private MiniMRCluster mapred;
private Configuration conf;
private FileSystem fs;
private Path archivePath;
static private Path createFile(Path dir, String filename, FileSystem fs
) throws IOException {
static private Path createFile(Path dir, String filename, FileSystem fs)
throws IOException {
final Path f = new Path(dir, filename);
final FSDataOutputStream out = fs.create(f);
final FSDataOutputStream out = fs.create(f);
out.write(filename.getBytes());
out.close();
return f;
}
protected void setUp() throws Exception {
super.setUp();
dfscluster = new MiniDFSCluster(new Configuration(), 2, true, null);
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.set(CapacitySchedulerConfiguration.PREFIX
+ CapacitySchedulerConfiguration.ROOT + "."
+ CapacitySchedulerConfiguration.QUEUES, "default");
conf.set(CapacitySchedulerConfiguration.PREFIX
+ CapacitySchedulerConfiguration.ROOT + ".default."
+ CapacitySchedulerConfiguration.CAPACITY, "100");
dfscluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true)
.build();
fs = dfscluster.getFileSystem();
mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
inputPath = new Path(fs.getHomeDirectory(), inputDir);
inputPath = new Path(fs.getHomeDirectory(), inputDir);
archivePath = new Path(fs.getHomeDirectory(), "archive");
fs.mkdirs(inputPath);
createFile(inputPath, "a", fs);
createFile(inputPath, "b", fs);
createFile(inputPath, "c", fs);
}
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
try {
if (mapred != null) {
mapred.shutdown();
if (dfscluster != null) {
dfscluster.shutdown();
}
if (dfscluster != null) {
dfscluster.shutdown();
}
} catch(Exception e) {
} catch (Exception e) {
System.err.println(e);
}
super.tearDown();
}
@Test
public void testRelativePath() throws Exception {
fs.delete(archivePath, true);
final Path sub1 = new Path(inputPath, "dir1");
fs.mkdirs(sub1);
createFile(sub1, "a", fs);
final Configuration conf = mapred.createJobConf();
final FsShell shell = new FsShell(conf);
final List<String> originalPaths = lsr(shell, "input");
System.out.println("originalPath: " + originalPaths);
final URI uri = fs.getUri();
final String prefix = "har://hdfs-" + uri.getHost() +":" + uri.getPort()
final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
+ archivePath.toUri().getPath() + Path.SEPARATOR;
{
final String harName = "foo.har";
final String[] args = {
"-archiveName",
harName,
"-p",
"input",
"*",
"archive"
};
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH, HADOOP_ARCHIVES_JAR);
final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
assertEquals(0, ToolRunner.run(har, args));
final String[] args = { "-archiveName", harName, "-p", "input", "*",
"archive" };
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
HADOOP_ARCHIVES_JAR);
final HadoopArchives har = new HadoopArchives(conf);
Assert.assertEquals(0, ToolRunner.run(har, args));
//compare results
// compare results
final List<String> harPaths = lsr(shell, prefix + harName);
assertEquals(originalPaths, harPaths);
Assert.assertEquals(originalPaths, harPaths);
}
}
@Test
public void testPathWithSpaces() throws Exception {
fs.delete(archivePath, true);
//create files/directories with spaces
// create files/directories with spaces
createFile(inputPath, "c c", fs);
final Path sub1 = new Path(inputPath, "sub 1");
fs.mkdirs(sub1);
@ -154,42 +158,36 @@ public class TestHadoopArchives extends TestCase {
final Path sub2 = new Path(inputPath, "sub 1 with suffix");
fs.mkdirs(sub2);
createFile(sub2, "z", fs);
final Configuration conf = mapred.createJobConf();
final FsShell shell = new FsShell(conf);
final String inputPathStr = inputPath.toUri().getPath();
System.out.println("inputPathStr = " + inputPathStr);
final List<String> originalPaths = lsr(shell, inputPathStr);
final URI uri = fs.getUri();
final String prefix = "har://hdfs-" + uri.getHost() +":" + uri.getPort()
final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
+ archivePath.toUri().getPath() + Path.SEPARATOR;
{//Enable space replacement
{// Enable space replacement
final String harName = "foo.har";
final String[] args = {
"-archiveName",
harName,
"-p",
inputPathStr,
"*",
archivePath.toString()
};
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH, HADOOP_ARCHIVES_JAR);
final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
assertEquals(0, ToolRunner.run(har, args));
final String[] args = { "-archiveName", harName, "-p", inputPathStr, "*",
archivePath.toString() };
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
HADOOP_ARCHIVES_JAR);
final HadoopArchives har = new HadoopArchives(conf);
Assert.assertEquals(0, ToolRunner.run(har, args));
//compare results
// compare results
final List<String> harPaths = lsr(shell, prefix + harName);
assertEquals(originalPaths, harPaths);
Assert.assertEquals(originalPaths, harPaths);
}
}
private static List<String> lsr(final FsShell shell, String dir
) throws Exception {
private static List<String> lsr(final FsShell shell, String dir)
throws Exception {
System.out.println("lsr root=" + dir);
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
final PrintStream out = new PrintStream(bytes);
final PrintStream oldOut = System.out;
final PrintStream oldErr = System.err;
@ -197,7 +195,7 @@ public class TestHadoopArchives extends TestCase {
System.setErr(out);
final String results;
try {
assertEquals(0, shell.run(new String[]{"-lsr", dir}));
Assert.assertEquals(0, shell.run(new String[] { "-lsr", dir }));
results = bytes.toString();
} finally {
IOUtils.closeStream(out);
@ -206,13 +204,13 @@ public class TestHadoopArchives extends TestCase {
}
System.out.println("lsr results:\n" + results);
String dirname = dir;
if (dir.lastIndexOf(Path.SEPARATOR) != -1 ) {
if (dir.lastIndexOf(Path.SEPARATOR) != -1) {
dirname = dir.substring(dir.lastIndexOf(Path.SEPARATOR));
}
final List<String> paths = new ArrayList<String>();
for(StringTokenizer t = new StringTokenizer(results, "\n");
t.hasMoreTokens(); ) {
for (StringTokenizer t = new StringTokenizer(results, "\n"); t
.hasMoreTokens();) {
final String s = t.nextToken();
final int i = s.indexOf(dirname);
if (i >= 0) {
@ -220,7 +218,8 @@ public class TestHadoopArchives extends TestCase {
}
}
Collections.sort(paths);
System.out.println("lsr paths = " + paths.toString().replace(", ", ",\n "));
System.out
.println("lsr paths = " + paths.toString().replace(", ", ",\n "));
return paths;
}
}
View File
@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.security.Permission;
public class TestExternalCall {
private static final Log LOG = LogFactory.getLog(TestExternalCall.class);
private static FileSystem fs;
private static String root;
private static Configuration getConf() {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
return conf;
}
@Before
public void setup() {
securityManager = System.getSecurityManager();
System.setSecurityManager(new NoExitSecurityManager());
try {
fs = FileSystem.get(getConf());
root = new Path("target/tmp").makeQualified(fs.getUri(),
fs.getWorkingDirectory()).toString();
TestDistCpUtils.delete(fs, root);
} catch (IOException e) {
LOG.error("Exception encountered ", e);
}
}
@After
public void tearDown() {
System.setSecurityManager(securityManager);
}
/**
* Test the run and execute methods of the DistCp class with a simple file copy.
* @throws Exception
*/
@Test
public void testCleanup() throws Exception {
Configuration conf = getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(conf),
conf);
stagingDir.getFileSystem(conf).mkdirs(stagingDir);
Path source = createFile("tmp.txt");
Path target = createFile("target.txt");
DistCp distcp = new DistCp(conf, null);
String[] arg = { source.toString(), target.toString() };
distcp.run(arg);
Assert.assertTrue(fs.exists(target));
}
private Path createFile(String fname) throws IOException {
Path result = new Path(root + "/" + fname);
OutputStream out = fs.create(result);
try {
out.write((root + "/" + fname).getBytes());
out.write("\n".getBytes());
} finally {
out.close();
}
return result;
}
/**
* Test the main method of DistCp. The method is expected to call System.exit().
*
*/
@Test
public void testCleanupTestViaToolRunner() throws IOException, InterruptedException {
Configuration conf = getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(conf), conf);
stagingDir.getFileSystem(conf).mkdirs(stagingDir);
Path source = createFile("tmp.txt");
Path target = createFile("target.txt");
try {
String[] arg = { target.toString(), source.toString() };
DistCp.main(arg);
Assert.fail();
} catch (ExitException t) {
Assert.assertTrue(fs.exists(target));
Assert.assertEquals(0, t.status);
Assert.assertEquals(0,
stagingDir.getFileSystem(conf).listStatus(stagingDir).length);
}
}
private SecurityManager securityManager;
protected static class ExitException extends SecurityException {
private static final long serialVersionUID = -1982617086752946683L;
public final int status;
public ExitException(int status) {
super("There is no escape!");
this.status = status;
}
}
private static class NoExitSecurityManager extends SecurityManager {
@Override
public void checkPermission(Permission perm) {
// allow anything.
}
@Override
public void checkPermission(Permission perm, Object context) {
// allow anything.
}
@Override
public void checkExit(int status) {
super.checkExit(status);
throw new ExitException(status);
}
}
}
View File
@ -43,21 +43,19 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCpV1;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.junit.Ignore;
/**
* A JUnit test for copying files recursively.
*/
@Ignore
public class TestCopyFiles extends TestCase {
{
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
@ -738,20 +736,22 @@ public class TestCopyFiles extends TestCase {
public void testMapCount() throws Exception {
String namenode = null;
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
MiniDFSCluster mr = null;
try {
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(conf, 3, true, null);
dfs= new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
FileSystem fs = dfs.getFileSystem();
final FsShell shell = new FsShell(conf);
namenode = fs.getUri().toString();
mr = new MiniMRCluster(3, namenode, 1);
MyFile[] files = createFiles(fs.getUri(), "/srcdat");
long totsize = 0;
for (MyFile f : files) {
totsize += f.getSize();
}
Configuration job = mr.createJobConf();
Configuration job = new JobConf(conf);
job.setLong("distcp.bytes.per.map", totsize / 3);
ToolRunner.run(new DistCpV1(job),
new String[] {"-m", "100",
@ -766,8 +766,7 @@ public class TestCopyFiles extends TestCase {
System.out.println(execCmd(shell, "-lsr", logdir));
FileStatus[] logs = fs.listStatus(new Path(logdir));
// rare case where splits are exact, logs.length can be 4
assertTrue("Unexpected map count, logs.length=" + logs.length,
logs.length == 5 || logs.length == 4);
assertTrue( logs.length == 2);
deldir(fs, "/destdat");
deldir(fs, "/logs");
View File
@ -22,8 +22,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.LogFactory;
@ -39,10 +37,10 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.log4j.Level;
import org.junit.Ignore;
@Ignore
public class TestDistCh extends junit.framework.TestCase {
{
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
@ -52,7 +50,8 @@ public class TestDistCh extends junit.framework.TestCase {
}
static final Long RANDOM_NUMBER_GENERATOR_SEED = null;
static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
private static final Random RANDOM = new Random();
static {
final long seed = RANDOM_NUMBER_GENERATOR_SEED == null?
@ -65,7 +64,7 @@ public class TestDistCh extends junit.framework.TestCase {
new Path(System.getProperty("test.build.data","/tmp")
).toString().replace(' ', '+');
static final int NUN_SUBS = 5;
static final int NUN_SUBS = 7;
static class FileTree {
private final FileSystem fs;
@ -127,9 +126,12 @@ public class TestDistCh extends junit.framework.TestCase {
public void testDistCh() throws Exception {
final Configuration conf = new Configuration();
final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
conf.set(CapacitySchedulerConfiguration.PREFIX+CapacitySchedulerConfiguration.ROOT+"."+CapacitySchedulerConfiguration.QUEUES, "default");
conf.set(CapacitySchedulerConfiguration.PREFIX+CapacitySchedulerConfiguration.ROOT+".default."+CapacitySchedulerConfiguration.CAPACITY, "100");
final MiniDFSCluster cluster= new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
final FileSystem fs = cluster.getFileSystem();
final MiniMRCluster mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
final FsShell shell = new FsShell(conf);
try {
@ -138,37 +140,36 @@ public class TestDistCh extends junit.framework.TestCase {
runLsr(shell, tree.root, 0);
//generate random arguments
final String[] args = new String[RANDOM.nextInt(NUN_SUBS-1) + 1];
final String[] args = new String[NUN_SUBS];
final PermissionStatus[] newstatus = new PermissionStatus[NUN_SUBS];
final List<Integer> indices = new LinkedList<Integer>();
for(int i = 0; i < NUN_SUBS; i++) {
indices.add(i);
}
for(int i = 0; i < args.length; i++) {
final int index = indices.remove(RANDOM.nextInt(indices.size()));
final String sub = "sub" + index;
final boolean changeOwner = RANDOM.nextBoolean();
final boolean changeGroup = RANDOM.nextBoolean();
final boolean changeMode = !changeOwner && !changeGroup? true: RANDOM.nextBoolean();
final String owner = changeOwner? sub: "";
final String group = changeGroup? sub: "";
final String permission = changeMode? RANDOM.nextInt(8) + "" + RANDOM.nextInt(8) + "" + RANDOM.nextInt(8): "";
args[i] = tree.root + "/" + sub + ":" + owner + ":" + group + ":" + permission;
newstatus[index] = new ChPermissionStatus(rootstatus, owner, group, permission);
}
for(int i = 0; i < NUN_SUBS; i++) {
if (newstatus[i] == null) {
newstatus[i] = new ChPermissionStatus(rootstatus);
}
}
args[0]="/test/testDistCh/sub0:sub1::";
newstatus[0] = new ChPermissionStatus(rootstatus, "sub1", "", "");
args[1]="/test/testDistCh/sub1::sub2:";
newstatus[1] = new ChPermissionStatus(rootstatus, "", "sub2", "");
args[2]="/test/testDistCh/sub2:::437";
newstatus[2] = new ChPermissionStatus(rootstatus, "", "", "437");
args[3]="/test/testDistCh/sub3:sub1:sub2:447";
newstatus[3] = new ChPermissionStatus(rootstatus, "sub1", "sub2", "447");
args[4]="/test/testDistCh/sub4::sub5:437";
newstatus[4] = new ChPermissionStatus(rootstatus, "", "sub5", "437");
args[5]="/test/testDistCh/sub5:sub1:sub5:";
newstatus[5] = new ChPermissionStatus(rootstatus, "sub1", "sub5", "");
args[6]="/test/testDistCh/sub6:sub3::437";
newstatus[6] = new ChPermissionStatus(rootstatus, "sub3", "", "437");
System.out.println("args=" + Arrays.asList(args).toString().replace(",", ",\n "));
System.out.println("newstatus=" + Arrays.asList(newstatus).toString().replace(",", ",\n "));
//run DistCh
new DistCh(mr.createJobConf()).run(args);
new DistCh(MiniMRClientClusterFactory.create(this.getClass(), 2, conf).getConfig()).run(args);
runLsr(shell, tree.root, 0);
//check results
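The hard-coded targets above use DistCh's path:owner:group:permission form, where an empty field means that attribute is left unchanged (the matching ChPermissionStatus entries mirror this with empty strings). As a rough illustration only, a minimal sketch of splitting such a spec — the class below is hypothetical and is not part of DistCh or this patch:
// Hypothetical sketch: decompose a "path:owner:group:permission" target spec.
// DistCh's real parsing is not shown in this diff; empty fields mean "unchanged".
public class TargetSpecSketch {
  public static void main(String[] unused) {
    String spec = "/test/testDistCh/sub4::sub5:437";
    String[] parts = spec.split(":", -1);   // limit -1 keeps trailing empty fields
    String path  = parts[0];
    String owner = parts.length > 1 ? parts[1] : "";
    String group = parts.length > 2 ? parts[2] : "";
    String mode  = parts.length > 3 ? parts[3] : "";
    // Prints: path=/test/testDistCh/sub4 owner= group=sub5 mode=437
    System.out.println("path=" + path + " owner=" + owner
        + " group=" + group + " mode=" + mode);
  }
}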
@ -184,7 +185,7 @@ public class TestDistCh extends junit.framework.TestCase {
}
}
static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
static void checkFileStatus(PermissionStatus expected, FileStatus actual) {
assertEquals(expected.getUserName(), actual.getOwner());

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
public class TestLogalyzer {
private static String EL = System.getProperty("line.separator");
private static String TAB = "\t";
private static final Log LOG = LogFactory.getLog(TestLogalyzer.class);
private static File workSpace = new File("target",
TestLogalyzer.class.getName() + "-workSpace");
private static File outdir = new File(workSpace.getAbsoluteFile()
+ File.separator + "out");
@Test
public void testLogalyzer() throws Exception {
Path f = createLogFile();
String[] args = new String[10];
args[0] = "-archiveDir";
args[1] = f.toString();
args[2] = "-grep";
args[3] = "44";
args[4] = "-sort";
args[5] = "0";
args[6] = "-analysis";
args[7] = outdir.getAbsolutePath();
args[8] = "-separator";
args[9] = " ";
Logalyzer.main(args);
checkResult();
}
private void checkResult() throws Exception {
File result = new File(outdir.getAbsolutePath() + File.separator
+ "part-00000");
File success = new File(outdir.getAbsolutePath() + File.separator
+ "_SUCCESS");
Assert.assertTrue(success.exists());
FileInputStream fis = new FileInputStream(result);
BufferedReader br = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
String line = br.readLine();
Assert.assertTrue(("1 44" + TAB + "2").equals(line));
line = br.readLine();
Assert.assertTrue(("3 44" + TAB + "1").equals(line));
line = br.readLine();
Assert.assertTrue(("4 44" + TAB + "1").equals(line));
br.close();
}
/**
* Creates two simple log files under the workspace "log" directory.
*
* @return the path of the directory containing the generated log files
* @throws IOException if the log files cannot be written
*/
private Path createLogFile() throws IOException {
FileContext files = FileContext.getLocalFSFileContext();
Path ws = new Path(workSpace.getAbsoluteFile().getAbsolutePath());
files.delete(ws, true);
Path workSpacePath = new Path(workSpace.getAbsolutePath(), "log");
files.mkdir(workSpacePath, null, true);
LOG.info("create logfile.log");
Path logfile1 = new Path(workSpacePath, "logfile.log");
FSDataOutputStream os = files.create(logfile1,
EnumSet.of(CreateFlag.CREATE));
os.writeBytes("4 3" + EL + "1 3" + EL + "4 44" + EL);
os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL);
os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL);
os.flush();
os.close();
LOG.info("create logfile1.log");
Path logfile2 = new Path(workSpacePath, "logfile1.log");
os = files.create(logfile2, EnumSet.of(CreateFlag.CREATE));
os.writeBytes("4 3" + EL + "1 3" + EL + "3 44" + EL);
os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL);
os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL);
os.flush();
os.close();
return workSpacePath;
}
}
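The test above drives Logalyzer.main directly with -archiveDir, -grep, -sort, -analysis and -separator. A minimal standalone sketch using the same flags, assuming placeholder input and output directories (only the flag names and Logalyzer.main come from the test; the paths are illustrative):
// Minimal sketch: invoke Logalyzer with the same flags the test exercises.
// The /tmp paths are placeholders, not part of the patch.
public class LogalyzerDriverSketch {
  public static void main(String[] unused) throws Exception {
    String[] args = {
        "-archiveDir", "/tmp/logalyzer/log",  // directory of log files to analyze
        "-grep", "44",                        // pattern to match, as in the test
        "-sort", "0",                         // sort key used by the test
        "-analysis", "/tmp/logalyzer/out",    // output directory for results
        "-separator", " "                     // column separator in the log lines
    };
    org.apache.hadoop.tools.Logalyzer.main(args);
  }
}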

View File

@ -180,6 +180,9 @@ Release 0.23.6 - UNRELEASED
YARN-151. Browser thinks RM main page JS is taking too long
(Ravi Prakash via bobby)
YARN-204. test coverage for org.apache.hadoop.tools (Aleksey Gorshkov via
bobby)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -56,6 +56,8 @@ History Server REST API's.
*---------------+--------------+-------------------------------+
|| Item || Data Type || Description |
*---------------+--------------+-------------------------------+
| startedOn | long | The time the history server was started (in ms since epoch) |
*---------------+--------------+-------------------------------+
| hadoopVersion | string | Version of hadoop common |
*---------------+--------------+-------------------------------+
| hadoopBuildVersion | string | Hadoop common build string with build version, user, and checksum |
@ -87,6 +89,7 @@ History Server REST API's.
+---+
{
"historyInfo" : {
"startedOn":1353512830963,
"hadoopVersionBuiltOn" : "Wed Jan 11 21:18:36 UTC 2012",
"hadoopBuildVersion" : "0.23.1-SNAPSHOT from 1230253 by user1 source checksum bb6e554c6d50b0397d826081017437a7",
"hadoopVersion" : "0.23.1-SNAPSHOT"
@ -117,6 +120,7 @@ History Server REST API's.
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<historyInfo>
<startedOn>1353512830963</startedOn>
<hadoopVersion>0.23.1-SNAPSHOT</hadoopVersion>
<hadoopBuildVersion>0.23.1-SNAPSHOT from 1230253 by user1 source checksum bb6e554c6d50b0397d826081017437a7</hadoopBuildVersion>
<hadoopVersionBuiltOn>Wed Jan 11 21:18:36 UTC 2012</hadoopVersionBuiltOn>
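The info resource now reports startedOn alongside the version fields, in both the JSON and XML representations shown above. A minimal client-side sketch for reading it; the host, port (19888) and the /ws/v1/history/info path are assumptions here rather than something stated in this patch, and the JSON is scraped with a regex instead of a JSON library to keep the example self-contained:
// Minimal sketch: fetch the history server info resource and print startedOn.
// Host, port and URI path below are placeholder assumptions.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HistoryInfoClientSketch {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://historyserver.example.com:19888/ws/v1/history/info");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
      StringBuilder body = new StringBuilder();
      for (String line; (line = in.readLine()) != null; ) {
        body.append(line);
      }
      // Crude extraction of the startedOn value (ms since epoch).
      Matcher m = Pattern.compile("\"startedOn\"\\s*:\\s*(\\d+)").matcher(body);
      if (m.find()) {
        System.out.println("History server started at: " + m.group(1));
      }
    } finally {
      conn.disconnect();
    }
  }
}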