Merge trunk into HA branch.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1244202 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
011611c765
|
@ -84,6 +84,8 @@ Trunk (unreleased changes)
|
|||
HADOOP-7988. Upper case in hostname part of the principals doesn't work with
|
||||
kerberos. (jitendra)
|
||||
|
||||
HADOOP-8070. Add a standalone benchmark for RPC call performance. (todd)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc
|
||||
|
@ -164,6 +166,8 @@ Release 0.23.2 - UNRELEASED
|
|||
(szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
HADOOP-8071. Avoid an extra packet in client code when nagling is
|
||||
disabled. (todd)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
|
|
|
@ -794,14 +794,18 @@ public class Client {
|
|||
//for serializing the
|
||||
//data to be written
|
||||
d = new DataOutputBuffer();
|
||||
d.writeInt(0); // placeholder for data length
|
||||
RpcPayloadHeader header = new RpcPayloadHeader(
|
||||
call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id);
|
||||
header.write(d);
|
||||
call.rpcRequest.write(d);
|
||||
byte[] data = d.getData();
|
||||
int dataLength = d.getLength();
|
||||
out.writeInt(dataLength); //first put the data length
|
||||
out.write(data, 0, dataLength);//write the data
|
||||
int dataLength = d.getLength() - 4;
|
||||
data[0] = (byte)((dataLength >>> 24) & 0xff);
|
||||
data[1] = (byte)((dataLength >>> 16) & 0xff);
|
||||
data[2] = (byte)((dataLength >>> 8) & 0xff);
|
||||
data[3] = (byte)(dataLength & 0xff);
|
||||
out.write(data, 0, dataLength + 4);//write the data
|
||||
out.flush();
|
||||
}
|
||||
} catch(IOException e) {
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.nio.channels.SocketChannel;
|
|||
import java.nio.channels.WritableByteChannel;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -92,6 +93,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
||||
* parameter, and return a {@link Writable} as their value. A service runs on
|
||||
* a port and is defined by a parameter class and a value class.
|
||||
|
@ -318,6 +321,12 @@ public abstract class Server {
|
|||
return rpcMetrics;
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
Iterable<? extends Thread> getHandlers() {
|
||||
return Arrays.asList(handlers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh the service authorization ACL for the service handled by this server.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,420 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
import org.apache.commons.cli.OptionBuilder;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ipc.RPC.Server;
|
||||
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
|
||||
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
|
||||
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
/**
|
||||
* Benchmark for protobuf RPC.
|
||||
* Run with --help option for usage.
|
||||
*/
|
||||
public class RPCCallBenchmark implements Tool, Configurable {
|
||||
private Configuration conf;
|
||||
private AtomicLong callCount = new AtomicLong(0);
|
||||
private static ThreadMXBean threadBean =
|
||||
ManagementFactory.getThreadMXBean();
|
||||
|
||||
private static class MyOptions {
|
||||
private boolean failed = false;
|
||||
private int serverThreads = 0;
|
||||
private int serverReaderThreads = 1;
|
||||
private int clientThreads = 0;
|
||||
private String host = "0.0.0.0";
|
||||
private int port = 12345;
|
||||
public int secondsToRun = 15;
|
||||
private int msgSize = 1024;
|
||||
public Class<? extends RpcEngine> rpcEngine =
|
||||
WritableRpcEngine.class;
|
||||
|
||||
private MyOptions(String args[]) {
|
||||
try {
|
||||
Options opts = buildOptions();
|
||||
CommandLineParser parser = new GnuParser();
|
||||
CommandLine line = parser.parse(opts, args, true);
|
||||
processOptions(line, opts);
|
||||
validateOptions();
|
||||
} catch (ParseException e) {
|
||||
System.err.println(e.getMessage());
|
||||
System.err.println("Try \"--help\" option for details.");
|
||||
failed = true;
|
||||
}
|
||||
}
|
||||
|
||||
private void validateOptions() throws ParseException {
|
||||
if (serverThreads <= 0 && clientThreads <= 0) {
|
||||
throw new ParseException("Must specify at least -c or -s");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("static-access")
|
||||
private Options buildOptions() {
|
||||
Options opts = new Options();
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("serverThreads").hasArg(true)
|
||||
.withArgName("numthreads")
|
||||
.withDescription("number of server threads (handlers) to run (or 0 to not run server)")
|
||||
.create("s"));
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("serverReaderThreads").hasArg(true)
|
||||
.withArgName("threads")
|
||||
.withDescription("number of server reader threads to run")
|
||||
.create("r"));
|
||||
|
||||
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("clientThreads").hasArg(true)
|
||||
.withArgName("numthreads")
|
||||
.withDescription("number of client threads to run (or 0 to not run client)")
|
||||
.create("c"));
|
||||
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("messageSize").hasArg(true)
|
||||
.withArgName("bytes")
|
||||
.withDescription("size of call parameter in bytes")
|
||||
.create("m"));
|
||||
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("time").hasArg(true)
|
||||
.withArgName("seconds")
|
||||
.withDescription("number of seconds to run clients for")
|
||||
.create("t"));
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("port").hasArg(true)
|
||||
.withArgName("port")
|
||||
.withDescription("port to listen or connect on")
|
||||
.create("p"));
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("host").hasArg(true)
|
||||
.withArgName("addr")
|
||||
.withDescription("host to listen or connect on")
|
||||
.create('h'));
|
||||
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("engine").hasArg(true)
|
||||
.withArgName("writable|protobuf")
|
||||
.withDescription("engine to use")
|
||||
.create('e'));
|
||||
|
||||
opts.addOption(
|
||||
OptionBuilder.withLongOpt("help").hasArg(false)
|
||||
.withDescription("show this screen")
|
||||
.create('?'));
|
||||
|
||||
return opts;
|
||||
}
|
||||
|
||||
private void processOptions(CommandLine line, Options opts)
|
||||
throws ParseException {
|
||||
if (line.hasOption("help") || line.hasOption('?')) {
|
||||
HelpFormatter formatter = new HelpFormatter();
|
||||
System.out.println("Protobuf IPC benchmark.");
|
||||
System.out.println();
|
||||
formatter.printHelp(100,
|
||||
"java ... PBRPCBenchmark [options]",
|
||||
"\nSupported options:", opts, "");
|
||||
return;
|
||||
}
|
||||
|
||||
if (line.hasOption('s')) {
|
||||
serverThreads = Integer.parseInt(line.getOptionValue('s'));
|
||||
}
|
||||
if (line.hasOption('r')) {
|
||||
serverReaderThreads = Integer.parseInt(line.getOptionValue('r'));
|
||||
}
|
||||
if (line.hasOption('c')) {
|
||||
clientThreads = Integer.parseInt(line.getOptionValue('c'));
|
||||
}
|
||||
if (line.hasOption('t')) {
|
||||
secondsToRun = Integer.parseInt(line.getOptionValue('t'));
|
||||
}
|
||||
if (line.hasOption('m')) {
|
||||
msgSize = Integer.parseInt(line.getOptionValue('m'));
|
||||
}
|
||||
if (line.hasOption('p')) {
|
||||
port = Integer.parseInt(line.getOptionValue('p'));
|
||||
}
|
||||
if (line.hasOption('h')) {
|
||||
host = line.getOptionValue('h');
|
||||
}
|
||||
if (line.hasOption('e')) {
|
||||
String eng = line.getOptionValue('e');
|
||||
if ("protobuf".equals(eng)) {
|
||||
rpcEngine = ProtobufRpcEngine.class;
|
||||
} else if ("writable".equals(eng)) {
|
||||
rpcEngine = WritableRpcEngine.class;
|
||||
} else {
|
||||
throw new ParseException("invalid engine: " + eng);
|
||||
}
|
||||
}
|
||||
|
||||
String[] remainingArgs = line.getArgs();
|
||||
if (remainingArgs.length != 0) {
|
||||
throw new ParseException("Extra arguments: " +
|
||||
Joiner.on(" ").join(remainingArgs));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "rpcEngine=" + rpcEngine + "\nserverThreads=" + serverThreads
|
||||
+ "\nserverReaderThreads=" + serverReaderThreads + "\nclientThreads="
|
||||
+ clientThreads + "\nhost=" + host + "\nport=" + port
|
||||
+ "\nsecondsToRun=" + secondsToRun + "\nmsgSize=" + msgSize;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private Server startServer(MyOptions opts) throws IOException {
|
||||
if (opts.serverThreads <= 0) {
|
||||
return null;
|
||||
}
|
||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
|
||||
opts.serverReaderThreads);
|
||||
|
||||
RPC.Server server;
|
||||
// Get RPC server for server side implementation
|
||||
if (opts.rpcEngine == ProtobufRpcEngine.class) {
|
||||
// Create server side implementation
|
||||
PBServerImpl serverImpl = new PBServerImpl();
|
||||
BlockingService service = TestProtobufRpcProto
|
||||
.newReflectiveBlockingService(serverImpl);
|
||||
|
||||
server = RPC.getServer(TestRpcService.class, service,
|
||||
opts.host, opts.port, opts.serverThreads, false, conf, null);
|
||||
} else if (opts.rpcEngine == WritableRpcEngine.class) {
|
||||
server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(),
|
||||
opts.host, opts.port, opts.serverThreads, false, conf, null);
|
||||
} else {
|
||||
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
|
||||
}
|
||||
server.start();
|
||||
return server;
|
||||
}
|
||||
|
||||
private long getTotalCpuTime(Iterable<? extends Thread> threads) {
|
||||
long total = 0;
|
||||
for (Thread t : threads) {
|
||||
long tid = t.getId();
|
||||
total += threadBean.getThreadCpuTime(tid);
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
MyOptions opts = new MyOptions(args);
|
||||
if (opts.failed) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Set RPC engine to the configured RPC engine
|
||||
RPC.setProtocolEngine(conf, TestRpcService.class, opts.rpcEngine);
|
||||
|
||||
Server server = startServer(opts);
|
||||
try {
|
||||
|
||||
TestContext ctx = setupClientTestContext(opts);
|
||||
if (ctx != null) {
|
||||
long totalCalls = 0;
|
||||
ctx.startThreads();
|
||||
long veryStart = System.nanoTime();
|
||||
|
||||
// Loop printing results every second until the specified
|
||||
// time has elapsed
|
||||
for (int i = 0; i < opts.secondsToRun ; i++) {
|
||||
long st = System.nanoTime();
|
||||
ctx.waitFor(1000);
|
||||
long et = System.nanoTime();
|
||||
long ct = callCount.getAndSet(0);
|
||||
totalCalls += ct;
|
||||
double callsPerSec = (ct * 1000000000)/(et - st);
|
||||
System.out.println("Calls per second: " + callsPerSec);
|
||||
}
|
||||
|
||||
// Print results
|
||||
|
||||
if (totalCalls > 0) {
|
||||
long veryEnd = System.nanoTime();
|
||||
double callsPerSec =
|
||||
(totalCalls * 1000000000)/(veryEnd - veryStart);
|
||||
long cpuNanosClient = getTotalCpuTime(ctx.getTestThreads());
|
||||
long cpuNanosServer = -1;
|
||||
if (server != null) {
|
||||
cpuNanosServer = getTotalCpuTime(server.getHandlers());;
|
||||
}
|
||||
System.out.println("====== Results ======");
|
||||
System.out.println("Options:\n" + opts);
|
||||
System.out.println("Total calls per second: " + callsPerSec);
|
||||
System.out.println("CPU time per call on client: " +
|
||||
(cpuNanosClient / totalCalls) + " ns");
|
||||
if (server != null) {
|
||||
System.out.println("CPU time per call on server: " +
|
||||
(cpuNanosServer / totalCalls) + " ns");
|
||||
}
|
||||
} else {
|
||||
System.out.println("No calls!");
|
||||
}
|
||||
|
||||
ctx.stop();
|
||||
} else {
|
||||
while (true) {
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (server != null) {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
private TestContext setupClientTestContext(final MyOptions opts)
|
||||
throws IOException, InterruptedException {
|
||||
if (opts.clientThreads <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Set up a separate proxy for each client thread,
|
||||
// rather than making them share TCP pipes.
|
||||
int numProxies = opts.clientThreads;
|
||||
final RpcServiceWrapper proxies[] = new RpcServiceWrapper[numProxies];
|
||||
for (int i = 0; i < numProxies; i++) {
|
||||
proxies[i] =
|
||||
UserGroupInformation.createUserForTesting("proxy-" + i,new String[]{})
|
||||
.doAs(new PrivilegedExceptionAction<RpcServiceWrapper>() {
|
||||
@Override
|
||||
public RpcServiceWrapper run() throws Exception {
|
||||
return createRpcClient(opts);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Create an echo message of the desired length
|
||||
final StringBuilder msgBuilder = new StringBuilder(opts.msgSize);
|
||||
for (int c = 0; c < opts.msgSize; c++) {
|
||||
msgBuilder.append('x');
|
||||
}
|
||||
final String echoMessage = msgBuilder.toString();
|
||||
|
||||
// Create the clients in a test context
|
||||
TestContext ctx = new TestContext();
|
||||
for (int i = 0; i < opts.clientThreads; i++) {
|
||||
final RpcServiceWrapper proxy = proxies[i % numProxies];
|
||||
|
||||
ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
|
||||
@Override
|
||||
public void doAnAction() throws Exception {
|
||||
proxy.doEcho(echoMessage);
|
||||
callCount.incrementAndGet();
|
||||
}
|
||||
});
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple interface that can be implemented either by the
|
||||
* protobuf or writable implementations.
|
||||
*/
|
||||
private interface RpcServiceWrapper {
|
||||
public String doEcho(String msg) throws Exception;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a client proxy for the specified engine.
|
||||
*/
|
||||
private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.port);
|
||||
|
||||
if (opts.rpcEngine == ProtobufRpcEngine.class) {
|
||||
final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf);
|
||||
return new RpcServiceWrapper() {
|
||||
@Override
|
||||
public String doEcho(String msg) throws Exception {
|
||||
EchoRequestProto req = EchoRequestProto.newBuilder()
|
||||
.setMessage(msg)
|
||||
.build();
|
||||
EchoResponseProto responseProto = proxy.echo(null, req);
|
||||
return responseProto.getMessage();
|
||||
}
|
||||
};
|
||||
} else if (opts.rpcEngine == WritableRpcEngine.class) {
|
||||
final TestProtocol proxy = (TestProtocol)RPC.getProxy(
|
||||
TestProtocol.class, TestProtocol.versionID, addr, conf);
|
||||
return new RpcServiceWrapper() {
|
||||
@Override
|
||||
public String doEcho(String msg) throws Exception {
|
||||
return proxy.echo(msg);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String []args) throws Exception {
|
||||
int rc = ToolRunner.run(new RPCCallBenchmark(), args);
|
||||
System.exit(rc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class TestRPCCallBenchmark {
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testBenchmarkWithWritable() throws Exception {
|
||||
int rc = ToolRunner.run(new RPCCallBenchmark(),
|
||||
new String[] {
|
||||
"--clientThreads", "30",
|
||||
"--serverThreads", "30",
|
||||
"--time", "5",
|
||||
"--serverReaderThreads", "4",
|
||||
"--messageSize", "1024",
|
||||
"--engine", "writable"});
|
||||
assertEquals(0, rc);
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testBenchmarkWithProto() throws Exception {
|
||||
int rc = ToolRunner.run(new RPCCallBenchmark(),
|
||||
new String[] {
|
||||
"--clientThreads", "30",
|
||||
"--serverThreads", "30",
|
||||
"--time", "5",
|
||||
"--serverReaderThreads", "4",
|
||||
"--messageSize", "1024",
|
||||
"--engine", "protobuf"});
|
||||
assertEquals(0, rc);
|
||||
}
|
||||
}
|
|
@ -164,6 +164,10 @@ public abstract class MultithreadedTestUtil {
|
|||
}
|
||||
checkException();
|
||||
}
|
||||
|
||||
public Iterable<? extends Thread> getTestThreads() {
|
||||
return testThreads;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -206,6 +206,10 @@ Trunk (unreleased changes)
|
|||
HDFS-2908. Add apache license header for StorageReport.java. (Brandon Li
|
||||
via jitendra)
|
||||
|
||||
HDFS-2944. Typo in hdfs-default.xml causes
|
||||
dfs.client.block.write.replace-datanode-on-failure.enable to be mistakenly
|
||||
disabled. (atm)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -217,6 +221,9 @@ Release 0.23.2 - UNRELEASED
|
|||
|
||||
NEW FEATURES
|
||||
|
||||
HDFS-2943. Expose last checkpoint time and transaction stats as JMX
|
||||
metrics. (atm)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-2931. Switch DataNode's BlockVolumeChoosingPolicy to private-audience.
|
||||
|
|
|
@ -744,7 +744,7 @@ public class FSImage implements Closeable {
|
|||
long txId = loader.getLoadedImageTxId();
|
||||
LOG.info("Loaded image for txid " + txId + " from " + curFile);
|
||||
lastAppliedTxId = txId;
|
||||
storage.setMostRecentCheckpointTxId(txId);
|
||||
storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -761,7 +761,7 @@ public class FSImage implements Closeable {
|
|||
saver.save(newFile, compression);
|
||||
|
||||
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
|
||||
storage.setMostRecentCheckpointTxId(txid);
|
||||
storage.setMostRecentCheckpointInfo(txid, Util.now());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1065,7 +1065,7 @@ public class FSImage implements Closeable {
|
|||
// advertise it as such to other checkpointers
|
||||
// from now on
|
||||
if (txid > storage.getMostRecentCheckpointTxId()) {
|
||||
storage.setMostRecentCheckpointTxId(txid);
|
||||
storage.setMostRecentCheckpointInfo(txid, Util.now());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3156,6 +3156,31 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
public int getExpiredHeartbeats() {
|
||||
return datanodeStatistics.getExpiredHeartbeats();
|
||||
}
|
||||
|
||||
@Metric({"TransactionsSinceLastCheckpoint",
|
||||
"Number of transactions since last checkpoint"})
|
||||
public long getTransactionsSinceLastCheckpoint() {
|
||||
return getEditLog().getLastWrittenTxId() -
|
||||
getFSImage().getStorage().getMostRecentCheckpointTxId();
|
||||
}
|
||||
|
||||
@Metric({"TransactionsSinceLastLogRoll",
|
||||
"Number of transactions since last edit log roll"})
|
||||
public long getTransactionsSinceLastLogRoll() {
|
||||
return (getEditLog().getLastWrittenTxId() -
|
||||
getEditLog().getCurSegmentTxId()) + 1;
|
||||
}
|
||||
|
||||
@Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
|
||||
public long getLastWrittenTransactionId() {
|
||||
return getEditLog().getLastWrittenTxId();
|
||||
}
|
||||
|
||||
@Metric({"LastCheckpointTime",
|
||||
"Time in milliseconds since the epoch of the last checkpoint"})
|
||||
public long getLastCheckpointTime() {
|
||||
return getFSImage().getStorage().getMostRecentCheckpointTime();
|
||||
}
|
||||
|
||||
/** @see ClientProtocol#getStats() */
|
||||
long[] getStats() {
|
||||
|
|
|
@ -128,6 +128,11 @@ public class NNStorage extends Storage implements Closeable {
|
|||
* that have since been written to the edit log.
|
||||
*/
|
||||
protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
|
||||
|
||||
/**
|
||||
* Time of the last checkpoint, in milliseconds since the epoch.
|
||||
*/
|
||||
private long mostRecentCheckpointTime = 0;
|
||||
|
||||
/**
|
||||
* list of failed (and thus removed) storages
|
||||
|
@ -454,18 +459,29 @@ public class NNStorage extends Storage implements Closeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the transaction ID of the last checkpoint
|
||||
* Set the transaction ID and time of the last checkpoint
|
||||
*
|
||||
* @param txid transaction id of the last checkpoint
|
||||
* @param time time of the last checkpoint, in millis since the epoch
|
||||
*/
|
||||
void setMostRecentCheckpointTxId(long txid) {
|
||||
void setMostRecentCheckpointInfo(long txid, long time) {
|
||||
this.mostRecentCheckpointTxId = txid;
|
||||
this.mostRecentCheckpointTime = time;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the transaction ID of the last checkpoint.
|
||||
* @return the transaction ID of the last checkpoint.
|
||||
*/
|
||||
public long getMostRecentCheckpointTxId() {
|
||||
return mostRecentCheckpointTxId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the time of the most recent checkpoint in millis since the epoch.
|
||||
*/
|
||||
long getMostRecentCheckpointTime() {
|
||||
return mostRecentCheckpointTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a small file in all available storage directories that
|
||||
|
|
|
@ -361,7 +361,7 @@
|
|||
|
||||
<property>
|
||||
<name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
|
||||
<value>ture</value>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If there is a datanode/network failure in the write pipeline,
|
||||
DFSClient will try to remove the failed datanode from the pipeline
|
||||
|
@ -369,7 +369,7 @@
|
|||
the number of datanodes in the pipeline is decreased. The feature is
|
||||
to add new datanodes to the pipeline.
|
||||
|
||||
This is a site-wise property to enable/disable the feature.
|
||||
This is a site-wide property to enable/disable the feature.
|
||||
|
||||
See also dfs.client.block.write.replace-datanode-on-failure.policy
|
||||
</description>
|
||||
|
|
|
@ -20,13 +20,12 @@ package org.apache.hadoop.hdfs.server.namenode.metrics;
|
|||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -39,17 +38,21 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.test.MetricsAsserts;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test for metrics published by the Namenode
|
||||
*/
|
||||
public class TestNameNodeMetrics extends TestCase {
|
||||
public class TestNameNodeMetrics {
|
||||
private static final Configuration CONF = new HdfsConfiguration();
|
||||
private static final int DFS_REPLICATION_INTERVAL = 1;
|
||||
private static final Path TEST_ROOT_DIR_PATH =
|
||||
|
@ -81,8 +84,8 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
return new Path(TEST_ROOT_DIR_PATH, fileName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
|
||||
cluster.waitActive();
|
||||
namesystem = cluster.getNamesystem();
|
||||
|
@ -90,8 +93,8 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
fs = (DistributedFileSystem) cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
|
@ -115,6 +118,7 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
}
|
||||
|
||||
/** Test metrics associated with addition of a file */
|
||||
@Test
|
||||
public void testFileAdd() throws Exception {
|
||||
// Add files with 100 blocks
|
||||
final Path file = getTestPath("testFileAdd");
|
||||
|
@ -159,6 +163,7 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
}
|
||||
|
||||
/** Corrupt a block and ensure metrics reflects it */
|
||||
@Test
|
||||
public void testCorruptBlock() throws Exception {
|
||||
// Create a file with single block with two replicas
|
||||
final Path file = getTestPath("testCorruptBlock");
|
||||
|
@ -189,6 +194,7 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
/** Create excess blocks by reducing the replication factor for
|
||||
* for a file and ensure metrics reflects it
|
||||
*/
|
||||
@Test
|
||||
public void testExcessBlocks() throws Exception {
|
||||
Path file = getTestPath("testExcessBlocks");
|
||||
createFile(file, 100, (short)2);
|
||||
|
@ -201,6 +207,7 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
}
|
||||
|
||||
/** Test to ensure metrics reflects missing blocks */
|
||||
@Test
|
||||
public void testMissingBlock() throws Exception {
|
||||
// Create a file with single block with two replicas
|
||||
Path file = getTestPath("testMissingBlocks");
|
||||
|
@ -230,6 +237,7 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenameMetrics() throws Exception {
|
||||
Path src = getTestPath("src");
|
||||
createFile(src, 100, (short)1);
|
||||
|
@ -254,7 +262,8 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
*
|
||||
* @throws IOException in case of an error
|
||||
*/
|
||||
public void testGetBlockLocationMetric() throws Exception{
|
||||
@Test
|
||||
public void testGetBlockLocationMetric() throws Exception {
|
||||
Path file1_Path = new Path(TEST_ROOT_DIR_PATH, "file1.dat");
|
||||
|
||||
// When cluster starts first time there are no file (read,create,open)
|
||||
|
@ -282,4 +291,46 @@ public class TestNameNodeMetrics extends TestCase {
|
|||
updateMetrics();
|
||||
assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test NN checkpoint and transaction-related metrics.
|
||||
*/
|
||||
@Test
|
||||
public void testTransactionAndCheckpointMetrics() throws Exception {
|
||||
long lastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
|
||||
getMetrics(NS_METRICS));
|
||||
|
||||
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
|
||||
assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
|
||||
|
||||
fs.mkdirs(new Path(TEST_ROOT_DIR_PATH, "/tmp"));
|
||||
updateMetrics();
|
||||
|
||||
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
|
||||
assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS));
|
||||
|
||||
cluster.getNameNodeRpc().rollEditLog();
|
||||
updateMetrics();
|
||||
|
||||
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
|
||||
assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
|
||||
|
||||
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
cluster.getNameNodeRpc().saveNamespace();
|
||||
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
updateMetrics();
|
||||
|
||||
long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
|
||||
getMetrics(NS_METRICS));
|
||||
assertTrue(lastCkptTime < newLastCkptTime);
|
||||
assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
|
||||
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -806,6 +806,12 @@ Release 0.23.1 - 2012-02-08
|
|||
MAPREDUCE-3843. Job summary log file found missing on the RM host
|
||||
(Anupam Seth via tgraves)
|
||||
|
||||
MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
|
||||
the recovery. (vinodkv)
|
||||
|
||||
MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
|
||||
still can recover successfully after MAPREDUCE-3846. (vinodkv)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -2682,6 +2688,9 @@ Release 0.22.1 - Unreleased
|
|||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-3837. Job tracker is not able to recover jobs after crash.
|
||||
(Mayank Bansal via shv)
|
||||
|
||||
Release 0.22.0 - 2011-11-29
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -244,7 +244,7 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||
|
||||
// Log the size of the history-event-queue every so often.
|
||||
if (eventCounter % 1000 == 0) {
|
||||
if (eventCounter != 0 && eventCounter % 1000 == 0) {
|
||||
eventCounter = 0;
|
||||
LOG.info("Size of the JobHistory event queue is "
|
||||
+ eventQueue.size());
|
||||
|
@ -464,8 +464,10 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
}
|
||||
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
|
||||
event.getJobID());
|
||||
LOG.info("In HistoryEventHandler "
|
||||
+ event.getHistoryEvent().getEventType());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("In HistoryEventHandler "
|
||||
+ event.getHistoryEvent().getEventType());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error writing History Event: " + event.getHistoryEvent(),
|
||||
e);
|
||||
|
|
|
@ -26,7 +26,6 @@ import java.security.PrivilegedExceptionAction;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.TypeConverter;
|
|||
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
|
@ -123,7 +123,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
|
|||
* The information is shared across different components using AppContext.
|
||||
*/
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class MRAppMaster extends CompositeService {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
|
||||
|
@ -138,7 +138,7 @@ public class MRAppMaster extends CompositeService {
|
|||
private final int nmPort;
|
||||
private final int nmHttpPort;
|
||||
protected final MRAppMetrics metrics;
|
||||
private Set<TaskId> completedTasksFromPreviousRun;
|
||||
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
|
||||
private List<AMInfo> amInfos;
|
||||
private AppContext context;
|
||||
private Dispatcher dispatcher;
|
||||
|
@ -596,7 +596,7 @@ public class MRAppMaster extends CompositeService {
|
|||
return dispatcher;
|
||||
}
|
||||
|
||||
public Set<TaskId> getCompletedTaskFromPreviousRun() {
|
||||
public Map<TaskId, TaskInfo> getCompletedTaskFromPreviousRun() {
|
||||
return completedTasksFromPreviousRun;
|
||||
}
|
||||
|
||||
|
@ -737,7 +737,6 @@ public class MRAppMaster extends CompositeService {
|
|||
return jobs;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
public EventHandler getEventHandler() {
|
||||
return dispatcher.getEventHandler();
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
|
|||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
|
||||
|
@ -133,7 +134,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
private float cleanupWeight = 0.05f;
|
||||
private float mapWeight = 0.0f;
|
||||
private float reduceWeight = 0.0f;
|
||||
private final Set<TaskId> completedTasksFromPreviousRun;
|
||||
private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
|
||||
private final List<AMInfo> amInfos;
|
||||
private final Lock readLock;
|
||||
private final Lock writeLock;
|
||||
|
@ -376,7 +377,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
TaskAttemptListener taskAttemptListener,
|
||||
JobTokenSecretManager jobTokenSecretManager,
|
||||
Credentials fsTokenCredentials, Clock clock,
|
||||
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
|
||||
OutputCommitter committer, boolean newApiCommitter, String userName,
|
||||
long appSubmitTime, List<AMInfo> amInfos) {
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
|
|
|
@ -19,13 +19,14 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
|
@ -38,7 +39,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
|||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "deprecation" })
|
||||
@SuppressWarnings({ "rawtypes" })
|
||||
public class MapTaskImpl extends TaskImpl {
|
||||
|
||||
private final TaskSplitMetaInfo taskSplitMetaInfo;
|
||||
|
@ -49,7 +50,7 @@ public class MapTaskImpl extends TaskImpl {
|
|||
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
|
||||
Token<JobTokenIdentifier> jobToken,
|
||||
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
|
||||
Set<TaskId> completedTasksFromPreviousRun, int startCount,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
MRAppMetrics metrics) {
|
||||
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
|
||||
conf, taskAttemptListener, committer, jobToken, fsTokens, clock,
|
||||
|
|
|
@ -19,13 +19,14 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
|
@ -37,7 +38,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
|||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "deprecation" })
|
||||
@SuppressWarnings({ "rawtypes" })
|
||||
public class ReduceTaskImpl extends TaskImpl {
|
||||
|
||||
private final int numMapTasks;
|
||||
|
@ -47,7 +48,7 @@ public class ReduceTaskImpl extends TaskImpl {
|
|||
int numMapTasks, TaskAttemptListener taskAttemptListener,
|
||||
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
|
||||
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
|
||||
Set<TaskId> completedTasksFromPreviousRun, int startCount,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
MRAppMetrics metrics) {
|
||||
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
|
||||
taskAttemptListener, committer, jobToken, fsTokens, clock,
|
||||
|
|
|
@ -18,13 +18,14 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.EnumSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
||||
|
@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
|
@ -208,8 +213,23 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
|
||||
private final StateMachine<TaskState, TaskEventType, TaskEvent>
|
||||
stateMachine;
|
||||
|
||||
protected int nextAttemptNumber;
|
||||
|
||||
// By default, the next TaskAttempt number is zero. Changes during recovery
|
||||
protected int nextAttemptNumber = 0;
|
||||
private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
|
||||
new ArrayList<TaskAttemptInfo>();
|
||||
|
||||
private static final class RecoverdAttemptsComparator implements
|
||||
Comparator<TaskAttemptInfo> {
|
||||
@Override
|
||||
public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
|
||||
long diff = attempt1.getStartTime() - attempt2.getStartTime();
|
||||
return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
|
||||
}
|
||||
}
|
||||
|
||||
private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
|
||||
new RecoverdAttemptsComparator();
|
||||
|
||||
//should be set to one which comes first
|
||||
//saying COMMIT_PENDING
|
||||
|
@ -230,7 +250,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
|
||||
Token<JobTokenIdentifier> jobToken,
|
||||
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
|
||||
Set<TaskId> completedTasksFromPreviousRun, int startCount,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
MRAppMetrics metrics) {
|
||||
this.conf = conf;
|
||||
this.clock = clock;
|
||||
|
@ -243,10 +263,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
// have a convention that none of the overrides depends on any
|
||||
// fields that need initialization.
|
||||
maxAttempts = getMaxAttempts();
|
||||
taskId = recordFactory.newRecordInstance(TaskId.class);
|
||||
taskId.setJobId(jobId);
|
||||
taskId.setId(partition);
|
||||
taskId.setTaskType(taskType);
|
||||
taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType);
|
||||
this.partition = partition;
|
||||
this.taskAttemptListener = taskAttemptListener;
|
||||
this.eventHandler = eventHandler;
|
||||
|
@ -255,17 +272,37 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
this.jobToken = jobToken;
|
||||
this.metrics = metrics;
|
||||
|
||||
// See if this is from a previous generation.
|
||||
if (completedTasksFromPreviousRun != null
|
||||
&& completedTasksFromPreviousRun.contains(taskId)) {
|
||||
&& completedTasksFromPreviousRun.containsKey(taskId)) {
|
||||
// This task has TaskAttempts from previous generation. We have to replay
|
||||
// them.
|
||||
LOG.info("Task is from previous run " + taskId);
|
||||
startCount = startCount - 1;
|
||||
TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
|
||||
Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
|
||||
taskInfo.getAllTaskAttempts();
|
||||
taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
|
||||
taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
|
||||
Collections.sort(taskAttemptsFromPreviousGeneration,
|
||||
RECOVERED_ATTEMPTS_COMPARATOR);
|
||||
}
|
||||
|
||||
//attempt ids are generated based on MR app startCount so that attempts
|
||||
//from previous lives don't overstep the current one.
|
||||
//this assumes that a task won't have more than 1000 attempts in its single
|
||||
//life
|
||||
nextAttemptNumber = (startCount - 1) * 1000;
|
||||
if (taskAttemptsFromPreviousGeneration.isEmpty()) {
|
||||
// All the previous attempts are exhausted, now start with a new
|
||||
// generation.
|
||||
|
||||
// All the new TaskAttemptIDs are generated based on MR
|
||||
// ApplicationAttemptID so that attempts from previous lives don't
|
||||
// over-step the current one. This assumes that a task won't have more
|
||||
// than 1000 attempts in its single generation, which is very reasonable.
|
||||
// Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
|
||||
// and requires serious medical attention.
|
||||
nextAttemptNumber = (startCount - 1) * 1000;
|
||||
} else {
|
||||
// There are still some TaskAttempts from previous generation, use them
|
||||
nextAttemptNumber =
|
||||
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
|
||||
}
|
||||
|
||||
// This "this leak" is okay because the retained pointer is in an
|
||||
// instance variable.
|
||||
|
@ -390,17 +427,23 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
|
||||
//this is always called in read/write lock
|
||||
private long getLaunchTime() {
|
||||
long launchTime = 0;
|
||||
long taskLaunchTime = 0;
|
||||
boolean launchTimeSet = false;
|
||||
for (TaskAttempt at : attempts.values()) {
|
||||
//select the least launch time of all attempts
|
||||
if (launchTime == 0 || launchTime > at.getLaunchTime()) {
|
||||
launchTime = at.getLaunchTime();
|
||||
// select the least launch time of all attempts
|
||||
long attemptLaunchTime = at.getLaunchTime();
|
||||
if (attemptLaunchTime != 0 && !launchTimeSet) {
|
||||
// For the first non-zero launch time
|
||||
launchTimeSet = true;
|
||||
taskLaunchTime = attemptLaunchTime;
|
||||
} else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) {
|
||||
taskLaunchTime = attemptLaunchTime;
|
||||
}
|
||||
}
|
||||
if (launchTime == 0) {
|
||||
if (!launchTimeSet) {
|
||||
return this.scheduledTime;
|
||||
}
|
||||
return launchTime;
|
||||
return taskLaunchTime;
|
||||
}
|
||||
|
||||
//this is always called in read/write lock
|
||||
|
@ -525,7 +568,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
attempts.put(attempt.getID(), attempt);
|
||||
break;
|
||||
}
|
||||
++nextAttemptNumber;
|
||||
|
||||
// Update nextATtemptNumber
|
||||
if (taskAttemptsFromPreviousGeneration.isEmpty()) {
|
||||
++nextAttemptNumber;
|
||||
} else {
|
||||
// There are still some TaskAttempts from previous generation, use them
|
||||
nextAttemptNumber =
|
||||
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
|
||||
}
|
||||
|
||||
++numberUncompletedAttempts;
|
||||
//schedule the nextAttemptNumber
|
||||
if (failedAttempts > 0) {
|
||||
|
|
|
@ -19,8 +19,9 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.recover;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
|
@ -32,7 +33,7 @@ public interface Recovery {
|
|||
|
||||
Clock getClock();
|
||||
|
||||
Set<TaskId> getCompletedTasks();
|
||||
Map<TaskId, TaskInfo> getCompletedTasks();
|
||||
|
||||
List<AMInfo> getAMInfos();
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.util.HashMap;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||
|
@ -153,8 +153,8 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Set<TaskId> getCompletedTasks() {
|
||||
return completedTasks.keySet();
|
||||
public Map<TaskId, TaskInfo> getCompletedTasks() {
|
||||
return completedTasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -189,7 +189,8 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
getConfig());
|
||||
//read the previous history file
|
||||
historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
|
||||
histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
|
||||
histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
|
||||
LOG.info("History file is at " + historyFile);
|
||||
in = fc.open(historyFile);
|
||||
JobHistoryParser parser = new JobHistoryParser(in);
|
||||
jobInfo = parser.parse();
|
||||
|
@ -242,7 +243,7 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
|
||||
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
|
||||
.getTaskAttemptID());
|
||||
LOG.info("Attempt start time " + attInfo.getStartTime());
|
||||
LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
|
||||
clock.setTime(attInfo.getStartTime());
|
||||
|
||||
} else if (event.getType() == TaskAttemptEventType.TA_DONE
|
||||
|
@ -250,7 +251,7 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
|| event.getType() == TaskAttemptEventType.TA_KILL) {
|
||||
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
|
||||
.getTaskAttemptID());
|
||||
LOG.info("Attempt finish time " + attInfo.getFinishTime());
|
||||
LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
|
||||
clock.setTime(attInfo.getFinishTime());
|
||||
}
|
||||
|
||||
|
@ -380,17 +381,17 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
}
|
||||
|
||||
// send the done event
|
||||
LOG.info("Sending done event to " + aId);
|
||||
LOG.info("Sending done event to recovered attempt " + aId);
|
||||
actualHandler.handle(new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
break;
|
||||
case KILLED:
|
||||
LOG.info("Sending kill event to " + aId);
|
||||
LOG.info("Sending kill event to recovered attempt " + aId);
|
||||
actualHandler.handle(new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_KILL));
|
||||
break;
|
||||
default:
|
||||
LOG.info("Sending fail event to " + aId);
|
||||
LOG.info("Sending fail event to recovered attempt " + aId);
|
||||
actualHandler.handle(new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_FAILMSG));
|
||||
break;
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
|||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
|
@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
|
||||
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
@ -74,7 +76,14 @@ public class TestRecovery {
|
|||
private Text val1 = new Text("val1");
|
||||
private Text val2 = new Text("val2");
|
||||
|
||||
|
||||
/**
|
||||
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
|
||||
* completely disappears because of failed launch, one attempt gets killed and
|
||||
* one attempt succeeds. AM crashes after the first tasks finishes and
|
||||
* recovers completely and succeeds in the second generation.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testCrashed() throws Exception {
|
||||
|
||||
|
@ -112,7 +121,8 @@ public class TestRecovery {
|
|||
// reduces must be in NEW state
|
||||
Assert.assertEquals("Reduce Task state not correct",
|
||||
TaskState.RUNNING, reduceTask.getReport().getTaskState());
|
||||
|
||||
|
||||
/////////// Play some games with the TaskAttempts of the first task //////
|
||||
//send the fail signal to the 1st map task attempt
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
|
@ -120,29 +130,31 @@ public class TestRecovery {
|
|||
TaskAttemptEventType.TA_FAILMSG));
|
||||
|
||||
app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
|
||||
|
||||
while (mapTask1.getAttempts().size() != 2) {
|
||||
|
||||
int timeOut = 0;
|
||||
while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
|
||||
Thread.sleep(2000);
|
||||
LOG.info("Waiting for next attempt to start");
|
||||
}
|
||||
Assert.assertEquals(2, mapTask1.getAttempts().size());
|
||||
Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
|
||||
itr.next();
|
||||
TaskAttempt task1Attempt2 = itr.next();
|
||||
|
||||
app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
|
||||
|
||||
//send the kill signal to the 1st map 2nd attempt
|
||||
// This attempt will automatically fail because of the way ContainerLauncher
|
||||
// is setup
|
||||
// This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
task1Attempt2.getID(),
|
||||
TaskAttemptEventType.TA_KILL));
|
||||
|
||||
app.waitForState(task1Attempt2, TaskAttemptState.KILLED);
|
||||
|
||||
while (mapTask1.getAttempts().size() != 3) {
|
||||
new TaskAttemptEvent(task1Attempt2.getID(),
|
||||
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
|
||||
app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
|
||||
|
||||
timeOut = 0;
|
||||
while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
|
||||
Thread.sleep(2000);
|
||||
LOG.info("Waiting for next attempt to start");
|
||||
}
|
||||
Assert.assertEquals(3, mapTask1.getAttempts().size());
|
||||
itr = mapTask1.getAttempts().values().iterator();
|
||||
itr.next();
|
||||
itr.next();
|
||||
|
@ -150,12 +162,36 @@ public class TestRecovery {
|
|||
|
||||
app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
|
||||
|
||||
//send the done signal to the 1st map 3rd attempt
|
||||
//send the kill signal to the 1st map 3rd attempt
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
task1Attempt3.getID(),
|
||||
TaskAttemptEventType.TA_KILL));
|
||||
|
||||
app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
|
||||
|
||||
timeOut = 0;
|
||||
while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
|
||||
Thread.sleep(2000);
|
||||
LOG.info("Waiting for next attempt to start");
|
||||
}
|
||||
Assert.assertEquals(4, mapTask1.getAttempts().size());
|
||||
itr = mapTask1.getAttempts().values().iterator();
|
||||
itr.next();
|
||||
itr.next();
|
||||
itr.next();
|
||||
TaskAttempt task1Attempt4 = itr.next();
|
||||
|
||||
app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
|
||||
|
||||
//send the done signal to the 1st map 4th attempt
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
task1Attempt4.getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
/////////// End of games with the TaskAttempts of the first task //////
|
||||
|
||||
//wait for first map task to complete
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
long task1StartTime = mapTask1.getReport().getStartTime();
|
||||
|
@ -241,6 +277,136 @@ public class TestRecovery {
|
|||
// available in the failed attempt should be available here
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleCrashes() throws Exception {
|
||||
|
||||
int runCount = 0;
|
||||
MRApp app =
|
||||
new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
|
||||
++runCount);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean("mapred.mapper.new-api", true);
|
||||
conf.setBoolean("mapred.reducer.new-api", true);
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
Job job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
//all maps would be running
|
||||
Assert.assertEquals("No of tasks not correct",
|
||||
3, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task mapTask1 = it.next();
|
||||
Task mapTask2 = it.next();
|
||||
Task reduceTask = it.next();
|
||||
|
||||
// all maps must be running
|
||||
app.waitForState(mapTask1, TaskState.RUNNING);
|
||||
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||
|
||||
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
|
||||
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
|
||||
|
||||
//before sending the TA_DONE, event make sure attempt has come to
|
||||
//RUNNING state
|
||||
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
|
||||
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
||||
|
||||
// reduces must be in NEW state
|
||||
Assert.assertEquals("Reduce Task state not correct",
|
||||
TaskState.RUNNING, reduceTask.getReport().getTaskState());
|
||||
|
||||
//send the done signal to the 1st map
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
task1Attempt1.getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
//wait for first map task to complete
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
|
||||
// Crash the app
|
||||
app.stop();
|
||||
|
||||
//rerun
|
||||
//in rerun the 1st map will be recovered from previous run
|
||||
app =
|
||||
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
||||
++runCount);
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||
conf.setBoolean("mapred.mapper.new-api", true);
|
||||
conf.setBoolean("mapred.reducer.new-api", true);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
//all maps would be running
|
||||
Assert.assertEquals("No of tasks not correct",
|
||||
3, job.getTasks().size());
|
||||
it = job.getTasks().values().iterator();
|
||||
mapTask1 = it.next();
|
||||
mapTask2 = it.next();
|
||||
reduceTask = it.next();
|
||||
|
||||
// first map will be recovered, no need to send done
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
|
||||
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||
|
||||
task2Attempt = mapTask2.getAttempts().values().iterator().next();
|
||||
//before sending the TA_DONE, event make sure attempt has come to
|
||||
//RUNNING state
|
||||
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
||||
|
||||
//send the done signal to the 2nd map task
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
mapTask2.getAttempts().values().iterator().next().getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
//wait to get it completed
|
||||
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||
|
||||
// Crash the app again.
|
||||
app.stop();
|
||||
|
||||
//rerun
|
||||
//in rerun the 1st and 2nd map will be recovered from previous run
|
||||
app =
|
||||
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
||||
++runCount);
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||
conf.setBoolean("mapred.mapper.new-api", true);
|
||||
conf.setBoolean("mapred.reducer.new-api", true);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
//all maps would be running
|
||||
Assert.assertEquals("No of tasks not correct",
|
||||
3, job.getTasks().size());
|
||||
it = job.getTasks().values().iterator();
|
||||
mapTask1 = it.next();
|
||||
mapTask2 = it.next();
|
||||
reduceTask = it.next();
|
||||
|
||||
// The maps will be recovered, no need to send done
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||
|
||||
//wait for reduce to be running before sending done
|
||||
app.waitForState(reduceTask, TaskState.RUNNING);
|
||||
//send the done signal to the reduce
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
reduceTask.getAttempts().values().iterator().next().getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
app.verifyCompleted();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOutputRecovery() throws Exception {
|
||||
int runCount = 0;
|
||||
|
@ -552,7 +718,7 @@ public class TestRecovery {
|
|||
}
|
||||
|
||||
|
||||
class MRAppWithHistory extends MRApp {
|
||||
static class MRAppWithHistory extends MRApp {
|
||||
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
|
||||
String testName, boolean cleanOnStart, int startCount) {
|
||||
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
|
||||
|
@ -567,7 +733,17 @@ public class TestRecovery {
|
|||
|
||||
@Override
|
||||
protected ContainerLauncher createContainerLauncher(AppContext context) {
|
||||
MockContainerLauncher launcher = new MockContainerLauncher();
|
||||
MockContainerLauncher launcher = new MockContainerLauncher() {
|
||||
@Override
|
||||
public void handle(ContainerLauncherEvent event) {
|
||||
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
|
||||
// Pass everything except the 2nd attempt of the first task.
|
||||
if (taskAttemptID.getId() != 1
|
||||
|| taskAttemptID.getTaskId().getId() != 0) {
|
||||
super.handle(event);
|
||||
}
|
||||
}
|
||||
};
|
||||
launcher.shufflePort = 5467;
|
||||
return launcher;
|
||||
}
|
||||
|
@ -581,7 +757,7 @@ public class TestRecovery {
|
|||
}
|
||||
}
|
||||
|
||||
class RecoveryServiceWithCustomDispatcher extends RecoveryService {
|
||||
static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
|
||||
|
||||
public RecoveryServiceWithCustomDispatcher(
|
||||
ApplicationAttemptId applicationAttemptId, Clock clock,
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf;
|
|||
import org.apache.hadoop.mapred.Task;
|
||||
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
|
@ -72,7 +73,7 @@ public class TestTaskImpl {
|
|||
private Path remoteJobConfFile;
|
||||
private Collection<Token<? extends TokenIdentifier>> fsTokens;
|
||||
private Clock clock;
|
||||
private Set<TaskId> completedTasksFromPreviousRun;
|
||||
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
|
||||
private MRAppMetrics metrics;
|
||||
private TaskImpl mockTask;
|
||||
private ApplicationId appId;
|
||||
|
@ -96,7 +97,7 @@ public class TestTaskImpl {
|
|||
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
|
||||
Token<JobTokenIdentifier> jobToken,
|
||||
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
|
||||
Set<TaskId> completedTasksFromPreviousRun, int startCount,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
MRAppMetrics metrics) {
|
||||
super(jobId, taskType , partition, eventHandler,
|
||||
remoteJobConfFile, conf, taskAttemptListener, committer,
|
||||
|
|
|
@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public class TypeConverter {
|
||||
|
||||
private static RecordFactory recordFactory;
|
||||
|
@ -116,8 +115,8 @@ public class TypeConverter {
|
|||
}
|
||||
|
||||
public static org.apache.hadoop.mapred.TaskID fromYarn(TaskId id) {
|
||||
return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), fromYarn(id.getTaskType()),
|
||||
id.getId());
|
||||
return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()),
|
||||
fromYarn(id.getTaskType()), id.getId());
|
||||
}
|
||||
|
||||
public static TaskId toYarn(org.apache.hadoop.mapreduce.TaskID id) {
|
||||
|
|
|
@ -1192,13 +1192,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|||
try {
|
||||
Path jobInfoFile = getSystemFileForJob(jobId);
|
||||
FSDataInputStream in = fs.open(jobInfoFile);
|
||||
JobInfo token = new JobInfo();
|
||||
final JobInfo token = new JobInfo();
|
||||
token.readFields(in);
|
||||
in.close();
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.createRemoteUser(token.getUser().toString());
|
||||
submitJob(token.getJobID(), restartCount,
|
||||
ugi, token.getJobSubmitDir().toString(), true, null);
|
||||
final UserGroupInformation ugi =
|
||||
UserGroupInformation.createRemoteUser(token.getUser().toString());
|
||||
ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
|
||||
public JobStatus run() throws IOException ,InterruptedException{
|
||||
return submitJob(token.getJobID(), restartCount,
|
||||
ugi, token.getJobSubmitDir().toString(), true, null);
|
||||
}});
|
||||
|
||||
recovered++;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Could not recover job " + jobId, e);
|
||||
|
|
Loading…
Reference in New Issue