Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-26 11:09:03 -07:00
commit 7060502edb
29 changed files with 1194 additions and 130 deletions

View File

@ -746,6 +746,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11040. Return value of read(ByteBuffer buf) in CryptoInputStream is
incorrect in some cases. (Yi Liu via wang)
HADOOP-11140. hadoop-aws only need test-scoped dependency on
hadoop-common's tests jar. (Juan Yu via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HADOOP-10734. Implement high-performance secure random number sources.

View File

@ -296,6 +296,10 @@
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.tracing\.TraceAdminPB.*">
</Match>
<!--
Manually checked, misses child thread manually syncing on parent's intrinsic lock.

View File

@ -348,6 +348,7 @@
<include>ProtobufRpcEngine.proto</include>
<include>Security.proto</include>
<include>GetUserMappingsProtocol.proto</include>
<include>TraceAdmin.proto</include>
<include>RefreshAuthorizationPolicyProtocol.proto</include>
<include>RefreshUserMappingsProtocol.proto</include>
<include>RefreshCallQueueProtocol.proto</include>

View File

@ -35,6 +35,7 @@ function hadoop_usage()
echo " jar <jar> run a jar file"
echo " jnipath prints the java.library.path"
echo " key manage keys via the KeyProvider"
echo " trace view and modify Hadoop tracing settings"
echo " version print the version"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
@ -159,6 +160,9 @@ case ${COMMAND} in
key)
CLASS=org.apache.hadoop.crypto.key.KeyShell
;;
trace)
CLASS=org.apache.hadoop.tracing.TraceAdmin
;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
;;

View File

@ -152,6 +152,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH =
"security.refresh.generic.protocol.acl";
public static final String
HADOOP_SECURITY_SERVICE_AUTHORIZATION_TRACING =
"security.trace.protocol.acl";
public static final String
SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
public static final String
SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl";

View File

@ -18,18 +18,24 @@
package org.apache.hadoop.tracing;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.htrace.HTraceConfiguration;
import org.htrace.SpanReceiver;
import org.htrace.Trace;
/**
* This class provides functions for reading the names of SpanReceivers from
* the Hadoop configuration, adding those SpanReceivers to the Tracer,
@ -37,11 +43,15 @@ import org.htrace.Trace;
* This class does nothing If no SpanReceiver is configured.
*/
@InterfaceAudience.Private
public class SpanReceiverHost {
public static final String SPAN_RECEIVERS_CONF_KEY = "hadoop.trace.spanreceiver.classes";
public class SpanReceiverHost implements TraceAdminProtocol {
public static final String SPAN_RECEIVERS_CONF_KEY =
"hadoop.trace.spanreceiver.classes";
private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
private Collection<SpanReceiver> receivers = new HashSet<SpanReceiver>();
private final TreeMap<Long, SpanReceiver> receivers =
new TreeMap<Long, SpanReceiver>();
private Configuration config;
private boolean closed = false;
private long highestId = 1;
private static enum SingletonHolder {
INSTANCE;
@ -69,6 +79,8 @@ public class SpanReceiverHost {
}
}
private static List<ConfigurationPair> EMPTY = Collections.emptyList();
/**
* Reads the names of classes specified in the
* "hadoop.trace.spanreceiver.classes" property and instantiates and registers
@ -79,58 +91,75 @@ public class SpanReceiverHost {
* called on them. This allows SpanReceivers to use values from the Hadoop
* configuration.
*/
public void loadSpanReceivers(Configuration conf) {
Class<?> implClass = null;
String[] receiverNames = conf.getTrimmedStrings(SPAN_RECEIVERS_CONF_KEY);
public synchronized void loadSpanReceivers(Configuration conf) {
config = new Configuration(conf);
String[] receiverNames =
config.getTrimmedStrings(SPAN_RECEIVERS_CONF_KEY);
if (receiverNames == null || receiverNames.length == 0) {
return;
}
for (String className : receiverNames) {
className = className.trim();
try {
implClass = Class.forName(className);
receivers.add(loadInstance(implClass, conf));
LOG.info("SpanReceiver " + className + " was loaded successfully.");
} catch (ClassNotFoundException e) {
LOG.warn("Class " + className + " cannot be found.", e);
} catch (IOException e) {
LOG.warn("Load SpanReceiver " + className + " failed.", e);
}
}
for (SpanReceiver rcvr : receivers) {
SpanReceiver rcvr = loadInstance(className, EMPTY);
Trace.addReceiver(rcvr);
receivers.put(highestId++, rcvr);
LOG.info("SpanReceiver " + className + " was loaded successfully.");
} catch (IOException e) {
LOG.error("Failed to load SpanReceiver", e);
}
}
}
private SpanReceiver loadInstance(Class<?> implClass, Configuration conf)
throws IOException {
private synchronized SpanReceiver loadInstance(String className,
List<ConfigurationPair> extraConfig) throws IOException {
Class<?> implClass = null;
SpanReceiver impl;
try {
Object o = ReflectionUtils.newInstance(implClass, conf);
implClass = Class.forName(className);
Object o = ReflectionUtils.newInstance(implClass, config);
impl = (SpanReceiver)o;
impl.configure(wrapHadoopConf(conf));
impl.configure(wrapHadoopConf(config, extraConfig));
} catch (ClassCastException e) {
throw new IOException("Class " + className +
" does not implement SpanReceiver.");
} catch (ClassNotFoundException e) {
throw new IOException("Class " + className + " cannot be found.");
} catch (SecurityException e) {
throw new IOException(e);
throw new IOException("Got SecurityException while loading " +
"SpanReceiver " + className);
} catch (IllegalArgumentException e) {
throw new IOException(e);
throw new IOException("Got IllegalArgumentException while loading " +
"SpanReceiver " + className, e);
} catch (RuntimeException e) {
throw new IOException(e);
throw new IOException("Got RuntimeException while loading " +
"SpanReceiver " + className, e);
}
return impl;
}
private static HTraceConfiguration wrapHadoopConf(final Configuration conf) {
private static HTraceConfiguration wrapHadoopConf(final Configuration conf,
List<ConfigurationPair> extraConfig) {
final HashMap<String, String> extraMap = new HashMap<String, String>();
for (ConfigurationPair pair : extraConfig) {
extraMap.put(pair.getKey(), pair.getValue());
}
return new HTraceConfiguration() {
public static final String HTRACE_CONF_PREFIX = "hadoop.";
@Override
public String get(String key) {
if (extraMap.containsKey(key)) {
return extraMap.get(key);
}
return conf.get(HTRACE_CONF_PREFIX + key);
}
@Override
public String get(String key, String defaultValue) {
if (extraMap.containsKey(key)) {
return extraMap.get(key);
}
return conf.get(HTRACE_CONF_PREFIX + key, defaultValue);
}
};
@ -142,12 +171,67 @@ public class SpanReceiverHost {
public synchronized void closeReceivers() {
if (closed) return;
closed = true;
for (SpanReceiver rcvr : receivers) {
for (SpanReceiver rcvr : receivers.values()) {
try {
rcvr.close();
} catch (IOException e) {
LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e);
}
}
receivers.clear();
}
public synchronized SpanReceiverInfo[] listSpanReceivers()
throws IOException {
SpanReceiverInfo info[] = new SpanReceiverInfo[receivers.size()];
int i = 0;
for(Map.Entry<Long, SpanReceiver> entry : receivers.entrySet()) {
info[i] = new SpanReceiverInfo(entry.getKey(),
entry.getValue().getClass().getName());
i++;
}
return info;
}
public synchronized long addSpanReceiver(SpanReceiverInfo info)
throws IOException {
StringBuilder configStringBuilder = new StringBuilder();
String prefix = "";
for (ConfigurationPair pair : info.configPairs) {
configStringBuilder.append(prefix).append(pair.getKey()).
append(" = ").append(pair.getValue());
prefix = ", ";
}
SpanReceiver rcvr = null;
try {
rcvr = loadInstance(info.getClassName(), info.configPairs);
} catch (IOException e) {
LOG.info("Failed to add SpanReceiver " + info.getClassName() +
" with configuration " + configStringBuilder.toString(), e);
throw e;
} catch (RuntimeException e) {
LOG.info("Failed to add SpanReceiver " + info.getClassName() +
" with configuration " + configStringBuilder.toString(), e);
throw e;
}
Trace.addReceiver(rcvr);
long newId = highestId++;
receivers.put(newId, rcvr);
LOG.info("Successfully added SpanReceiver " + info.getClassName() +
" with configuration " + configStringBuilder.toString());
return newId;
}
public synchronized void removeSpanReceiver(long spanReceiverId)
throws IOException {
SpanReceiver rcvr = receivers.remove(spanReceiverId);
if (rcvr == null) {
throw new IOException("There is no span receiver with id " + spanReceiverId);
}
Trace.removeReceiver(rcvr);
rcvr.close();
LOG.info("Successfully removed SpanReceiver " + spanReceiverId +
" with class " + rcvr.getClass().getName());
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SpanReceiverInfo {
private final long id;
private final String className;
final List<ConfigurationPair> configPairs =
new LinkedList<ConfigurationPair>();
static class ConfigurationPair {
private final String key;
private final String value;
ConfigurationPair(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public String getValue() {
return value;
}
}
SpanReceiverInfo(long id, String className) {
this.id = id;
this.className = className;
}
public long getId() {
return id;
}
public String getClassName() {
return className;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SpanReceiverInfoBuilder {
private SpanReceiverInfo info;
public SpanReceiverInfoBuilder(String className) {
info = new SpanReceiverInfo(0, className);
}
public void addConfigurationPair(String key, String value) {
info.configPairs.add(new ConfigurationPair(key, value));
}
public SpanReceiverInfo build() {
SpanReceiverInfo ret = info;
info = null;
return ret;
}
}

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.TableListing;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
/**
* A command-line tool for viewing and modifying tracing settings.
*/
@InterfaceAudience.Private
public class TraceAdmin extends Configured implements Tool {
private TraceAdminProtocolPB proxy;
private TraceAdminProtocolTranslatorPB remote;
private void usage() {
PrintStream err = System.err;
err.print(
"Hadoop tracing configuration commands:\n" +
" -add [-class classname] [-Ckey=value] [-Ckey2=value2] ...\n" +
" Add a span receiver with the provided class name. Configuration\n" +
" keys for the span receiver can be specified with the -C options.\n" +
" The span receiver will also inherit whatever configuration keys\n" +
" exist in the daemon's configuration.\n" +
" -help: Print this help message.\n" +
" -host [hostname:port]\n" +
" Specify the hostname and port of the daemon to examine.\n" +
" Required for all commands.\n" +
" -list: List the current span receivers.\n" +
" -remove [id]\n" +
" Remove the span receiver with the specified id. Use -list to\n" +
" find the id of each receiver.\n"
);
}
private int listSpanReceivers(List<String> args) throws IOException {
SpanReceiverInfo infos[] = remote.listSpanReceivers();
if (infos.length == 0) {
System.out.println("[no span receivers found]");
return 0;
}
TableListing listing = new TableListing.Builder().
addField("ID").
addField("CLASS").
showHeaders().
build();
for (SpanReceiverInfo info : infos) {
listing.addRow("" + info.getId(), info.getClassName());
}
System.out.println(listing.toString());
return 0;
}
private final static String CONFIG_PREFIX = "-C";
private int addSpanReceiver(List<String> args) throws IOException {
String className = StringUtils.popOptionWithArgument("-class", args);
if (className == null) {
System.err.println("You must specify the classname with -class.");
return 1;
}
ByteArrayOutputStream configStream = new ByteArrayOutputStream();
PrintStream configsOut = new PrintStream(configStream);
SpanReceiverInfoBuilder factory = new SpanReceiverInfoBuilder(className);
String prefix = "";
for (int i = 0; i < args.size(); ++i) {
String str = args.get(i);
if (!str.startsWith(CONFIG_PREFIX)) {
System.err.println("Can't understand argument: " + str);
return 1;
}
str = str.substring(CONFIG_PREFIX.length());
int equalsIndex = str.indexOf("=");
if (equalsIndex < 0) {
System.err.println("Can't parse configuration argument " + str);
System.err.println("Arguments must be in the form key=value");
return 1;
}
String key = str.substring(0, equalsIndex);
String value = str.substring(equalsIndex + 1);
factory.addConfigurationPair(key, value);
configsOut.print(prefix + key + " = " + value);
prefix = ", ";
}
try {
long id = remote.addSpanReceiver(factory.build());
System.out.println("Added trace span receiver " + id +
" with configuration " + configStream.toString());
} catch (IOException e) {
System.out.println("addSpanReceiver error with configuration " +
configStream.toString());
throw e;
}
return 0;
}
private int removeSpanReceiver(List<String> args) throws IOException {
String indexStr = StringUtils.popFirstNonOption(args);
long id = -1;
try {
id = Long.parseLong(indexStr);
} catch (NumberFormatException e) {
System.err.println("Failed to parse ID string " +
indexStr + ": " + e.getMessage());
return 1;
}
remote.removeSpanReceiver(id);
System.err.println("Removed trace span receiver " + id);
return 0;
}
@Override
public int run(String argv[]) throws Exception {
LinkedList<String> args = new LinkedList<String>();
for (String arg : argv) {
args.add(arg);
}
if (StringUtils.popOption("-h", args) ||
StringUtils.popOption("-help", args)) {
usage();
return 0;
} else if (args.size() == 0) {
usage();
return 0;
}
String hostPort = StringUtils.popOptionWithArgument("-host", args);
if (hostPort == null) {
System.err.println("You must specify a host with -host.");
return 1;
}
if (args.size() < 0) {
System.err.println("You must specify an operation.");
return 1;
}
RPC.setProtocolEngine(getConf(), TraceAdminProtocolPB.class,
ProtobufRpcEngine.class);
InetSocketAddress address = NetUtils.createSocketAddr(hostPort);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Class<?> xface = TraceAdminProtocolPB.class;
proxy = (TraceAdminProtocolPB)RPC.getProxy(xface,
RPC.getProtocolVersion(xface), address,
ugi, getConf(), NetUtils.getDefaultSocketFactory(getConf()), 0);
remote = new TraceAdminProtocolTranslatorPB(proxy);
try {
if (args.get(0).equals("-list")) {
return listSpanReceivers(args.subList(1, args.size()));
} else if (args.get(0).equals("-add")) {
return addSpanReceiver(args.subList(1, args.size()));
} else if (args.get(0).equals("-remove")) {
return removeSpanReceiver(args.subList(1, args.size()));
} else {
System.err.println("Unrecognized tracing command: " + args.get(0));
System.err.println("Use -help for help.");
return 1;
}
} finally {
remote.close();
}
}
public static void main(String[] argv) throws Exception {
TraceAdmin admin = new TraceAdmin();
admin.setConf(new Configuration());
System.exit(admin.run(argv));
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol interface that provides tracing.
*/
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TraceAdminProtocol {
public static final long versionID = 1L;
/**
* List the currently active trace span receivers.
*
* @throws IOException On error.
*/
@Idempotent
public SpanReceiverInfo[] listSpanReceivers() throws IOException;
/**
* Add a new trace span receiver.
*
* @param desc The span receiver description.
* @return The ID of the new trace span receiver.
*
* @throws IOException On error.
*/
@AtMostOnce
public long addSpanReceiver(SpanReceiverInfo desc) throws IOException;
/**
* Remove a trace span receiver.
*
* @param spanReceiverId The id of the span receiver to remove.
* @throws IOException On error.
*/
@AtMostOnce
public void removeSpanReceiver(long spanReceiverId) throws IOException;
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService",
protocolVersion = 1)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TraceAdminProtocolPB extends
TraceAdminService.BlockingInterface, VersionedProtocol {
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ConfigPair;
import org.apache.hadoop.tracing.TraceAdminPB.RemoveSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.RemoveSpanReceiverResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
public class TraceAdminProtocolServerSideTranslatorPB
implements TraceAdminProtocolPB, Closeable {
private final TraceAdminProtocol server;
public TraceAdminProtocolServerSideTranslatorPB(TraceAdminProtocol server) {
this.server = server;
}
@Override
public void close() throws IOException {
RPC.stopProxy(server);
}
@Override
public ListSpanReceiversResponseProto listSpanReceivers(
RpcController controller, ListSpanReceiversRequestProto req)
throws ServiceException {
try {
SpanReceiverInfo[] descs = server.listSpanReceivers();
ListSpanReceiversResponseProto.Builder bld =
ListSpanReceiversResponseProto.newBuilder();
for (int i = 0; i < descs.length; ++i) {
bld.addDescriptions(TraceAdminPB.SpanReceiverListInfo.newBuilder().
setId(descs[i].getId()).
setClassName(descs[i].getClassName()).build());
}
return bld.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public AddSpanReceiverResponseProto addSpanReceiver(
RpcController controller, AddSpanReceiverRequestProto req)
throws ServiceException {
try {
SpanReceiverInfoBuilder factory =
new SpanReceiverInfoBuilder(req.getClassName());
for (ConfigPair config : req.getConfigList()) {
factory.addConfigurationPair(config.getKey(), config.getValue());
}
long id = server.addSpanReceiver(factory.build());
return AddSpanReceiverResponseProto.newBuilder().setId(id).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public RemoveSpanReceiverResponseProto removeSpanReceiver(
RpcController controller, RemoveSpanReceiverRequestProto req)
throws ServiceException {
try {
server.removeSpanReceiver(req.getId());
return RemoveSpanReceiverResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return TraceAdminProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
if (!protocol.equals(RPC.getProtocolName(TraceAdminProtocolPB.class))) {
throw new IOException("Serverside implements " +
RPC.getProtocolName(TraceAdminProtocolPB.class) +
". The following requested protocol is unknown: " + protocol);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(TraceAdminProtocolPB.class),
TraceAdminProtocolPB.class);
}
}

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ConfigPair;
import org.apache.hadoop.tracing.TraceAdminPB.RemoveSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.SpanReceiverListInfo;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
public class TraceAdminProtocolTranslatorPB implements
TraceAdminProtocol, ProtocolTranslator, Closeable {
private final TraceAdminProtocolPB rpcProxy;
public TraceAdminProtocolTranslatorPB(TraceAdminProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
@Override
public SpanReceiverInfo[] listSpanReceivers() throws IOException {
ArrayList<SpanReceiverInfo> infos = new ArrayList<SpanReceiverInfo>(1);
try {
ListSpanReceiversRequestProto req =
ListSpanReceiversRequestProto.newBuilder().build();
ListSpanReceiversResponseProto resp =
rpcProxy.listSpanReceivers(null, req);
for (SpanReceiverListInfo info : resp.getDescriptionsList()) {
infos.add(new SpanReceiverInfo(info.getId(), info.getClassName()));
}
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return infos.toArray(new SpanReceiverInfo[infos.size()]);
}
@Override
public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
try {
AddSpanReceiverRequestProto.Builder bld =
AddSpanReceiverRequestProto.newBuilder();
bld.setClassName(info.getClassName());
for (ConfigurationPair configPair : info.configPairs) {
ConfigPair tuple = ConfigPair.newBuilder().
setKey(configPair.getKey()).
setValue(configPair.getValue()).build();
bld.addConfig(tuple);
}
AddSpanReceiverResponseProto resp =
rpcProxy.addSpanReceiver(null, bld.build());
return resp.getId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void removeSpanReceiver(long spanReceiverId) throws IOException {
try {
RemoveSpanReceiverRequestProto req =
RemoveSpanReceiverRequestProto.newBuilder()
.setId(spanReceiverId).build();
rpcProxy.removeSpanReceiver(null, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *stable* .proto interface.
*/
option java_package = "org.apache.hadoop.tracing";
option java_outer_classname = "TraceAdminPB";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.common;
message ListSpanReceiversRequestProto {
}
message SpanReceiverListInfo {
required int64 id = 1;
required string className = 2;
}
message ListSpanReceiversResponseProto {
repeated SpanReceiverListInfo descriptions = 1;
}
message ConfigPair {
required string key = 1;
required string value = 2;
}
message AddSpanReceiverRequestProto {
required string className = 1;
repeated ConfigPair config = 2;
}
message AddSpanReceiverResponseProto {
required int64 id = 1;
}
message RemoveSpanReceiverRequestProto {
required int64 id = 1;
}
message RemoveSpanReceiverResponseProto {
}
service TraceAdminService {
rpc listSpanReceivers(ListSpanReceiversRequestProto)
returns(ListSpanReceiversResponseProto);
rpc addSpanReceiver(AddSpanReceiverRequestProto)
returns(AddSpanReceiverResponseProto);
rpc removeSpanReceiver(RemoveSpanReceiverRequestProto)
returns(RemoveSpanReceiverResponseProto);
}

View File

@ -18,9 +18,6 @@ Trunk (Unreleased)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
HDFS-6584. Support archival storage. (See breakdown of tasks below for
features and contributors)
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@ -264,96 +261,15 @@ Trunk (Unreleased)
HDFS-6981. Fix DN upgrade with layout version change. (Arpit Agarwal)
BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
(szetszwo)
HDFS-6670. Add block storage policy support with default HOT, WARM and COLD
policies. (szetszwo)
HDFS-6671. Change BlockPlacementPolicy to consider block storage policy
in replicaiton. (szetszwo)
HDFS-6710. Change BlockPlacementPolicy to consider block storage policy
in replica deletion. (szetszwo)
HDFS-6679. Bump NameNodeLayoutVersion and update editsStored test files.
(vinayakumarb via szetszwo)
HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage
types are unavailable. (szetszwo)
HDFS-6835. Add a new API to set storage policy. (jing9)
HDFS-6847. Support storage policy on directories and include storage policy
in HdfsFileStatus. (Jing Zhao via szetszwo)
HDFS-6801. Add a new data migration tool, Mover, for archiving data.
(szetszwo via jing9)
HDFS-6863. Support migration for snapshot paths. (jing9)
HDFS-6906. Add more tests for BlockStoragePolicy. (szetszwo via jing9)
HDFS-6911. check if a block is already scheduled in Mover.
(szetszwo via jing9)
HDFS-6920. Check the storage type of delNodeHintStorage when deleting
a replica. (szetszwo via jing9)
HDFS-6944. Add retry and termination logic for Mover. (jing9)
HDFS-6969. INode#getStoragePolicyID should always return the latest
storage policy. (jing9)
HDFS-6961. BlockPlacementPolicy#chooseTarget should check each valid
storage type in each choosing round. (jing9)
HDFS-6876. support set/get storage policy in DFSAdmin. (jing9)
HDFS-6997. Add more tests for data migration and replicaion. (szetszwo)
HDFS-6875. Support migration for a list of specified paths. (jing9)
HDFS-7027. Mover does not terminate when some storage type is out of space.
(szetszwo via jing9)
HDFS-7029. Fix TestDFSInotifyEventInputStream and TestDistributedFileSystem.
(szetszwo via jing9)
HDFS-7028. FSDirectory should not get storage policy id from symlinks.
(szetszwo)
HDFS-7034. Fix TestBlockPlacement and TestStorageMover. (jing9)
HDFS-7039. Fix Balancer tests. (szetszwo via jing9)
HDFS-7062. Skip under construction block for migration. (jing9)
HDFS-7052. Add Mover into hdfs script. (jing9)
HDFS-7072. Fix TestBlockManager and TestStorageMover. (jing9 via szetszwo)
HDFS-6864. Archival Storage: add user documentation. (szetszwo via jing9)
HDFS-7088. Archival Storage: fix TestBalancer and
TestBalancerWithMultipleNameNodes. (szetszwo via jing9)
HDFS-7095. TestStorageMover often fails in Jenkins. (jing9)
HDFS-7081. Add new DistributedFileSystem API for getting all the existing
storage policies. (jing9)
HDFS-7140. Add a tool to list all the existing block storage policies.
(jing9)
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
HDFS-6584. Support archival storage. (See breakdown of tasks below for
features and contributors)
IMPROVEMENTS
HDFS-6613. Improve logging in caching classes. (wang)
@ -593,6 +509,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7077. Separate CipherSuite from crypto protocol version. (wang)
HDFS-6956. Allow dynamically changing the tracing level in Hadoop servers
(cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -819,6 +738,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7132. hdfs namenode -metadataVersion command does not honor
configured name dirs. (Charles Lamb via wang)
HDFS-7148. TestEncryptionZones#testIsEncryptedMethod fails on branch-2
after archival storage merge. (wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an
@ -966,6 +888,90 @@ Release 2.6.0 - UNRELEASED
HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId
file in the current directory. (jing9)
BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
(szetszwo)
HDFS-6670. Add block storage policy support with default HOT, WARM and COLD
policies. (szetszwo)
HDFS-6671. Change BlockPlacementPolicy to consider block storage policy
in replicaiton. (szetszwo)
HDFS-6710. Change BlockPlacementPolicy to consider block storage policy
in replica deletion. (szetszwo)
HDFS-6679. Bump NameNodeLayoutVersion and update editsStored test files.
(vinayakumarb via szetszwo)
HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage
types are unavailable. (szetszwo)
HDFS-6835. Add a new API to set storage policy. (jing9)
HDFS-6847. Support storage policy on directories and include storage policy
in HdfsFileStatus. (Jing Zhao via szetszwo)
HDFS-6801. Add a new data migration tool, Mover, for archiving data.
(szetszwo via jing9)
HDFS-6863. Support migration for snapshot paths. (jing9)
HDFS-6906. Add more tests for BlockStoragePolicy. (szetszwo via jing9)
HDFS-6911. check if a block is already scheduled in Mover.
(szetszwo via jing9)
HDFS-6920. Check the storage type of delNodeHintStorage when deleting
a replica. (szetszwo via jing9)
HDFS-6944. Add retry and termination logic for Mover. (jing9)
HDFS-6969. INode#getStoragePolicyID should always return the latest
storage policy. (jing9)
HDFS-6961. BlockPlacementPolicy#chooseTarget should check each valid
storage type in each choosing round. (jing9)
HDFS-6876. support set/get storage policy in DFSAdmin. (jing9)
HDFS-6997. Add more tests for data migration and replicaion. (szetszwo)
HDFS-6875. Support migration for a list of specified paths. (jing9)
HDFS-7027. Mover does not terminate when some storage type is out of space.
(szetszwo via jing9)
HDFS-7029. Fix TestDFSInotifyEventInputStream and TestDistributedFileSystem.
(szetszwo via jing9)
HDFS-7028. FSDirectory should not get storage policy id from symlinks.
(szetszwo)
HDFS-7034. Fix TestBlockPlacement and TestStorageMover. (jing9)
HDFS-7039. Fix Balancer tests. (szetszwo via jing9)
HDFS-7062. Skip under construction block for migration. (jing9)
HDFS-7052. Add Mover into hdfs script. (jing9)
HDFS-7072. Fix TestBlockManager and TestStorageMover. (jing9 via szetszwo)
HDFS-6864. Archival Storage: add user documentation. (szetszwo via jing9)
HDFS-7088. Archival Storage: fix TestBalancer and
TestBalancerWithMultipleNameNodes. (szetszwo via jing9)
HDFS-7095. TestStorageMover often fails in Jenkins. (jing9)
HDFS-7081. Add new DistributedFileSystem API for getting all the existing
storage policies. (jing9)
HDFS-7140. Add a tool to list all the existing block storage policies.
(jing9)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.tracing.TraceAdminProtocol;
/**
* {@link PolicyProvider} for HDFS protocols.
@ -72,7 +73,10 @@ public class HDFSPolicyProvider extends PolicyProvider {
RefreshCallQueueProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH,
GenericRefreshProtocol.class)
GenericRefreshProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_TRACING,
TraceAdminProtocol.class)
};
@Override

View File

@ -179,6 +179,10 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -189,6 +193,8 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
@ -230,7 +236,7 @@ import com.google.protobuf.BlockingService;
@InterfaceAudience.Private
public class DataNode extends ReconfigurableBase
implements InterDatanodeProtocol, ClientDatanodeProtocol,
DataNodeMXBean {
TraceAdminProtocol, DataNodeMXBean {
public static final Log LOG = LogFactory.getLog(DataNode.class);
static{
@ -699,6 +705,14 @@ public class DataNode extends ReconfigurableBase
.newReflectiveBlockingService(interDatanodeProtocolXlator);
DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
ipcServer);
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
ipcServer);
LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
// set service-level authorization security policy
@ -3027,4 +3041,22 @@ public class DataNode extends ReconfigurableBase
return lastDiskErrorCheck;
}
}
@Override
public SpanReceiverInfo[] listSpanReceivers() throws IOException {
checkSuperuserPrivilege();
return spanReceiverHost.listSpanReceivers();
}
@Override
public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
checkSuperuserPrivilege();
return spanReceiverHost.addSpanReceiver(info);
}
@Override
public void removeSpanReceiver(long id) throws IOException {
checkSuperuserPrivilege();
spanReceiverHost.removeSpanReceiver(id);
}
}

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
@ -244,6 +245,8 @@ public class NameNode implements NameNodeStatusMXBean {
return RefreshCallQueueProtocol.versionID;
} else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
return GetUserMappingsProtocol.versionID;
} else if (protocol.equals(TraceAdminProtocol.class.getName())){
return TraceAdminProtocol.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
@ -279,7 +282,7 @@ public class NameNode implements NameNodeStatusMXBean {
private JvmPauseMonitor pauseMonitor;
private ObjectName nameNodeStatusBeanName;
private SpanReceiverHost spanReceiverHost;
SpanReceiverHost spanReceiverHost;
/**
* The namenode address that clients will use to access this namenode
* or the name service. For HA configurations using logical URI, it

View File

@ -165,6 +165,11 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@ -257,6 +262,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
WritableRpcEngine.ensureInitialized();
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
@ -300,6 +310,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
genericRefreshService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
traceAdminService, serviceRpcServer);
// Update the address with the correct port
InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
@ -346,6 +358,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
genericRefreshService, clientRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, clientRpcServer);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
traceAdminService, clientRpcServer);
// set service-level authorization security policy
if (serviceAuthEnabled =
@ -1597,5 +1611,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
}
}
@Override
public SpanReceiverInfo[] listSpanReceivers() throws IOException {
namesystem.checkSuperuserPrivilege();
return nn.spanReceiverHost.listSpanReceivers();
}
@Override
public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
namesystem.checkSuperuserPrivilege();
return nn.spanReceiverHost.addSpanReceiver(info);
}
@Override
public void removeSpanReceiver(long id) throws IOException {
namesystem.checkSuperuserPrivilege();
nn.spanReceiverHost.removeSpanReceiver(id);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.TraceAdminProtocol;
/** The full set of RPC methods implemented by the Namenode. */
@InterfaceAudience.Private
@ -38,5 +39,6 @@ public interface NamenodeProtocols
RefreshCallQueueProtocol,
GenericRefreshProtocol,
GetUserMappingsProtocol,
HAServiceProtocol {
HAServiceProtocol,
TraceAdminProtocol {
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
public class TestTraceAdmin {
private String runTraceCommand(TraceAdmin trace, String... cmd)
throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
PrintStream oldStdout = System.out;
PrintStream oldStderr = System.err;
System.setOut(ps);
System.setErr(ps);
int ret = -1;
try {
ret = trace.run(cmd);
} finally {
System.out.flush();
System.setOut(oldStdout);
System.setErr(oldStderr);
}
return "ret:" + ret + ", " + baos.toString();
}
private String getHostPortForNN(MiniDFSCluster cluster) {
return "127.0.0.1:" + cluster.getNameNodePort();
}
@Test
public void testCreateAndDestroySpanReceiver() throws Exception {
Configuration conf = new Configuration();
conf = new Configuration();
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, "");
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
TemporarySocketDirectory tempDir = new TemporarySocketDirectory();
String tracePath =
new File(tempDir.getDir(), "tracefile").getAbsolutePath();
try {
TraceAdmin trace = new TraceAdmin();
trace.setConf(conf);
Assert.assertEquals("ret:0, [no span receivers found]\n",
runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster)));
Assert.assertEquals("ret:0, Added trace span receiver 1 with " +
"configuration local-file-span-receiver.path = " + tracePath + "\n",
runTraceCommand(trace, "-add", "-host", getHostPortForNN(cluster),
"-class", "org.htrace.impl.LocalFileSpanReceiver",
"-Clocal-file-span-receiver.path=" + tracePath));
String list =
runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster));
Assert.assertTrue(list.startsWith("ret:0"));
Assert.assertTrue(list.contains("1 org.htrace.impl.LocalFileSpanReceiver"));
Assert.assertEquals("ret:0, Removed trace span receiver 1\n",
runTraceCommand(trace, "-remove", "1", "-host",
getHostPortForNN(cluster)));
Assert.assertEquals("ret:0, [no span receivers found]\n",
runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster)));
} finally {
cluster.shutdown();
tempDir.close();
}
}
}

View File

@ -377,6 +377,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6093. minor distcp doc edits (Charles Lamb via aw)
MAPREDUCE-5831. Make MR client ignore unknown counters received from AM.
(Junping Du via zjshen)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -241,9 +241,12 @@ public class TypeConverter {
org.apache.hadoop.mapreduce.Counter c =
counters.findCounter(yGrp.getName(),
yCntr.getName());
// if c can be found, or it will be skipped.
if (c != null) {
c.setValue(yCntr.getValue());
}
}
}
return counters;
}

View File

@ -34,6 +34,8 @@ import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.WritableUtils;
@ -54,6 +56,8 @@ public abstract class FileSystemCounterGroup<C extends Counter>
static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
private static final Log LOG = LogFactory.getLog(FileSystemCounterGroup.class);
// C[] would need Array.newInstance which requires a Class<C> reference.
// Just a few local casts probably worth not having to carry it around.
private final Map<String, Object[]> map =
@ -159,13 +163,17 @@ public abstract class FileSystemCounterGroup<C extends Counter>
else {
ours = findCounter(counter.getName());
}
if (ours != null) {
ours.setValue(counter.getValue());
}
}
@Override
public C addCounter(String name, String displayName, long value) {
C counter = findCounter(name);
if (counter != null) {
counter.setValue(value);
}
return counter;
}
@ -192,13 +200,14 @@ public abstract class FileSystemCounterGroup<C extends Counter>
}
catch (Exception e) {
if (create) throw new IllegalArgumentException(e);
LOG.warn(counterName + " is not a recognized counter.");
return null;
}
}
@Override
public C findCounter(String counterName) {
return findCounter(counterName, true);
return findCounter(counterName, false);
}
@SuppressWarnings("unchecked")

View File

@ -151,13 +151,21 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
@Override
public void addCounter(C counter) {
C ours = findCounter(counter.getName());
if (ours != null) {
ours.setValue(counter.getValue());
} else {
LOG.warn(counter.getName() + "is not a known counter.");
}
}
@Override
public C addCounter(String name, String displayName, long value) {
C counter = findCounter(name);
if (counter != null) {
counter.setValue(value);
} else {
LOG.warn(name + "is not a known counter.");
}
return counter;
}
@ -179,7 +187,13 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
@Override
public C findCounter(String counterName) {
return findCounter(valueOf(counterName));
try {
T enumValue = valueOf(counterName);
return findCounter(enumValue);
} catch (IllegalArgumentException e) {
LOG.warn(counterName + " is not a recognized counter.");
return null;
}
}
@SuppressWarnings("unchecked")
@ -208,13 +222,15 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
}
@Override
@SuppressWarnings("unchecked")
@SuppressWarnings("rawtypes")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other, "other counter group")
instanceof FrameworkCounterGroup<?, ?>) {
for (Counter counter : other) {
findCounter(((FrameworkCounter) counter).key.name())
.increment(counter.getValue());
C c = findCounter(((FrameworkCounter) counter).key.name());
if (c != null) {
c.increment(counter.getValue());
}
}
}
}

View File

@ -33,9 +33,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.CountersExceededException;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.Counters.GroupFactory;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
import org.apache.hadoop.mapreduce.counters.CounterGroupFactory.FrameworkGroupFactory;
import org.junit.Test;
/**
@ -321,4 +324,55 @@ public class TestCounters {
public static void main(String[] args) throws IOException {
new TestCounters().testCounters();
}
@SuppressWarnings("rawtypes")
@Test
public void testFrameworkCounter() {
GroupFactory groupFactory = new GroupFactoryForTest();
FrameworkGroupFactory frameworkGroupFactory =
groupFactory.newFrameworkGroupFactory(JobCounter.class);
Group group = (Group) frameworkGroupFactory.newGroup("JobCounter");
FrameworkCounterGroup counterGroup =
(FrameworkCounterGroup) group.getUnderlyingGroup();
org.apache.hadoop.mapreduce.Counter count1 =
counterGroup.findCounter(JobCounter.NUM_FAILED_MAPS.toString());
Assert.assertNotNull(count1);
// Verify no exception get thrown when finding an unknown counter
org.apache.hadoop.mapreduce.Counter count2 =
counterGroup.findCounter("Unknown");
Assert.assertNull(count2);
}
@Test
public void testFilesystemCounter() {
GroupFactory groupFactory = new GroupFactoryForTest();
Group fsGroup = groupFactory.newFileSystemGroup();
org.apache.hadoop.mapreduce.Counter count1 =
fsGroup.findCounter("ANY_BYTES_READ");
Assert.assertNotNull(count1);
// Verify no exception get thrown when finding an unknown counter
org.apache.hadoop.mapreduce.Counter count2 =
fsGroup.findCounter("Unknown");
Assert.assertNull(count2);
}
}
class GroupFactoryForTest extends GroupFactory {
public <T extends Enum<T>>
FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
return super.newFrameworkGroupFactory(cls);
}
public Group newFileSystemGroup() {
return super.newFileSystemGroup();
}
}

View File

@ -96,7 +96,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
<scope>test</scope>
<type>test-jar</type>
</dependency>

View File

@ -256,6 +256,9 @@ Release 2.6.0 - UNRELEASED
YARN-1959. Fix headroom calculation in FairScheduler.
(Anubhav Dhoot via kasha)
YARN-2577. Clarify ACL delimiter and how to configure ACL groups only
(Miklos Christine via aw)
OPTIMIZATIONS
BUG FIXES

View File

@ -406,6 +406,9 @@ Queue Access Control Lists (ACLs)
is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
ACL, then both users may submit to queue2.
<<Note:>> The delimiter is a space character. To specify only ACL groups, begin the
value with a space character.
The root queue's ACLs are "*" by default which, because ACLs are passed down,
means that everybody may submit to and kill applications from every queue.
To start restricting access, change the root queue's ACLs to something other