Merge branch 'trunk' into HDFS-16864

This commit is contained in:
Dave Marion 2023-02-17 12:09:17 +00:00
commit 1ecbf91cb0
260 changed files with 11113 additions and 4006 deletions

View File

@ -14,6 +14,8 @@
# limitations under the License.
github:
ghp_path: /
ghp_branch: gh-pages
enabled_merge_buttons:
squash: true
merge: false

59
.github/workflows/website.yml vendored Normal file
View File

@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: website
# Controls when the action will run.
on:
push:
branches: [ trunk ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout Hadoop trunk
uses: actions/checkout@v3
with:
repository: apache/hadoop
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'temurin'
- name: Cache local Maven repository
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Build Hadoop maven plugins
run: cd hadoop-maven-plugins && mvn --batch-mode install
- name: Build Hadoop
run: mvn clean install -DskipTests -DskipShade
- name: Build document
run: mvn clean site
- name: Stage document
run: mvn site:stage -DstagingDirectory=${GITHUB_WORKSPACE}/staging/
- name: Deploy to GitHub Pages
uses: peaceiris/actions-gh-pages@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./staging/hadoop-project
user_name: 'github-actions[bot]'
user_email: 'github-actions[bot]@users.noreply.github.com'

View File

@ -250,7 +250,6 @@ commons-codec:commons-codec:1.11
commons-collections:commons-collections:3.2.2
commons-daemon:commons-daemon:1.0.13
commons-io:commons-io:2.8.0
commons-logging:commons-logging:1.1.3
commons-net:commons-net:3.9.0
de.ruedigermoeller:fst:2.50
io.grpc:grpc-api:1.26.0
@ -260,7 +259,6 @@ io.grpc:grpc-netty:1.26.0
io.grpc:grpc-protobuf:1.26.0
io.grpc:grpc-protobuf-lite:1.26.0
io.grpc:grpc-stub:1.26.0
io.netty:netty:3.10.6.Final
io.netty:netty-all:4.1.77.Final
io.netty:netty-buffer:4.1.77.Final
io.netty:netty-codec:4.1.77.Final
@ -363,7 +361,7 @@ org.lz4:lz4-java:1.7.1
org.objenesis:objenesis:2.6
org.xerial.snappy:snappy-java:1.0.5
org.yaml:snakeyaml:1.33
org.wildfly.openssl:wildfly-openssl:1.0.7.Final
org.wildfly.openssl:wildfly-openssl:1.1.3.Final
--------------------------------------------------------------------------------

View File

@ -180,11 +180,6 @@
<artifactId>jersey-server</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@ -200,11 +195,6 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.grizzly</groupId>
<artifactId>grizzly-http-servlet</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>

View File

@ -241,12 +241,15 @@ public class CryptoOutputStream extends FilterOutputStream implements
return;
}
try {
flush();
if (closeOutputStream) {
super.close();
codec.close();
try {
flush();
} finally {
if (closeOutputStream) {
super.close();
codec.close();
}
freeBuffers();
}
freeBuffers();
} finally {
closed = true;
}

View File

@ -302,7 +302,12 @@ public abstract class CachingBlockManager extends BlockManager {
private void read(BufferData data) throws IOException {
synchronized (data) {
readBlock(data, false, BufferData.State.BLANK);
try {
readBlock(data, false, BufferData.State.BLANK);
} catch (IOException e) {
LOG.error("error reading block {}", data.getBlockNumber(), e);
throw e;
}
}
}
@ -362,9 +367,6 @@ public abstract class CachingBlockManager extends BlockManager {
buffer.flip();
data.setReady(expectedState);
} catch (Exception e) {
String message = String.format("error during readBlock(%s)", data.getBlockNumber());
LOG.error(message, e);
if (isPrefetch && tracker != null) {
tracker.failed();
}
@ -406,7 +408,8 @@ public abstract class CachingBlockManager extends BlockManager {
try {
blockManager.prefetch(data, taskQueuedStartTime);
} catch (Exception e) {
LOG.error("error during prefetch", e);
LOG.info("error prefetching block {}. {}", data.getBlockNumber(), e.getMessage());
LOG.debug("error prefetching block {}", data.getBlockNumber(), e);
}
return null;
}
@ -493,7 +496,8 @@ public abstract class CachingBlockManager extends BlockManager {
return;
}
} catch (Exception e) {
LOG.error("error waiting on blockFuture: {}", data, e);
LOG.info("error waiting on blockFuture: {}. {}", data, e.getMessage());
LOG.debug("error waiting on blockFuture: {}", data, e);
data.setDone();
return;
}
@ -523,8 +527,8 @@ public abstract class CachingBlockManager extends BlockManager {
data.setDone();
} catch (Exception e) {
numCachingErrors.incrementAndGet();
String message = String.format("error adding block to cache after wait: %s", data);
LOG.error(message, e);
LOG.info("error adding block to cache after wait: {}. {}", data, e.getMessage());
LOG.debug("error adding block to cache after wait: {}", data, e);
data.setDone();
}

View File

@ -15,6 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Filesystem implementations that allow Hadoop to read directly from
* the local file system.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
package org.apache.hadoop.fs.local;

View File

@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support for the execution of a file system command.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.fs.shell;

View File

@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support for embedded HTTP services.
*/
@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce"})
@InterfaceStability.Unstable
package org.apache.hadoop.http;

View File

@ -32,7 +32,6 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -246,30 +245,6 @@ public class IOUtils {
}
}
/**
* Close the Closeable objects and <b>ignore</b> any {@link Throwable} or
* null pointers. Must only be used for cleanup in exception handlers.
*
* @param log the log to record problems to at debug level. Can be null.
* @param closeables the objects to close
* @deprecated use {@link #cleanupWithLogger(Logger, java.io.Closeable...)}
* instead
*/
@Deprecated
public static void cleanup(Log log, java.io.Closeable... closeables) {
for (java.io.Closeable c : closeables) {
if (c != null) {
try {
c.close();
} catch(Throwable e) {
if (log != null && log.isDebugEnabled()) {
log.debug("Exception in closing " + c, e);
}
}
}
}
}
/**
* Close the Closeable objects and <b>ignore</b> any {@link Throwable} or
* null pointers. Must only be used for cleanup in exception handlers.

View File

@ -92,7 +92,7 @@ public class WritableName {
) throws IOException {
Class<?> writableClass = NAME_TO_CLASS.get(name);
if (writableClass != null)
return writableClass.asSubclass(Writable.class);
return writableClass;
try {
return conf.getClassByName(name);
} catch (ClassNotFoundException e) {

View File

@ -15,6 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation of compression/decompression for the BZip2
* compression algorithm.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.compress.bzip2;

View File

@ -15,6 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation of compression/decompression for the LZ4
* compression algorithm.
*
* @see <a href="http://code.google.com/p/lz4/">LZ4</a>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.compress.lz4;

View File

@ -15,6 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation of compression/decompression for the Snappy
* compression algorithm.
*
* @see <a href="http://code.google.com/p/snappy/">Snappy</a>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.compress.snappy;

View File

@ -15,6 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation of compression/decompression based on the popular
* gzip compressed file format.
*
* @see <a href="http://www.gzip.org/">gzip</a>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.compress.zlib;

View File

@ -15,6 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation of compression/decompression based on the zStandard
* compression algorithm.
*
* @see <a href="https://github.com/facebook/zstd">zStandard</a>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.compress.zstd;

View File

@ -15,6 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Various native IO-related calls not available in Java. These
* functions should generally be used alongside a fallback to another
* more portable mechanism.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.nativeio;

View File

@ -704,7 +704,7 @@ public class Client implements AutoCloseable {
* handle that, a relogin is attempted.
*/
private synchronized void handleSaslConnectionFailure(
final int currRetries, final int maxRetries, final Exception ex,
final int currRetries, final int maxRetries, final IOException ex,
final Random rand, final UserGroupInformation ugi) throws IOException,
InterruptedException {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@ -715,10 +715,7 @@ public class Client implements AutoCloseable {
disposeSasl();
if (shouldAuthenticateOverKrb()) {
if (currRetries < maxRetries) {
if(LOG.isDebugEnabled()) {
LOG.debug("Exception encountered while connecting to "
+ "the server : " + ex);
}
LOG.debug("Exception encountered while connecting to the server {}", remoteId, ex);
// try re-login
if (UserGroupInformation.isLoginKeytabBased()) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
@ -736,7 +733,11 @@ public class Client implements AutoCloseable {
+ UserGroupInformation.getLoginUser().getUserName() + " to "
+ remoteId;
LOG.warn(msg, ex);
throw (IOException) new IOException(msg).initCause(ex);
throw NetUtils.wrapException(remoteId.getAddress().getHostName(),
remoteId.getAddress().getPort(),
NetUtils.getHostname(),
0,
ex);
}
} else {
// With RequestHedgingProxyProvider, one rpc call will send multiple
@ -744,11 +745,9 @@ public class Client implements AutoCloseable {
// all other requests will be interrupted. It's not a big problem,
// and should not print a warning log.
if (ex instanceof InterruptedIOException) {
LOG.debug("Exception encountered while connecting to the server",
ex);
LOG.debug("Exception encountered while connecting to the server {}", remoteId, ex);
} else {
LOG.warn("Exception encountered while connecting to the server ",
ex);
LOG.warn("Exception encountered while connecting to the server {}", remoteId, ex);
}
}
if (ex instanceof RemoteException)
@ -1182,7 +1181,14 @@ public class Client implements AutoCloseable {
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);
RpcWritable.wrap(call.rpcRequest).writeTo(buf);
rpcRequestQueue.put(Pair.of(call, buf));
// Wait for the message to be sent. We offer with timeout to
// prevent a race condition between checking the shouldCloseConnection
// and the stopping of the polling thread
while (!shouldCloseConnection.get()) {
if (rpcRequestQueue.offer(Pair.of(call, buf), 1, TimeUnit.SECONDS)) {
break;
}
}
}
/* Receive a response.

View File

@ -900,12 +900,26 @@ public class RPC {
/**
* @return Default: -1.
* @param numReaders input numReaders.
* @deprecated call {@link #setNumReaders(int value)} instead.
*/
@Deprecated
public Builder setnumReaders(int numReaders) {
this.numReaders = numReaders;
return this;
}
/**
* Set the number of reader threads.
*
* @return this builder.
* @param value input numReaders.
* @since HADOOP-18625.
*/
public Builder setNumReaders(int value) {
this.numReaders = value;
return this;
}
/**
* @return Default: -1.
* @param queueSizePerHandler

View File

@ -1985,11 +1985,26 @@ public abstract class Server {
private long lastContact;
private int dataLength;
private Socket socket;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
private String hostAddress;
private int remotePort;
private InetAddress addr;
/**
* Client Host IP address from where the socket connection is being established to the Server.
*/
private final String hostAddress;
/**
* Client remote port used for the given socket connection.
*/
private final int remotePort;
/**
* Address to which the socket is connected to.
*/
private final InetAddress addr;
/**
* Client Host address from where the socket connection is being established to the Server.
*/
private final String hostName;
IpcConnectionContextProto connectionContext;
String protocolName;
@ -2033,8 +2048,12 @@ public abstract class Server {
this.isOnAuxiliaryPort = isOnAuxiliaryPort;
if (addr == null) {
this.hostAddress = "*Unknown*";
this.hostName = this.hostAddress;
} else {
// host IP address
this.hostAddress = addr.getHostAddress();
// host name for the IP address
this.hostName = addr.getHostName();
}
this.remotePort = socket.getPort();
this.responseQueue = new LinkedList<RpcCall>();
@ -2050,7 +2069,7 @@ public abstract class Server {
@Override
public String toString() {
return getHostAddress() + ":" + remotePort;
return hostName + ":" + remotePort + " / " + hostAddress + ":" + remotePort;
}
boolean setShouldClose() {
@ -2463,19 +2482,18 @@ public abstract class Server {
return -1;
}
if(!RpcConstants.HEADER.equals(dataLengthBuffer)) {
LOG.warn("Incorrect RPC Header length from {}:{} "
+ "expected length: {} got length: {}",
hostAddress, remotePort, RpcConstants.HEADER, dataLengthBuffer);
if (!RpcConstants.HEADER.equals(dataLengthBuffer)) {
LOG.warn("Incorrect RPC Header length from {}:{} / {}:{}. Expected: {}. Actual: {}",
hostName, remotePort, hostAddress, remotePort, RpcConstants.HEADER,
dataLengthBuffer);
setupBadVersionResponse(version);
return -1;
}
if (version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
LOG.warn("Version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
LOG.warn("Version mismatch from {}:{} / {}:{}. "
+ "Expected version: {}. Actual version: {} ", hostName,
remotePort, hostAddress, remotePort, CURRENT_VERSION, version);
setupBadVersionResponse(version);
return -1;
}

View File

@ -34,10 +34,6 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Jdk14Logger;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -51,6 +47,8 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
* Change log level in runtime.
@ -340,22 +338,14 @@ public class LogLevel {
out.println(MARKER
+ "Submitted Class Name: <b>" + logName + "</b><br />");
Log log = LogFactory.getLog(logName);
Logger log = Logger.getLogger(logName);
out.println(MARKER
+ "Log Class: <b>" + log.getClass().getName() +"</b><br />");
if (level != null) {
out.println(MARKER + "Submitted Level: <b>" + level + "</b><br />");
}
if (log instanceof Log4JLogger) {
process(((Log4JLogger)log).getLogger(), level, out);
}
else if (log instanceof Jdk14Logger) {
process(((Jdk14Logger)log).getLogger(), level, out);
}
else {
out.println("Sorry, " + log.getClass() + " not supported.<br />");
}
process(log, level, out);
}
out.println(FORMS);
@ -371,14 +361,14 @@ public class LogLevel {
+ "<input type='submit' value='Set Log Level' />"
+ "</form>";
private static void process(org.apache.log4j.Logger log, String level,
private static void process(Logger log, String level,
PrintWriter out) throws IOException {
if (level != null) {
if (!level.equalsIgnoreCase(org.apache.log4j.Level.toLevel(level)
if (!level.equalsIgnoreCase(Level.toLevel(level)
.toString())) {
out.println(MARKER + "Bad Level : <b>" + level + "</b><br />");
} else {
log.setLevel(org.apache.log4j.Level.toLevel(level));
log.setLevel(Level.toLevel(level));
out.println(MARKER + "Setting Level to " + level + " ...<br />");
}
}
@ -386,21 +376,5 @@ public class LogLevel {
+ "Effective Level: <b>" + log.getEffectiveLevel() + "</b><br />");
}
private static void process(java.util.logging.Logger log, String level,
PrintWriter out) throws IOException {
if (level != null) {
String levelToUpperCase = level.toUpperCase();
try {
log.setLevel(java.util.logging.Level.parse(levelToUpperCase));
} catch (IllegalArgumentException e) {
out.println(MARKER + "Bad Level : <b>" + level + "</b><br />");
}
out.println(MARKER + "Setting Level to " + level + " ...<br />");
}
java.util.logging.Level lev;
for(; (lev = log.getLevel()) == null; log = log.getParent());
out.println(MARKER + "Effective Level: <b>" + lev + "</b><br />");
}
}
}

View File

@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support for service-level authorization.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.security.authorize;

View File

@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Filters for HTTP service security.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.security.http;

View File

@ -866,9 +866,9 @@ extends AbstractDelegationTokenIdentifier>
/**
* Add token stats to the owner to token count mapping.
*
* @param id
* @param id token id.
*/
private void addTokenForOwnerStats(TokenIdent id) {
protected void addTokenForOwnerStats(TokenIdent id) {
String realOwner = getTokenRealOwner(id);
tokenOwnerStats.put(realOwner,
tokenOwnerStats.getOrDefault(realOwner, 0L)+1);

View File

@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* ZooKeeper secret manager for TokenIdentifiers and DelegationKeys.
*/
@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce"})
@InterfaceStability.Evolving
package org.apache.hadoop.security.token.delegation;

View File

@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support for delegation tokens.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.security.token;

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.slf4j.Logger;
@ -75,9 +74,10 @@ public final class ServiceOperations {
* @param log the log to warn at
* @param service a service; may be null
* @return any exception that was caught; null if none was.
* @see ServiceOperations#stopQuietly(Service)
* @deprecated to be removed with 3.4.0. Use {@link #stopQuietly(Logger, Service)} instead.
*/
public static Exception stopQuietly(Log log, Service service) {
@Deprecated
public static Exception stopQuietly(org.apache.commons.logging.Log log, Service service) {
try {
stop(service);
} catch (Exception e) {

View File

@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support for services.
*/
@InterfaceAudience.Public
package org.apache.hadoop.service;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -33,7 +33,7 @@ import java.util.function.Consumer;
*
* This class does not support null element.
*
* This class is not thread safe.
* This class is thread safe.
*
* @param <K> Key type for looking up the elements
* @param <E> Element type, which must be

View File

@ -1,78 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.commons.logging.Log;
import org.slf4j.Logger;
class LogAdapter {
private Log LOG;
private Logger LOGGER;
private LogAdapter(Log LOG) {
this.LOG = LOG;
}
private LogAdapter(Logger LOGGER) {
this.LOGGER = LOGGER;
}
/**
* @deprecated use {@link #create(Logger)} instead
*/
@Deprecated
public static LogAdapter create(Log LOG) {
return new LogAdapter(LOG);
}
public static LogAdapter create(Logger LOGGER) {
return new LogAdapter(LOGGER);
}
public void info(String msg) {
if (LOG != null) {
LOG.info(msg);
} else if (LOGGER != null) {
LOGGER.info(msg);
}
}
public void warn(String msg, Throwable t) {
if (LOG != null) {
LOG.warn(msg, t);
} else if (LOGGER != null) {
LOGGER.warn(msg, t);
}
}
public void debug(Throwable t) {
if (LOG != null) {
LOG.debug(t);
} else if (LOGGER != null) {
LOGGER.debug("", t);
}
}
public void error(String msg) {
if (LOG != null) {
LOG.error(msg);
} else if (LOGGER != null) {
LOGGER.error(msg);
}
}
}

View File

@ -36,7 +36,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
@ -228,10 +227,12 @@ public class ReflectionUtils {
* @param log the logger that logs the stack trace
* @param title a descriptive title for the call stacks
* @param minInterval the minimum time from the last
* @deprecated to be removed with 3.4.0. Use {@link #logThreadInfo(Logger, String, long)} instead.
*/
public static void logThreadInfo(Log log,
String title,
long minInterval) {
@Deprecated
public static void logThreadInfo(org.apache.commons.logging.Log log,
String title,
long minInterval) {
boolean dumpStack = false;
if (log.isInfoEnabled()) {
synchronized (ReflectionUtils.class) {

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.util;
import org.slf4j.Logger;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -42,11 +42,11 @@ public enum SignalLogger {
* Our signal handler.
*/
private static class Handler implements SignalHandler {
final private LogAdapter LOG;
final private Logger log;
final private SignalHandler prevHandler;
Handler(String name, LogAdapter LOG) {
this.LOG = LOG;
Handler(String name, Logger log) {
this.log = log;
prevHandler = Signal.handle(new Signal(name), this);
}
@ -57,7 +57,7 @@ public enum SignalLogger {
*/
@Override
public void handle(Signal signal) {
LOG.error("RECEIVED SIGNAL " + signal.getNumber() +
log.error("RECEIVED SIGNAL " + signal.getNumber() +
": SIG" + signal.getName());
prevHandler.handle(signal);
}
@ -66,13 +66,9 @@ public enum SignalLogger {
/**
* Register some signal handlers.
*
* @param LOG The log4j logfile to use in the signal handlers.
* @param log The log4j logfile to use in the signal handlers.
*/
public void register(final Log LOG) {
register(LogAdapter.create(LOG));
}
void register(final LogAdapter LOG) {
public void register(final Logger log) {
if (registered) {
throw new IllegalStateException("Can't re-install the signal handlers.");
}
@ -83,15 +79,15 @@ public enum SignalLogger {
String separator = "";
for (String signalName : SIGNALS) {
try {
new Handler(signalName, LOG);
new Handler(signalName, log);
bld.append(separator)
.append(signalName);
separator = ", ";
} catch (Exception e) {
LOG.debug(e);
log.debug("Error: ", e);
}
}
bld.append("]");
LOG.info(bld.toString());
log.info(bld.toString());
}
}

View File

@ -740,42 +740,26 @@ public class StringUtils {
* Print a log message for starting up and shutting down
* @param clazz the class of the server
* @param args arguments
* @param LOG the target log object
* @param log the target log object
*/
public static void startupShutdownMessage(Class<?> clazz, String[] args,
final org.apache.commons.logging.Log LOG) {
startupShutdownMessage(clazz, args, LogAdapter.create(LOG));
}
/**
* Print a log message for starting up and shutting down
* @param clazz the class of the server
* @param args arguments
* @param LOG the target log object
*/
public static void startupShutdownMessage(Class<?> clazz, String[] args,
final org.slf4j.Logger LOG) {
startupShutdownMessage(clazz, args, LogAdapter.create(LOG));
}
static void startupShutdownMessage(Class<?> clazz, String[] args,
final LogAdapter LOG) {
final org.slf4j.Logger log) {
final String hostname = NetUtils.getHostname();
final String classname = clazz.getSimpleName();
LOG.info(createStartupShutdownMessage(classname, hostname, args));
log.info(createStartupShutdownMessage(classname, hostname, args));
if (SystemUtils.IS_OS_UNIX) {
try {
SignalLogger.INSTANCE.register(LOG);
SignalLogger.INSTANCE.register(log);
} catch (Throwable t) {
LOG.warn("failed to register any UNIX signal loggers: ", t);
log.warn("failed to register any UNIX signal loggers: ", t);
}
}
ShutdownHookManager.get().addShutdownHook(
new Runnable() {
@Override
public void run() {
LOG.info(toStartupShutdownString("SHUTDOWN_MSG: ", new String[]{
log.info(toStartupShutdownString("SHUTDOWN_MSG: ", new String[]{
"Shutting down " + classname + " at " + hostname}));
LogManager.shutdown();
}

View File

@ -93,6 +93,10 @@ public class VersionInfo {
return info.getProperty("protocVersion", "Unknown");
}
protected String _getCompilePlatform() {
return info.getProperty("compilePlatform", "Unknown");
}
private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("common");
/**
* Get the Hadoop version.
@ -167,12 +171,21 @@ public class VersionInfo {
return COMMON_VERSION_INFO._getProtocVersion();
}
/**
* Returns the OS platform used for the build.
* @return the OS platform
*/
public static String getCompilePlatform() {
return COMMON_VERSION_INFO._getCompilePlatform();
}
public static void main(String[] args) {
LOG.debug("version: "+ getVersion());
System.out.println("Hadoop " + getVersion());
System.out.println("Source code repository " + getUrl() + " -r " +
getRevision());
System.out.println("Compiled by " + getUser() + " on " + getDate());
System.out.println("Compiled on platform " + getCompilePlatform());
System.out.println("Compiled with protoc " + getProtocVersion());
System.out.println("From source with checksum " + getSrcChecksum());
System.out.println("This command was run using " +

View File

@ -1,5 +1,4 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -15,9 +14,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
/**
* Support for concurrent execution.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.util.concurrent;

View File

@ -24,3 +24,4 @@ date=${version-info.build.time}
url=${version-info.scm.uri}
srcChecksum=${version-info.source.md5}
protocVersion=${hadoop.protobuf.version}
compilePlatform=${os.detected.classifier}

View File

@ -35,6 +35,8 @@ These instructions do not cover integration with any Kerberos services,
-everyone bringing up a production cluster should include connecting to their
organisation's Kerberos infrastructure as a key part of the deployment.
See [Security](./SecureMode.html) for details on how to secure a cluster.
Prerequisites
-------------

View File

@ -17,12 +17,14 @@
*/
package org.apache.hadoop.crypto;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.Mockito.*;
/**
@ -54,4 +56,22 @@ public class TestCryptoOutputStreamClosing {
verify(outputStream, never()).close();
}
@Test
public void testUnderlyingOutputStreamClosedWhenExceptionClosing() throws Exception {
OutputStream outputStream = mock(OutputStream.class);
CryptoOutputStream cos = spy(new CryptoOutputStream(outputStream, codec,
new byte[16], new byte[16], 0L, true));
// exception while flushing during close
doThrow(new IOException("problem flushing wrapped stream"))
.when(cos).flush();
intercept(IOException.class,
() -> cos.close());
// We expect that the close of the CryptoOutputStream closes the
// wrapped OutputStream even though we got an exception
// during CryptoOutputStream::close (in the flush method)
verify(outputStream).close();
}
}

View File

@ -1321,16 +1321,16 @@ public class TestFileUtil {
if (wildcardPath.equals(classPath)) {
// add wildcard matches
for (File wildcardMatch: wildcardMatches) {
expectedClassPaths.add(wildcardMatch.toURI().toURL()
expectedClassPaths.add(wildcardMatch.getCanonicalFile().toURI().toURL()
.toExternalForm());
}
} else {
File fileCp = null;
if(!new Path(classPath).isAbsolute()) {
fileCp = new File(tmp, classPath);
fileCp = new File(tmp, classPath).getCanonicalFile();
}
else {
fileCp = new File(classPath);
fileCp = new File(classPath).getCanonicalFile();
}
if (nonExistentSubdir.equals(classPath)) {
// expect to maintain trailing path separator if present in input, even
@ -1385,7 +1385,8 @@ public class TestFileUtil {
for (Path jar: jars) {
URL url = jar.toUri().toURL();
assertTrue("the jar should match either of the jars",
url.equals(jar1.toURI().toURL()) || url.equals(jar2.toURI().toURL()));
url.equals(jar1.getCanonicalFile().toURI().toURL()) ||
url.equals(jar2.getCanonicalFile().toURI().toURL()));
}
}

View File

@ -25,8 +25,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -39,7 +37,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
@ -51,8 +50,8 @@ import org.junit.Test;
*/
public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
private static final Log LOG =
LogFactory.getLog(TestViewFileSystemLocalFileSystem.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestViewFileSystemLocalFileSystem.class);
@Override
@Before

View File

@ -21,8 +21,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -35,6 +33,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
@ -43,8 +43,8 @@ import org.junit.Test;
*/
public class TestViewFileSystemOverloadSchemeLocalFileSystem {
private static final String FILE = "file";
private static final Log LOG =
LogFactory.getLog(TestViewFileSystemOverloadSchemeLocalFileSystem.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestViewFileSystemOverloadSchemeLocalFileSystem.class);
private FileSystem fsTarget;
private Configuration conf;
private Path targetTestRoot;

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.http;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.minikdc.MiniKdc;
@ -53,8 +51,6 @@ import static org.junit.Assert.assertTrue;
*/
public class TestHttpServerWithSpnego {
static final Log LOG = LogFactory.getLog(TestHttpServerWithSpnego.class);
private static final String SECRET_STR = "secret";
private static final String HTTP_USER = "HTTP";
private static final String PREFIX = "hadoop.http.authentication.";

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
@ -756,6 +759,122 @@ public class TestSequenceFile {
}
}
@Test
public void testSerializationUsingWritableNameAlias() throws IOException {
Configuration config = new Configuration();
config.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, SimpleSerializer.class.getName());
Path path = new Path(System.getProperty("test.build.data", "."),
"SerializationUsingWritableNameAlias");
// write with the original serializable class
SequenceFile.Writer writer = SequenceFile.createWriter(
config,
SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(SimpleSerializable.class),
SequenceFile.Writer.valueClass(SimpleSerializable.class));
int max = 10;
try {
SimpleSerializable val = new SimpleSerializable();
val.setId(-1);
for (int i = 0; i < max; i++) {
SimpleSerializable key = new SimpleSerializable();
key.setId(i);
writer.append(key, val);
}
} finally {
writer.close();
}
// override name so it gets forced to the new serializable
WritableName.setName(AnotherSimpleSerializable.class, SimpleSerializable.class.getName());
// read and expect our new serializable, and all the correct values read
SequenceFile.Reader reader = new SequenceFile.Reader(
config,
SequenceFile.Reader.file(path));
AnotherSimpleSerializable key = new AnotherSimpleSerializable();
int count = 0;
while (true) {
key = (AnotherSimpleSerializable) reader.next(key);
if (key == null) {
// make sure we exhausted all the ints we wrote
assertEquals(count, max);
break;
}
assertEquals(count++, key.getId());
}
}
public static class SimpleSerializable implements Serializable {
private int id;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}
public static class AnotherSimpleSerializable extends SimpleSerializable {
}
public static class SimpleSerializer implements Serialization<SimpleSerializable> {
@Override
public boolean accept(Class<?> c) {
return SimpleSerializable.class.isAssignableFrom(c);
}
@Override
public Serializer<SimpleSerializable> getSerializer(Class<SimpleSerializable> c) {
return new Serializer<SimpleSerializable>() {
private DataOutputStream out;
@Override
public void open(OutputStream out) throws IOException {
this.out = new DataOutputStream(out);
}
@Override
public void serialize(SimpleSerializable simpleSerializable) throws IOException {
out.writeInt(simpleSerializable.getId());
}
@Override
public void close() throws IOException {
out.close();
}
};
}
@Override
public Deserializer<SimpleSerializable> getDeserializer(Class<SimpleSerializable> c) {
return new Deserializer<SimpleSerializable>() {
private DataInputStream dis;
@Override
public void open(InputStream in) throws IOException {
dis = new DataInputStream(in);
}
@Override
public SimpleSerializable deserialize(SimpleSerializable simpleSerializable)
throws IOException {
simpleSerializable.setId(dis.readInt());
return simpleSerializable;
}
@Override
public void close() throws IOException {
dis.close();
}
};
}
}
/** For debugging and testing. */
public static void main(String[] args) throws Exception {
int count = 1024 * 1024;

View File

@ -24,8 +24,14 @@ import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** Unit tests for WritableName. */
@ -63,6 +69,28 @@ public class TestWritableName {
}
}
private static class SimpleSerializable {
}
private static class SimpleSerializer implements Serialization<SimpleSerializable> {
@Override
public boolean accept(Class<?> c) {
return c.equals(SimpleSerializable.class);
}
@Override
public Serializer<SimpleSerializable> getSerializer(Class<SimpleSerializable> c) {
return null;
}
@Override
public Deserializer<SimpleSerializable> getDeserializer(Class<SimpleSerializable> c) {
return null;
}
}
private static final String testName = "mystring";
@Test
@ -95,7 +123,27 @@ public class TestWritableName {
// check original name still works
test = WritableName.getClass(testName, conf);
assertTrue(test.equals(SimpleWritable.class));
}
@Test
public void testAddNameSerializable() throws Exception {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, SimpleSerializer.class.getName());
SerializationFactory serializationFactory =
new SerializationFactory(conf);
String altName = testName + ".alt";
WritableName.addName(SimpleSerializable.class, altName);
Class<?> test = WritableName.getClass(altName, conf);
assertEquals(test, SimpleSerializable.class);
assertNotNull(serializationFactory.getSerialization(test));
// check original name still works
test = WritableName.getClass(SimpleSerializable.class.getName(), conf);
assertEquals(test, SimpleSerializable.class);
assertNotNull(serializationFactory.getSerialization(test));
}
@Test

View File

@ -1168,6 +1168,10 @@ public class TestIPC {
call(client, addr, serviceClass, conf);
Connection connection = server.getConnections()[0];
LOG.info("Connection is from: {}", connection);
assertEquals(
"Connection string representation should include both IP address and Host name", 2,
connection.toString().split(" / ").length);
int serviceClass2 = connection.getServiceClass();
assertFalse(noChanged ^ serviceClass == serviceClass2);
client.stop();
@ -1336,7 +1340,7 @@ public class TestIPC {
/**
* Test the retry count while used in a retry proxy.
*/
@Test(timeout=60000)
@Test(timeout=100000)
public void testRetryProxy() throws IOException {
final Client client = new Client(LongWritable.class, conf);

View File

@ -378,7 +378,7 @@ public class TestRPC extends TestRpcBase {
assertEquals(confReaders, server.getNumReaders());
server = newServerBuilder(conf)
.setNumHandlers(1).setnumReaders(3).setQueueSizePerHandler(200)
.setNumHandlers(1).setNumReaders(3).setQueueSizePerHandler(200)
.setVerbose(false).build();
assertEquals(3, server.getNumReaders());
@ -1849,6 +1849,11 @@ public class TestRPC extends TestRpcBase {
// if it wasn't fatal, verify there's only one open connection.
Connection[] conns = server.getConnections();
assertEquals(reqName, 1, conns.length);
String connectionInfo = conns[0].toString();
LOG.info("Connection is from: {}", connectionInfo);
assertEquals(
"Connection string representation should include both IP address and Host name", 2,
connectionInfo.split(" / ").length);
// verify whether the connection should have been reused.
if (isDisconnected) {
assertNotSame(reqName, lastConn, conns[0]);

View File

@ -22,8 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Appender;
import org.apache.log4j.Category;
@ -44,8 +42,6 @@ import java.util.Vector;
public class TestLog4Json {
private static final Log LOG = LogFactory.getLog(TestLog4Json.class);
@Test
public void testConstruction() throws Throwable {
Log4Json l4j = new Log4Json();

View File

@ -22,9 +22,6 @@ import java.net.SocketException;
import java.net.URI;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -70,8 +67,7 @@ public class TestLogLevel extends KerberosSecurityTestcase {
private final String logName = TestLogLevel.class.getName();
private String clientPrincipal;
private String serverPrincipal;
private final Log testlog = LogFactory.getLog(logName);
private final Logger log = ((Log4JLogger)testlog).getLogger();
private final Logger log = Logger.getLogger(logName);
private final static String PRINCIPAL = "loglevel.principal";
private final static String KEYTAB = "loglevel.keytab";
private static final String PREFIX = "hadoop.http.authentication.";

View File

@ -18,21 +18,27 @@
package org.apache.hadoop.security.authentication.server;
import java.io.IOException;
import java.io.PrintWriter;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import javax.servlet.FilterConfig;
import javax.servlet.FilterChain;
import javax.servlet.ServletContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import javax.servlet.ServletRequest;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import static org.assertj.core.api.Assertions.assertThat;
import org.glassfish.grizzly.servlet.HttpServletResponseImpl;
import org.junit.Test;
import org.mockito.Mockito;
@ -76,8 +82,192 @@ public class TestProxyUserAuthenticationFilter {
}
}
private class HttpServletResponseForTest extends HttpServletResponseImpl {
private class HttpServletResponseForTest implements HttpServletResponse {
@Override
public void addCookie(Cookie cookie) {
}
@Override
public boolean containsHeader(String name) {
return false;
}
@Override
public String encodeURL(String url) {
return null;
}
@Override
public String encodeRedirectURL(String url) {
return null;
}
@Override
public String encodeUrl(String url) {
return null;
}
@Override
public String encodeRedirectUrl(String url) {
return null;
}
@Override
public void sendError(int sc, String msg) throws IOException {
}
@Override
public void sendError(int sc) throws IOException {
}
@Override
public void sendRedirect(String location) throws IOException {
}
@Override
public void setDateHeader(String name, long date) {
}
@Override
public void addDateHeader(String name, long date) {
}
@Override
public void setHeader(String name, String value) {
}
@Override
public void addHeader(String name, String value) {
}
@Override
public void setIntHeader(String name, int value) {
}
@Override
public void addIntHeader(String name, int value) {
}
@Override
public void setStatus(int sc) {
}
@Override
public void setStatus(int sc, String sm) {
}
@Override
public int getStatus() {
return 0;
}
@Override
public String getHeader(String name) {
return null;
}
@Override
public Collection<String> getHeaders(String name) {
return null;
}
@Override
public Collection<String> getHeaderNames() {
return null;
}
@Override
public String getCharacterEncoding() {
return null;
}
@Override
public String getContentType() {
return null;
}
@Override
public ServletOutputStream getOutputStream() throws IOException {
return null;
}
@Override
public PrintWriter getWriter() throws IOException {
return null;
}
@Override
public void setCharacterEncoding(String charset) {
}
@Override
public void setContentLength(int len) {
}
@Override
public void setContentLengthLong(long len) {
}
@Override
public void setContentType(String type) {
}
@Override
public void setBufferSize(int size) {
}
@Override
public int getBufferSize() {
return 0;
}
@Override
public void flushBuffer() throws IOException {
}
@Override
public void resetBuffer() {
}
@Override
public boolean isCommitted() {
return false;
}
@Override
public void reset() {
}
@Override
public void setLocale(Locale loc) {
}
@Override
public Locale getLocale() {
return null;
}
}

View File

@ -49,8 +49,6 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -117,29 +115,11 @@ public abstract class GenericTestUtils {
public static final String ERROR_INVALID_ARGUMENT =
"Total wait time should be greater than check interval time";
/**
* @deprecated use {@link #disableLog(org.slf4j.Logger)} instead
*/
@Deprecated
@SuppressWarnings("unchecked")
public static void disableLog(Log log) {
// We expect that commons-logging is a wrapper around Log4j.
disableLog((Log4JLogger) log);
}
@Deprecated
public static Logger toLog4j(org.slf4j.Logger logger) {
return LogManager.getLogger(logger.getName());
}
/**
* @deprecated use {@link #disableLog(org.slf4j.Logger)} instead
*/
@Deprecated
public static void disableLog(Log4JLogger log) {
log.getLogger().setLevel(Level.OFF);
}
/**
* @deprecated use {@link #disableLog(org.slf4j.Logger)} instead
*/
@ -152,45 +132,6 @@ public abstract class GenericTestUtils {
disableLog(toLog4j(logger));
}
/**
* @deprecated
* use {@link #setLogLevel(org.slf4j.Logger, org.slf4j.event.Level)} instead
*/
@Deprecated
@SuppressWarnings("unchecked")
public static void setLogLevel(Log log, Level level) {
// We expect that commons-logging is a wrapper around Log4j.
setLogLevel((Log4JLogger) log, level);
}
/**
* A helper used in log4j2 migration to accept legacy
* org.apache.commons.logging apis.
* <p>
* And will be removed after migration.
*
* @param log a log
* @param level level to be set
*/
@Deprecated
public static void setLogLevel(Log log, org.slf4j.event.Level level) {
setLogLevel(log, Level.toLevel(level.toString()));
}
/**
* @deprecated
* use {@link #setLogLevel(org.slf4j.Logger, org.slf4j.event.Level)} instead
*/
@Deprecated
public static void setLogLevel(Log4JLogger log, Level level) {
log.getLogger().setLevel(level);
}
/**
* @deprecated
* use {@link #setLogLevel(org.slf4j.Logger, org.slf4j.event.Level)} instead
*/
@Deprecated
public static void setLogLevel(Logger logger, Level level) {
logger.setLevel(level);
}
@ -535,13 +476,15 @@ public abstract class GenericTestUtils {
private WriterAppender appender;
private Logger logger;
public static LogCapturer captureLogs(Log l) {
Logger logger = ((Log4JLogger)l).getLogger();
return new LogCapturer(logger);
public static LogCapturer captureLogs(org.slf4j.Logger logger) {
if (logger.getName().equals("root")) {
return new LogCapturer(org.apache.log4j.Logger.getRootLogger());
}
return new LogCapturer(toLog4j(logger));
}
public static LogCapturer captureLogs(org.slf4j.Logger logger) {
return new LogCapturer(toLog4j(logger));
public static LogCapturer captureLogs(Logger logger) {
return new LogCapturer(logger);
}
private LogCapturer(Logger logger) {

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.util;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -43,7 +43,7 @@ public class TestJarFinder {
public void testJar() throws Exception {
//picking a class that is for sure in a JAR in the classpath
String jar = JarFinder.getJar(LogFactory.class);
String jar = JarFinder.getJar(LoggerFactory.class);
Assert.assertTrue(new File(jar).exists());
}

View File

@ -32,9 +32,9 @@ public class TestSignalLogger {
@Test(timeout=60000)
public void testInstall() throws Exception {
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
SignalLogger.INSTANCE.register(LogAdapter.create(LOG));
SignalLogger.INSTANCE.register(LOG);
try {
SignalLogger.INSTANCE.register(LogAdapter.create(LOG));
SignalLogger.INSTANCE.register(LOG);
Assert.fail("expected IllegalStateException from double registration");
} catch (IllegalStateException e) {
// fall through

View File

@ -63,11 +63,6 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.oncrpc;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
@ -26,6 +27,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.hadoop.classification.VisibleForTesting;
@ -172,15 +174,18 @@ public final class RpcUtil {
*/
@ChannelHandler.Sharable
private static final class RpcUdpResponseStage extends
ChannelInboundHandlerAdapter {
SimpleChannelInboundHandler<RpcResponse> {
public RpcUdpResponseStage() {
// do not auto release the RpcResponse message.
super(false);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RpcResponse r = (RpcResponse) msg;
// TODO: check out https://github.com/netty/netty/issues/1282 for
// correct usage
ctx.channel().writeAndFlush(r.data());
protected void channelRead0(ChannelHandlerContext ctx,
RpcResponse response) throws Exception {
ByteBuf buf = Unpooled.wrappedBuffer(response.data());
ctx.writeAndFlush(new DatagramPacket(
buf, (InetSocketAddress) response.recipient()));
}
}
}

View File

@ -117,15 +117,13 @@ final class Portmap {
.childOption(ChannelOption.SO_REUSEADDR, true)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
private final IdleStateHandler idleStateHandler = new IdleStateHandler(
0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
RpcUtil.STAGE_RPC_MESSAGE_PARSER, new IdleStateHandler(0, 0,
idleTimeMilliSeconds, TimeUnit.MILLISECONDS), handler,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}});

View File

@ -23,8 +23,10 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.oncrpc.RpcReply;
import org.junit.Assert;
import org.apache.hadoop.oncrpc.RpcCall;
@ -35,6 +37,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestPortmap {
private static Portmap pm = new Portmap();
private static final int SHORT_TIMEOUT_MILLISECONDS = 10;
@ -92,6 +96,19 @@ public class TestPortmap {
pm.getUdpServerLoAddress());
try {
s.send(p);
// verify that portmap server responds a UDF packet back to the client
byte[] receiveData = new byte[65535];
DatagramPacket receivePacket = new DatagramPacket(receiveData,
receiveData.length);
s.setSoTimeout(2000);
s.receive(receivePacket);
// verify that the registration is accepted.
XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
receivePacket.getLength()));
RpcReply reply = RpcReply.read(xdr);
assertEquals(reply.getState(), RpcReply.ReplyState.MSG_ACCEPTED);
} finally {
s.close();
}

View File

@ -61,10 +61,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>

View File

@ -133,11 +133,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>commons-io</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-daemon</groupId>
<artifactId>commons-daemon</artifactId>

View File

@ -49,10 +49,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.classification.VisibleForTesting;
@ -136,6 +137,19 @@ public class StateStoreMetrics implements StateStoreMBean {
counter.set(size);
}
/**
* set the count of the location cache access information.
* @param name Name of the record.
* @param count count of the record.
*/
public void setLocationCache(String name, long count) {
MutableGaugeLong counter = (MutableGaugeLong) registry.get(name);
if (counter == null) {
counter = registry.newGauge(name, name, count);
}
counter.set(count);
}
@VisibleForTesting
public void reset() {
reads.resetMinMax();

View File

@ -42,6 +42,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -51,6 +52,7 @@ import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
@ -97,6 +99,8 @@ public class MountTableResolver
private final TreeMap<String, MountTable> tree = new TreeMap<>();
/** Path -> Remote location. */
private final Cache<String, PathLocation> locationCache;
private final LongAdder locCacheMiss = new LongAdder();
private final LongAdder locCacheAccess = new LongAdder();
/** Default nameservice when no mount matches the math. */
private String defaultNameService = "";
@ -408,6 +412,9 @@ public class MountTableResolver
mountTable.getMountTableEntries(request);
List<MountTable> records = response.getEntries();
refreshEntries(records);
StateStoreMetrics metrics = this.getMountTableStore().getDriver().getMetrics();
metrics.setLocationCache("locationCacheMissed", this.getLocCacheMiss().sum());
metrics.setLocationCache("locationCacheAccessed", this.getLocCacheAccess().sum());
} catch (IOException e) {
LOG.error("Cannot fetch mount table entries from State Store", e);
return false;
@ -441,9 +448,12 @@ public class MountTableResolver
if (this.locationCache == null) {
res = lookupLocation(processTrashPath(path));
} else {
Callable<? extends PathLocation> meh = (Callable<PathLocation>) () ->
lookupLocation(processTrashPath(path));
Callable<? extends PathLocation> meh = (Callable<PathLocation>) () -> {
this.getLocCacheMiss().increment();
return lookupLocation(processTrashPath(path));
};
res = this.locationCache.get(processTrashPath(path), meh);
this.getLocCacheAccess().increment();
}
if (isTrashPath(path)) {
List<RemoteLocation> remoteLocations = new ArrayList<>();
@ -699,4 +709,12 @@ public class MountTableResolver
public void setDisabled(boolean disable) {
this.disabled = disable;
}
public LongAdder getLocCacheMiss() {
return locCacheMiss;
}
public LongAdder getLocCacheAccess() {
return locCacheAccess;
}
}

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@ -236,7 +238,15 @@ public class NamenodeHeartbeatService extends PeriodicService {
@Override
public void periodicInvoke() {
updateState();
try {
// Run using the login user credentials
SecurityUtil.doAsLoginUser((PrivilegedExceptionAction<Void>) () -> {
updateState();
return null;
});
} catch (IOException e) {
LOG.error("Cannot update namenode state", e);
}
}
/**

View File

@ -239,6 +239,18 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final long
FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT = -1;
// HDFS Router-based federation State Store ZK DRIVER
public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
public static final String FEDERATION_STORE_ZK_PARENT_PATH =
FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
"/hdfs-federation";
public static final String FEDERATION_STORE_ZK_ASYNC_MAX_THREADS =
FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads";
public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT =
-1;
// HDFS Router safe mode
public static final String DFS_ROUTER_SAFEMODE_ENABLE =
FEDERATION_ROUTER_PREFIX + "safemode.enable";

View File

@ -333,7 +333,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
.setBindAddress(confRpcAddress.getHostName())
.setPort(confRpcAddress.getPort())
.setNumHandlers(handlerCount)
.setnumReaders(readerCount)
.setNumReaders(readerCount)
.setQueueSizePerHandler(handlerQueueSize)
.setVerbose(false)
.setAlignmentContext(routerStateIdContext)

View File

@ -73,7 +73,7 @@ public abstract class RecordStore<R extends BaseRecord> {
*
* @return State Store driver.
*/
protected StateStoreDriver getDriver() {
public StateStoreDriver getDriver() {
return this.driver;
}

View File

@ -25,7 +25,16 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.hadoop.conf.Configuration;
@ -57,14 +66,9 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
private static final Logger LOG =
LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
/** Configuration keys. */
public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
public static final String FEDERATION_STORE_ZK_PARENT_PATH =
FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
"/hdfs-federation";
/** Service to get/update zk state. */
private ThreadPoolExecutor executorService;
private boolean enableConcurrent;
/** Directory to store the state store data. */
@ -82,8 +86,22 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
Configuration conf = getConf();
baseZNode = conf.get(
FEDERATION_STORE_ZK_PARENT_PATH,
FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH,
RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
int numThreads = conf.getInt(
RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS,
RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT);
enableConcurrent = numThreads > 0;
if (enableConcurrent) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("StateStore ZK Client-%d")
.build();
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
LOG.info("Init StateStoreZookeeperImpl by async mode with {} threads.", numThreads);
} else {
LOG.info("Init StateStoreZookeeperImpl by sync mode.");
}
try {
this.zkManager = new ZKCuratorManager(conf);
this.zkManager.start();
@ -109,8 +127,16 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
}
}
@VisibleForTesting
public void setEnableConcurrent(boolean enableConcurrent) {
this.enableConcurrent = enableConcurrent;
}
@Override
public void close() throws Exception {
if (executorService != null) {
executorService.shutdown();
}
if (zkManager != null) {
zkManager.close();
}
@ -136,34 +162,21 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
List<T> ret = new ArrayList<>();
String znode = getZNodeForClass(clazz);
try {
List<String> children = zkManager.getChildren(znode);
for (String child : children) {
try {
String path = getNodePath(znode, child);
Stat stat = new Stat();
String data = zkManager.getStringData(path, stat);
boolean corrupted = false;
if (data == null || data.equals("")) {
// All records should have data, otherwise this is corrupted
corrupted = true;
} else {
try {
T record = createRecord(data, stat, clazz);
ret.add(record);
} catch (IOException e) {
LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
clazz.getSimpleName(), data, e.getMessage());
corrupted = true;
}
List<Callable<T>> callables = new ArrayList<>();
zkManager.getChildren(znode).forEach(c -> callables.add(() -> getRecord(clazz, znode, c)));
if (enableConcurrent) {
List<Future<T>> futures = executorService.invokeAll(callables);
for (Future<T> future : futures) {
if (future.get() != null) {
ret.add(future.get());
}
if (corrupted) {
LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
child, path);
zkManager.delete(path);
}
} else {
for (Callable<T> callable : callables) {
T record = callable.call();
if (record != null) {
ret.add(record);
}
} catch (Exception e) {
LOG.error("Cannot get data for {}: {}", child, e.getMessage());
}
}
} catch (Exception e) {
@ -178,6 +191,44 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
return new QueryResult<T>(ret, getTime());
}
/**
* Get one data record in the StateStore or delete it if it's corrupted.
*
* @param clazz Record class to evaluate.
* @param znode The ZNode for the class.
* @param child The child for znode to get.
* @return The record to get.
*/
private <T extends BaseRecord> T getRecord(Class<T> clazz, String znode, String child) {
T record = null;
try {
String path = getNodePath(znode, child);
Stat stat = new Stat();
String data = zkManager.getStringData(path, stat);
boolean corrupted = false;
if (data == null || data.equals("")) {
// All records should have data, otherwise this is corrupted
corrupted = true;
} else {
try {
record = createRecord(data, stat, clazz);
} catch (IOException e) {
LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
clazz.getSimpleName(), data, e.getMessage());
corrupted = true;
}
}
if (corrupted) {
LOG.error("Cannot get data for {} at {}, cleaning corrupted data", child, path);
zkManager.delete(path);
}
} catch (Exception e) {
LOG.error("Cannot get data for {}: {}", child, e.getMessage());
}
return record;
}
@Override
public <T extends BaseRecord> boolean putAll(
List<T> records, boolean update, boolean error) throws IOException {
@ -192,22 +243,40 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
String znode = getZNodeForClass(recordClass);
long start = monotonicNow();
boolean status = true;
for (T record : records) {
String primaryKey = getPrimaryKey(record);
String recordZNode = getNodePath(znode, primaryKey);
byte[] data = serialize(record);
if (!writeNode(recordZNode, data, update, error)){
status = false;
final AtomicBoolean status = new AtomicBoolean(true);
List<Callable<Void>> callables = new ArrayList<>();
records.forEach(record ->
callables.add(
() -> {
String primaryKey = getPrimaryKey(record);
String recordZNode = getNodePath(znode, primaryKey);
byte[] data = serialize(record);
if (!writeNode(recordZNode, data, update, error)) {
status.set(false);
}
return null;
}
)
);
try {
if (enableConcurrent) {
executorService.invokeAll(callables);
} else {
for(Callable<Void> callable : callables) {
callable.call();
}
}
} catch (Exception e) {
LOG.error("Write record failed : {}", e.getMessage(), e);
throw new IOException(e);
}
long end = monotonicNow();
if (status) {
if (status.get()) {
getMetrics().addWrite(end - start);
} else {
getMetrics().addFailure(end - start);
}
return status;
return status.get();
}
@Override

View File

@ -377,6 +377,26 @@
</description>
</property>
<property>
<name>dfs.federation.router.store.driver.zk.parent-path</name>
<value>/hdfs-federation</value>
<description>
The parent path of zookeeper for StateStoreZooKeeperImpl.
</description>
</property>
<property>
<name>dfs.federation.router.store.driver.zk.async.max.threads</name>
<value>-1</value>
<description>
Max threads number of StateStoreZooKeeperImpl in async mode.
The only class currently being supported:
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl.
Default value is -1, which means StateStoreZooKeeperImpl is working in sync mode.
Use positive integer value to enable async mode.
</description>
</property>
<property>
<name>dfs.federation.router.cache.ttl</name>
<value>1m</value>

View File

@ -390,6 +390,7 @@
<td></td>
<td></td>
<td></td>
<td></td>
</tr>
{/DeadNodes}
</table>

View File

@ -729,4 +729,41 @@ public class TestMountTableResolver {
assertEquals("2->/testInvalidateCache/foo", mountTable
.getDestinationForPath("/testInvalidateCache/foo").toString());
}
/**
* Test location cache hit when get destination for path.
*/
@Test
public void testLocationCacheHitrate() throws Exception {
List<MountTable> entries = new ArrayList<>();
// Add entry and test location cache
Map<String, String> map1 = getMountTableEntry("1", "/testlocationcache");
MountTable entry1 = MountTable.newInstance("/testlocationcache", map1);
entries.add(entry1);
Map<String, String> map2 = getMountTableEntry("2",
"/anothertestlocationcache");
MountTable entry2 = MountTable.newInstance("/anothertestlocationcache",
map2);
entries.add(entry2);
mountTable.refreshEntries(entries);
mountTable.getLocCacheAccess().reset();
mountTable.getLocCacheMiss().reset();
assertEquals("1->/testlocationcache",
mountTable.getDestinationForPath("/testlocationcache").toString());
assertEquals("2->/anothertestlocationcache",
mountTable.getDestinationForPath("/anothertestlocationcache")
.toString());
assertEquals(2, mountTable.getLocCacheMiss().intValue());
assertEquals("1->/testlocationcache",
mountTable.getDestinationForPath("/testlocationcache").toString());
assertEquals(3, mountTable.getLocCacheAccess().intValue());
// Cleanup before exit
mountTable.removeEntry("/testlocationcache");
mountTable.removeEntry("/anothertestlocationcache");
}
}

View File

@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RP
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -36,6 +37,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.router.SecurityConfUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
@ -44,6 +46,7 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeCon
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.net.MockDomainNameResolver;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -318,4 +321,33 @@ public class TestRouterNamenodeHeartbeat {
return conf;
}
@Test
public void testNamenodeHeartbeatWithSecurity() throws Exception {
Configuration conf = SecurityConfUtil.initSecurity();
MiniRouterDFSCluster testCluster = null;
try {
testCluster = new MiniRouterDFSCluster(true, 1, conf);
// Start Namenodes and routers
testCluster.startCluster(conf);
testCluster.startRouters();
// Register Namenodes to generate a NamenodeStatusReport
testCluster.registerNamenodes();
testCluster.waitNamenodeRegistration();
for (MiniRouterDFSCluster.RouterContext routerContext : testCluster.getRouters()) {
ActiveNamenodeResolver resolver = routerContext.getRouter().getNamenodeResolver();
// Validate that NamenodeStatusReport has been registered
assertNotNull(resolver.getNamespaces());
assertFalse(resolver.getNamespaces().isEmpty());
}
} finally {
if (testCluster != null) {
testCluster.shutdown();
}
UserGroupInformation.reset();
SecurityConfUtil.destroy();
}
}
}

View File

@ -2054,7 +2054,7 @@ public class TestRouterRpc {
@Test
public void testMkdirsWithCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditlog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
// Current callerContext is null
assertNull(CallerContext.getCurrent());
@ -2092,7 +2092,7 @@ public class TestRouterRpc {
@Test
public void testAddClientIpPortToCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
// 1. ClientIp and ClientPort are not set on the client.
// Set client context.
@ -2127,7 +2127,7 @@ public class TestRouterRpc {
@Test
public void testAddClientIdAndCallIdToCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
// 1. ClientId and ClientCallId are not set on the client.
// Set client context.

View File

@ -440,7 +440,7 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc {
@Test
public void testCallerContextWithMultiDestinations() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
// set client context
CallerContext.setCurrent(

View File

@ -119,7 +119,7 @@ public class TestStateStoreDriverBase {
}
@SuppressWarnings("unchecked")
private <T extends BaseRecord> T generateFakeRecord(Class<T> recordClass)
protected <T extends BaseRecord> T generateFakeRecord(Class<T> recordClass)
throws IllegalArgumentException, IllegalAccessException, IOException {
if (recordClass == MembershipState.class) {

View File

@ -18,12 +18,13 @@
package org.apache.hadoop.hdfs.server.federation.store.driver;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl.FEDERATION_STORE_ZK_PARENT_PATH;
import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl.FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservic
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.util.Time;
import org.apache.zookeeper.CreateMode;
import org.junit.AfterClass;
import org.junit.Before;
@ -73,9 +75,10 @@ public class TestStateStoreZK extends TestStateStoreDriverBase {
// Disable auto-repair of connection
conf.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
TimeUnit.HOURS.toMillis(1));
conf.setInt(RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS, 10);
baseZNode = conf.get(FEDERATION_STORE_ZK_PARENT_PATH,
FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
baseZNode = conf.get(RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH,
RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
getStateStore(conf);
}
@ -91,6 +94,8 @@ public class TestStateStoreZK extends TestStateStoreDriverBase {
@Before
public void startup() throws IOException {
removeAll(getStateStoreDriver());
StateStoreZooKeeperImpl stateStoreZooKeeper = (StateStoreZooKeeperImpl) getStateStoreDriver();
stateStoreZooKeeper.setEnableConcurrent(false);
}
private <T extends BaseRecord> String generateFakeZNode(
@ -126,33 +131,79 @@ public class TestStateStoreZK extends TestStateStoreDriverBase {
assertNull(curatorFramework.checkExists().forPath(znode));
}
@Test
public void testAsyncPerformance() throws Exception {
StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
List<MountTable> insertList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
MountTable newRecord = generateFakeRecord(MountTable.class);
insertList.add(newRecord);
}
// Insert Multiple on sync mode
long startSync = Time.now();
stateStoreDriver.putAll(insertList, true, false);
long endSync = Time.now();
stateStoreDriver.removeAll(MembershipState.class);
stateStoreDriver.setEnableConcurrent(true);
// Insert Multiple on async mode
long startAsync = Time.now();
stateStoreDriver.putAll(insertList, true, false);
long endAsync = Time.now();
assertTrue((endSync - startSync) > (endAsync - startAsync));
}
@Test
public void testGetNullRecord() throws Exception {
testGetNullRecord(getStateStoreDriver());
StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
testGetNullRecord(stateStoreDriver);
// test async mode
stateStoreDriver.setEnableConcurrent(true);
testGetNullRecord(stateStoreDriver);
}
@Test
public void testInsert()
throws IllegalArgumentException, IllegalAccessException, IOException {
testInsert(getStateStoreDriver());
StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
testInsert(stateStoreDriver);
// test async mode
stateStoreDriver.setEnableConcurrent(true);
testInsert(stateStoreDriver);
}
@Test
public void testUpdate()
throws IllegalArgumentException, ReflectiveOperationException,
IOException, SecurityException {
testPut(getStateStoreDriver());
StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
testPut(stateStoreDriver);
// test async mode
stateStoreDriver.setEnableConcurrent(true);
testPut(stateStoreDriver);
}
@Test
public void testDelete()
throws IllegalArgumentException, IllegalAccessException, IOException {
testRemove(getStateStoreDriver());
StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
testRemove(stateStoreDriver);
// test async mode
stateStoreDriver.setEnableConcurrent(true);
testRemove(stateStoreDriver);
}
@Test
public void testFetchErrors()
throws IllegalArgumentException, IllegalAccessException, IOException {
testFetchErrors(getStateStoreDriver());
StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
testFetchErrors(stateStoreDriver);
// test async mode
stateStoreDriver.setEnableConcurrent(true);
testFetchErrors(stateStoreDriver);
}
}

View File

@ -117,11 +117,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>commons-io</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-daemon</groupId>
<artifactId>commons-daemon</artifactId>
@ -152,6 +147,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
</exclusions>
</dependency>
@ -175,11 +174,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>

View File

@ -395,12 +395,12 @@ public class BlockManager implements BlockStatsMXBean {
* The maximum number of outgoing replication streams a given node should have
* at one time considering all but the highest priority replications needed.
*/
int maxReplicationStreams;
private volatile int maxReplicationStreams;
/**
* The maximum number of outgoing replication streams a given node should have
* at one time.
*/
int replicationStreamsHardLimit;
private volatile int replicationStreamsHardLimit;
/** Minimum copies needed or else write is disallowed */
public final short minReplication;
/** Default number of replicas */
@ -409,7 +409,7 @@ public class BlockManager implements BlockStatsMXBean {
final int maxCorruptFilesReturned;
final float blocksInvalidateWorkPct;
private int blocksReplWorkMultiplier;
private volatile int blocksReplWorkMultiplier;
// whether or not to issue block encryption keys.
final boolean encryptDataTransfer;
@ -1017,12 +1017,19 @@ public class BlockManager implements BlockStatsMXBean {
*
* @param newVal - Must be a positive non-zero integer.
*/
public void setMaxReplicationStreams(int newVal) {
ensurePositiveInt(newVal,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
@VisibleForTesting
public void setMaxReplicationStreams(int newVal, boolean ensurePositiveInt) {
if (ensurePositiveInt) {
ensurePositiveInt(newVal,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
}
maxReplicationStreams = newVal;
}
public void setMaxReplicationStreams(int newVal) {
setMaxReplicationStreams(newVal, true);
}
/** Returns the current setting for maxReplicationStreamsHardLimit, set by
* {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}.
*

View File

@ -59,7 +59,7 @@ class PendingReconstructionBlocks {
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
//
private long timeout =
private volatile long timeout =
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT * 1000;
private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;

View File

@ -31,8 +31,6 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.metrics2.util.MBeans;
@ -58,13 +56,12 @@ public class MetricsLoggerTask implements Runnable {
}
}
private Log metricsLog;
private org.apache.log4j.Logger metricsLog;
private String nodeName;
private short maxLogLineLength;
public MetricsLoggerTask(Log metricsLog, String nodeName,
short maxLogLineLength) {
this.metricsLog = metricsLog;
public MetricsLoggerTask(String metricsLog, String nodeName, short maxLogLineLength) {
this.metricsLog = org.apache.log4j.Logger.getLogger(metricsLog);
this.nodeName = nodeName;
this.maxLogLineLength = maxLogLineLength;
}
@ -118,13 +115,8 @@ public class MetricsLoggerTask implements Runnable {
.substring(0, maxLogLineLength) + "...");
}
private static boolean hasAppenders(Log logger) {
if (!(logger instanceof Log4JLogger)) {
// Don't bother trying to determine the presence of appenders.
return true;
}
Log4JLogger log4JLogger = ((Log4JLogger) logger);
return log4JLogger.getLogger().getAllAppenders().hasMoreElements();
private static boolean hasAppenders(org.apache.log4j.Logger logger) {
return logger.getAllAppenders().hasMoreElements();
}
/**
@ -150,13 +142,8 @@ public class MetricsLoggerTask implements Runnable {
* Make the metrics logger async and add all pre-existing appenders to the
* async appender.
*/
public static void makeMetricsLoggerAsync(Log metricsLog) {
if (!(metricsLog instanceof Log4JLogger)) {
LOG.warn("Metrics logging will not be async since "
+ "the logger is not log4j");
return;
}
org.apache.log4j.Logger logger = ((Log4JLogger) metricsLog).getLogger();
public static void makeMetricsLoggerAsync(String metricsLog) {
org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(metricsLog);
logger.setAdditivity(false); // Don't pollute actual logs with metrics dump
@SuppressWarnings("unchecked")

View File

@ -679,15 +679,20 @@ class BPOfferService {
actor.reRegister();
return false;
}
writeLock();
boolean isActiveActor;
InetSocketAddress nnSocketAddress;
readLock();
try {
if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor);
} else {
return processCommandFromStandby(cmd, actor);
}
isActiveActor = (actor == bpServiceToActive);
nnSocketAddress = actor.getNNSocketAddress();
} finally {
writeUnlock();
readUnlock();
}
if (isActiveActor) {
return processCommandFromActive(cmd, nnSocketAddress);
} else {
return processCommandFromStandby(cmd, nnSocketAddress);
}
}
@ -715,7 +720,7 @@ class BPOfferService {
* @throws IOException
*/
private boolean processCommandFromActive(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
InetSocketAddress nnSocketAddress) throws IOException {
final BlockCommand bcmd =
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
final BlockIdCommand blockIdCmd =
@ -768,7 +773,7 @@ class BPOfferService {
dn.finalizeUpgradeForPool(bp);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
String who = "NameNode at " + actor.getNNSocketAddress();
String who = "NameNode at " + nnSocketAddress;
dn.getBlockRecoveryWorker().recoverBlocks(who,
((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
@ -810,11 +815,11 @@ class BPOfferService {
* DNA_REGISTER which should be handled earlier itself.
*/
private boolean processCommandFromStandby(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
InetSocketAddress nnSocketAddress) throws IOException {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE",
actor.getNNSocketAddress());
nnSocketAddress);
if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.addKeys(
getBlockPoolId(),
@ -831,11 +836,11 @@ class BPOfferService {
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
LOG.warn("Got a command from standby NN {} - ignoring command: {}",
actor.getNNSocketAddress(), cmd.getAction());
nnSocketAddress, cmd.getAction());
break;
default:
LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}",
cmd.getAction(), actor.getNNSocketAddress());
cmd.getAction(), nnSocketAddress);
}
return true;
}

View File

@ -202,10 +202,13 @@ class BPServiceActor implements Runnable {
Map<String, String> getActorInfoMap() {
final Map<String, String> info = new HashMap<String, String>();
info.put("NamenodeAddress", getNameNodeAddress());
info.put("NamenodeHaState", state != null ? state.toString() : "Unknown");
info.put("BlockPoolID", bpos.getBlockPoolId());
info.put("ActorState", getRunningState());
info.put("LastHeartbeat",
String.valueOf(getScheduler().getLastHearbeatTime()));
info.put("LastHeartbeatResponseTime",
String.valueOf(getScheduler().getLastHeartbeatResponseTime()));
info.put("LastBlockReport",
String.valueOf(getScheduler().getLastBlockReportTime()));
info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize()));
@ -579,6 +582,8 @@ class BPServiceActor implements Runnable {
slowPeers,
slowDisks);
scheduler.updateLastHeartbeatResponseTime(monotonicNow());
if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
scheduler.scheduleNextOutlierReport();
@ -697,6 +702,8 @@ class BPServiceActor implements Runnable {
// Every so often, send heartbeat or block-report
//
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
LOG.debug("BP offer service run start time: {}, sendHeartbeat: {}", startTime,
sendHeartbeat);
HeartbeatResponse resp = null;
if (sendHeartbeat) {
//
@ -709,6 +716,8 @@ class BPServiceActor implements Runnable {
boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
scheduler.isBlockReportDue(startTime);
if (!dn.areHeartbeatsDisabledForTests()) {
LOG.debug("Before sending heartbeat to namenode {}, the state of the namenode known"
+ " to datanode so far is {}", this.getNameNodeAddress(), state);
resp = sendHeartBeat(requestBlockReportLease);
assert resp != null;
if (resp.getFullBlockReportLeaseId() != 0) {
@ -733,7 +742,12 @@ class BPServiceActor implements Runnable {
// that we should actually process.
bpos.updateActorStatesFromHeartbeat(
this, resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
HAServiceState stateFromResp = resp.getNameNodeHaState().getState();
if (state != stateFromResp) {
LOG.info("After receiving heartbeat response, updating state of namenode {} to {}",
this.getNameNodeAddress(), stateFromResp);
}
state = stateFromResp;
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
@ -794,6 +808,7 @@ class BPServiceActor implements Runnable {
long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
LOG.info("BPServiceActor {} is interrupted", this);
Thread.currentThread().interrupt();
}
}
@ -995,6 +1010,8 @@ class BPServiceActor implements Runnable {
while (!duplicateQueue.isEmpty()) {
BPServiceActorAction actionItem = duplicateQueue.remove();
try {
LOG.debug("BPServiceActor ( {} ) processing queued messages. Action item: {}", this,
actionItem);
actionItem.reportTo(bpNamenode, bpRegistration);
} catch (BPServiceActorActionException baae) {
LOG.warn(baae.getMessage() + nnAddr , baae);
@ -1189,6 +1206,9 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
volatile long lastHeartbeatTime = monotonicNow();
@VisibleForTesting
private volatile long lastHeartbeatResponseTime = -1;
@VisibleForTesting
boolean resetBlockReportTime = true;
@ -1237,6 +1257,10 @@ class BPServiceActor implements Runnable {
lastHeartbeatTime = heartbeatTime;
}
void updateLastHeartbeatResponseTime(long heartbeatTime) {
this.lastHeartbeatResponseTime = heartbeatTime;
}
void updateLastBlockReportTime(long blockReportTime) {
lastBlockReportTime = blockReportTime;
}
@ -1249,6 +1273,10 @@ class BPServiceActor implements Runnable {
return (monotonicNow() - lastHeartbeatTime)/1000;
}
private long getLastHeartbeatResponseTime() {
return (monotonicNow() - lastHeartbeatResponseTime) / 1000;
}
long getLastBlockReportTime() {
return (monotonicNow() - lastBlockReportTime)/1000;
}
@ -1471,7 +1499,7 @@ class BPServiceActor implements Runnable {
dn.getMetrics().addNumProcessedCommands(processCommandsMs);
}
if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
LOG.info("Took {} ms to process {} commands from NN",
LOG.warn("Took {} ms to process {} commands from NN",
processCommandsMs, cmds.length);
}
}

View File

@ -35,7 +35,6 @@ import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.StorageType;
@ -73,7 +72,7 @@ import org.slf4j.Logger;
**/
class BlockReceiver implements Closeable {
public static final Logger LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
static final Logger CLIENT_TRACE_LOG = DataNode.CLIENT_TRACE_LOG;
@VisibleForTesting
static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
@ -1402,7 +1401,7 @@ class BlockReceiver implements Closeable {
public void run() {
datanode.metrics.incrDataNodePacketResponderCount();
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
final long startTime = CLIENT_TRACE_LOG.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
@ -1557,7 +1556,7 @@ class BlockReceiver implements Closeable {
// Hold a volume reference to finalize block.
try (ReplicaHandler handler = BlockReceiver.this.claimReplicaHandler()) {
BlockReceiver.this.close();
endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
endTime = CLIENT_TRACE_LOG.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block, dirSyncOnFinalize);
}
@ -1568,11 +1567,11 @@ class BlockReceiver implements Closeable {
datanode.closeBlock(block, null, replicaInfo.getStorageUuid(),
replicaInfo.isOnTransientStorage());
if (ClientTraceLog.isInfoEnabled() && isClient) {
if (CLIENT_TRACE_LOG.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
CLIENT_TRACE_LOG.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
myAddr, replicaInfo.getVolume(), block.getNumBytes(),
"HDFS_WRITE", clientname, offset, dnR.getDatanodeUuid(),
block, endTime - startTime));

View File

@ -32,7 +32,6 @@ import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.hdfs.DFSUtilClient;
@ -103,7 +102,7 @@ import org.slf4j.Logger;
*/
class BlockSender implements java.io.Closeable {
static final Logger LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
static final Logger CLIENT_TRACE_LOG = DataNode.CLIENT_TRACE_LOG;
private static final boolean is32Bit =
System.getProperty("sun.arch.data.model").equals("32");
/**
@ -784,7 +783,7 @@ class BlockSender implements java.io.Closeable {
// Trigger readahead of beginning of file if configured.
manageOsCache();
final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
final long startTime = CLIENT_TRACE_LOG.isDebugEnabled() ? System.nanoTime() : 0;
try {
int maxChunksPerPacket;
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
@ -831,9 +830,9 @@ class BlockSender implements java.io.Closeable {
sentEntireByteRange = true;
}
} finally {
if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
if ((clientTraceFmt != null) && CLIENT_TRACE_LOG.isDebugEnabled()) {
final long endTime = System.nanoTime();
ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
CLIENT_TRACE_LOG.debug(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
}
close();

View File

@ -140,8 +140,6 @@ import javax.annotation.Nullable;
import javax.management.ObjectName;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -319,8 +317,8 @@ public class DataNode extends ReconfigurableBase
", blockid: %s" + // block id
", duration(ns): %s"; // duration time
static final Log ClientTraceLog =
LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
static final Logger CLIENT_TRACE_LOG =
LoggerFactory.getLogger(DataNode.class.getName() + ".clienttrace");
private static final String USAGE =
"Usage: hdfs datanode [-regular | -rollback | -rollingupgrade rollback" +
@ -360,7 +358,7 @@ public class DataNode extends ReconfigurableBase
FS_GETSPACEUSED_JITTER_KEY,
FS_GETSPACEUSED_CLASSNAME));
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";
private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
private final FileIoProvider fileIoProvider;
@ -3621,8 +3619,12 @@ public class DataNode extends ReconfigurableBase
*/
@Override // DataNodeMXBean
public String getBPServiceActorInfo() {
final ArrayList<Map<String, String>> infoArray =
new ArrayList<Map<String, String>>();
return JSON.toString(getBPServiceActorInfoMap());
}
@VisibleForTesting
public List<Map<String, String>> getBPServiceActorInfoMap() {
final List<Map<String, String>> infoArray = new ArrayList<>();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
@ -3630,7 +3632,7 @@ public class DataNode extends ReconfigurableBase
}
}
}
return JSON.toString(infoArray);
return infoArray;
}
/**
@ -3825,6 +3827,29 @@ public class DataNode extends ReconfigurableBase
* @return true - if the data node is fully started
*/
public boolean isDatanodeFullyStarted() {
return isDatanodeFullyStarted(false);
}
/**
* A datanode is considered to be fully started if all the BP threads are
* alive and all the block pools are initialized. If checkConnectionToActiveNamenode is true,
* the datanode is considered to be fully started if it is also heartbeating to
* active namenode in addition to the above-mentioned conditions.
*
* @param checkConnectionToActiveNamenode if true, performs additional check of whether datanode
* is heartbeating to active namenode.
* @return true if the datanode is fully started and also conditionally connected to active
* namenode, false otherwise.
*/
public boolean isDatanodeFullyStarted(boolean checkConnectionToActiveNamenode) {
if (checkConnectionToActiveNamenode) {
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
if (!bp.isInitialized() || !bp.isAlive() || bp.getActiveNN() == null) {
return false;
}
}
return true;
}
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
if (!bp.isInitialized() || !bp.isAlive()) {
return false;
@ -4033,12 +4058,12 @@ public class DataNode extends ReconfigurableBase
return;
}
MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG);
MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG_NAME);
// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG,
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG_NAME,
"DataNode", (short) 0), metricsLoggerPeriodSec, metricsLoggerPeriodSec,
TimeUnit.SECONDS);
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
@ -29,11 +27,14 @@ import java.util.HashMap;
import java.util.Stack;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class for maintain a set of lock for fsDataSetImpl.
*/
public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetLock> {
public static final Log LOG = LogFactory.getLog(DataSetLockManager.class);
public static final Logger LOG = LoggerFactory.getLogger(DataSetLockManager.class);
private final HashMap<String, TrackLog> threadCountMap = new HashMap<>();
private final LockMap lockMap = new LockMap();
private boolean isFair = true;

View File

@ -21,7 +21,6 @@ import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
@ -105,7 +104,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
*/
class DataXceiver extends Receiver implements Runnable {
public static final Logger LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
static final Logger CLIENT_TRACE_LOG = DataNode.CLIENT_TRACE_LOG;
private Peer peer;
private final String remoteAddress; // address of remote side
@ -426,10 +425,10 @@ class DataXceiver extends Receiver implements Runnable {
registeredSlotId);
datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
}
if (ClientTraceLog.isInfoEnabled()) {
if (CLIENT_TRACE_LOG.isInfoEnabled()) {
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId());
BlockSender.ClientTraceLog.info(String.format(
BlockSender.CLIENT_TRACE_LOG.info(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
" blockid: %s, srvID: %s, success: %b",
blk.getBlockId(), dnR.getDatanodeUuid(), success));
@ -466,8 +465,8 @@ class DataXceiver extends Receiver implements Runnable {
bld.build().writeDelimitedTo(socketOut);
success = true;
} finally {
if (ClientTraceLog.isInfoEnabled()) {
BlockSender.ClientTraceLog.info(String.format(
if (CLIENT_TRACE_LOG.isInfoEnabled()) {
BlockSender.CLIENT_TRACE_LOG.info(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS," +
" shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b",
slotId.getShmId().getHi(), slotId.getShmId().getLo(),
@ -526,9 +525,9 @@ class DataXceiver extends Receiver implements Runnable {
sendShmSuccessResponse(sock, shmInfo);
success = true;
} finally {
if (ClientTraceLog.isInfoEnabled()) {
if (CLIENT_TRACE_LOG.isInfoEnabled()) {
if (success) {
BlockSender.ClientTraceLog.info(String.format(
BlockSender.CLIENT_TRACE_LOG.info(String.format(
"cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
"op: REQUEST_SHORT_CIRCUIT_SHM," +
" shmId: %016x%016x, srvID: %s, success: true",
@ -536,7 +535,7 @@ class DataXceiver extends Receiver implements Runnable {
shmInfo.getShmId().getLo(),
datanode.getDatanodeUuid()));
} else {
BlockSender.ClientTraceLog.info(String.format(
BlockSender.CLIENT_TRACE_LOG.info(String.format(
"cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
"op: REQUEST_SHORT_CIRCUIT_SHM, " +
"shmId: n/a, srvID: %s, success: false",
@ -587,13 +586,10 @@ class DataXceiver extends Receiver implements Runnable {
BlockSender blockSender = null;
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
final String clientTraceFmt =
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
"", "%d", "HDFS_READ", clientName, "%d",
dnR.getDatanodeUuid(), block, "%d")
: dnR + " Served block " + block + " to " +
remoteAddress;
final String clientTraceFmt = clientName.length() > 0 && CLIENT_TRACE_LOG.isInfoEnabled() ?
String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress, "", "%d", "HDFS_READ",
clientName, "%d", dnR.getDatanodeUuid(), block, "%d") :
dnR + " Served block " + block + " to " + remoteAddress;
try {
try {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.ipc.RemoteException;
@ -84,4 +85,12 @@ public class ErrorReportAction implements BPServiceActorAction {
}
return true;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("errorCode", errorCode)
.append("errorMessage", errorMessage)
.toString();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
@ -111,4 +112,13 @@ public class ReportBadBlockAction implements BPServiceActorAction {
}
return true;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("block", block)
.append("storageUuid", storageUuid)
.append("storageType", storageType)
.toString();
}
}

View File

@ -1557,7 +1557,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b);
LOG.info("Recover failed close {}, new GS:{}, expectedBlockLen:{}",
b, newGS, expectedBlockLen);
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,

View File

@ -84,9 +84,9 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
doubleBuf = new EditsDoubleBuffer(size);
RandomAccessFile rp;
if (shouldSyncWritesAndSkipFsync) {
rp = new RandomAccessFile(name, "rw");
rp = new RandomAccessFile(name, "rwd");
} else {
rp = new RandomAccessFile(name, "rws");
rp = new RandomAccessFile(name, "rw");
}
try {
fp = new FileOutputStream(rp.getFD()); // open for append

View File

@ -185,9 +185,6 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -405,7 +402,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final String contextFieldSeparator;
boolean isAuditEnabled() {
return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
return (!isDefaultAuditLogger || AUDIT_LOG.isInfoEnabled())
&& !auditLoggers.isEmpty();
}
@ -491,8 +488,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* perm=&lt;permissions (optional)&gt;
* </code>
*/
public static final Log auditLog = LogFactory.getLog(
FSNamesystem.class.getName() + ".audit");
public static final Logger AUDIT_LOG = Logger.getLogger(FSNamesystem.class.getName() + ".audit");
private final int maxCorruptFileBlocksReturn;
private final boolean isPermissionEnabled;
@ -5943,6 +5939,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
// Ensure we record the new generation stamp
getEditLog().logSync();
LOG.info("bumpBlockGenerationStamp({}, client={}) success",
locatedBlock.getBlock(), clientName);
return locatedBlock;
}
@ -8783,8 +8781,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
FileStatus status, CallerContext callerContext, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager) {
if (auditLog.isDebugEnabled() ||
(auditLog.isInfoEnabled() && !debugCmdSet.contains(cmd))) {
if (AUDIT_LOG.isDebugEnabled() ||
(AUDIT_LOG.isInfoEnabled() && !debugCmdSet.contains(cmd))) {
final StringBuilder sb = STRING_BUILDER.get();
src = escapeJava(src);
dst = escapeJava(dst);
@ -8853,16 +8851,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
public void logAuditMessage(String message) {
auditLog.info(message);
AUDIT_LOG.info(message);
}
}
private static void enableAsyncAuditLog(Configuration conf) {
if (!(auditLog instanceof Log4JLogger)) {
LOG.warn("Log4j is required to enable async auditlog");
return;
}
Logger logger = ((Log4JLogger)auditLog).getLogger();
Logger logger = AUDIT_LOG;
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@ -125,15 +122,10 @@ public class FsImageValidation {
}
static void setLogLevel(Class<?> clazz, Level level) {
final Log log = LogFactory.getLog(clazz);
if (log instanceof Log4JLogger) {
final org.apache.log4j.Logger logger = ((Log4JLogger) log).getLogger();
logger.setLevel(level);
LOG.info("setLogLevel {} to {}, getEffectiveLevel() = {}",
clazz.getName(), level, logger.getEffectiveLevel());
} else {
LOG.warn("Failed setLogLevel {} to {}", clazz.getName(), level);
}
final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(clazz);
logger.setLevel(level);
LOG.info("setLogLevel {} to {}, getEffectiveLevel() = {}", clazz.getName(), level,
logger.getEffectiveLevel());
}
static String toCommaSeparatedNumber(long n) {

View File

@ -25,8 +25,6 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Preconditions;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -427,8 +425,7 @@ public class NameNode extends ReconfigurableBase implements
private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace.";
public static final Log MetricsLog =
LogFactory.getLog("NameNodeMetricsLog");
public static final String METRICS_LOG_NAME = "NameNodeMetricsLog";
protected FSNamesystem namesystem;
protected final NamenodeRole role;
@ -949,13 +946,13 @@ public class NameNode extends ReconfigurableBase implements
return;
}
MetricsLoggerTask.makeMetricsLoggerAsync(MetricsLog);
MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG_NAME);
// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(
false);
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(MetricsLog,
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG_NAME,
"NameNode", (short) 128),
metricsLoggerPeriodSec,
metricsLoggerPeriodSec,

View File

@ -2721,10 +2721,10 @@
<description>
Specifies whether to flush edit log file channel. When set, expensive
FileChannel#force calls are skipped and synchronous disk writes are
enabled instead by opening the edit log file with RandomAccessFile("rws")
enabled instead by opening the edit log file with RandomAccessFile("rwd")
flags. This can significantly improve the performance of edit log writes
on the Windows platform.
Note that the behavior of the "rws" flags is platform and hardware specific
Note that the behavior of the "rwd" flags is platform and hardware specific
and might not provide the same level of guarantees as FileChannel#force.
For example, the write will skip the disk-cache on SAS and SCSI devices
while it might not on SATA devices. This is an expert level setting,

View File

@ -81,9 +81,11 @@
<thead>
<tr>
<th>Namenode Address</th>
<th>Namenode HA State</th>
<th>Block Pool ID</th>
<th>Actor State</th>
<th>Last Heartbeat</th>
<th>Last Heartbeat Sent</th>
<th>Last Heartbeat Response</th>
<th>Last Block Report</th>
<th>Last Block Report Size (Max Size)</th>
</tr>
@ -91,9 +93,11 @@
{#dn.BPServiceActorInfo}
<tr>
<td>{NamenodeAddress}</td>
<td>{NamenodeHaState}</td>
<td>{BlockPoolID}</td>
<td>{ActorState}</td>
<td>{LastHeartbeat}s</td>
<td>{LastHeartbeatResponseTime}s</td>
<td>{#helper_relative_time value="{LastBlockReport}"/}</td>
<td>{maxBlockReportSize|fmt_bytes} ({maxDataLength|fmt_bytes})</td>
</tr>

View File

@ -361,6 +361,7 @@
<td></td>
<td></td>
<td></td>
<td></td>
</tr>
{/DeadNodes}
</table>

View File

@ -36,11 +36,14 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
/**
@ -52,6 +55,9 @@ abstract public class TestSymlinkHdfs extends SymlinkBaseTest {
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.TRACE);
}
private static final Logger LOG = LoggerFactory.getLogger(
TestSymlinkHdfs.class);
protected static MiniDFSCluster cluster;
protected static WebHdfsFileSystem webhdfs;
protected static DistributedFileSystem dfs;
@ -99,6 +105,7 @@ abstract public class TestSymlinkHdfs extends SymlinkBaseTest {
if (cluster != null) {
cluster.shutdown();
}
IOUtils.cleanupWithLogger(LOG, webhdfs);
}
@Test(timeout=10000)

View File

@ -2529,6 +2529,24 @@ public class MiniDFSCluster implements AutoCloseable {
return restartDataNode(dnprop, false);
}
/**
* Wait for the datanode to be fully functional i.e. all the BP service threads are alive,
* all block pools initiated and also connected to active namenode.
*
* @param dn Datanode instance.
* @param timeout Timeout in millis until when we should wait for datanode to be fully
* operational.
* @throws InterruptedException If the thread wait is interrupted.
* @throws TimeoutException If times out while awaiting the fully operational capability of
* datanode.
*/
public void waitDatanodeConnectedToActive(DataNode dn, int timeout)
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(() -> dn.isDatanodeFullyStarted(true),
100, timeout, "Datanode is not connected to active namenode even after "
+ timeout + " ms of waiting");
}
public void waitDatanodeFullyStarted(DataNode dn, int timeout)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(dn::isDatanodeFullyStarted, 100, timeout,

View File

@ -21,8 +21,6 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -41,6 +39,8 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.*;
@ -52,7 +52,7 @@ import static org.junit.Assert.*;
*/
@RunWith(Parameterized.class)
public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
public static final Log LOG = LogFactory.getLog(TestPermission.class);
public static final Logger LOG = LoggerFactory.getLogger(TestPermission.class);
private HdfsConfiguration conf;
private MiniDFSCluster cluster;

View File

@ -190,7 +190,7 @@ public class TestDFSRename {
Path path = new Path("/test");
dfs.mkdirs(path);
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
dfs.rename(path, new Path("/dir1"),
new Rename[] {Rename.OVERWRITE, Rename.TO_TRASH});
String auditOut = auditLog.getOutput();

View File

@ -32,7 +32,7 @@ public class TestFileLengthOnClusterRestart {
* Tests the fileLength when we sync the file and restart the cluster and
* Datanodes not report to Namenode yet.
*/
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testFileLengthWithHSyncAndClusterRestartWithOutDNsRegister()
throws Exception {
final Configuration conf = new HdfsConfiguration();

Some files were not shown because too many files have changed in this diff Show More