svn merge -c 1214066 from trunk for MAPREDUCE-3545.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1230880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-13 03:26:08 +00:00
parent 4b5d4bb1fc
commit 66f4e8b694
13 changed files with 4 additions and 582 deletions

View File

@ -2,6 +2,10 @@ Hadoop MapReduce Change Log
Release 0.23-PB - Unreleased Release 0.23-PB - Unreleased
INCOMPATIBLE CHANGES
MAPREDUCE-3545. Remove Avro RPC. (suresh)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols

View File

@ -759,8 +759,6 @@ public class TestRuntimeEstimators {
} }
class MyAppContext implements AppContext { class MyAppContext implements AppContext {
// I'll be making Avro objects by hand. Please don't do that very often.
private final ApplicationAttemptId myAppAttemptID; private final ApplicationAttemptId myAppAttemptID;
private final ApplicationId myApplicationID; private final ApplicationId myApplicationID;
private final JobId myJobID; private final JobId myJobID;

View File

@ -1,153 +0,0 @@
// Avro IDL for the MapReduce client protocol: identifiers, state enums,
// report records, and the messages used to query or kill jobs, tasks and
// task attempts.
@namespace("org.apache.hadoop.mapreduce.v2.api")
protocol MRClientProtocol {

  // Shared YARN definitions; referenced below through the
  // org.apache.hadoop.yarn prefix.
  import idl "./yarn/yarn-api/src/main/avro/yarn-types.genavro";

  // Kind of task.
  enum TaskType {
    MAP,
    REDUCE
  }

  // Job identifier: an application ID plus an integer id.
  record JobID {
    org.apache.hadoop.yarn.ApplicationID appID;
    int id;
  }

  // Task identifier: the job it belongs to, its type, and an integer id.
  record TaskID {
    JobID jobID;
    TaskType taskType;
    int id;
  }

  // Task-attempt identifier: the task it belongs to plus an integer id.
  record TaskAttemptID {
    TaskID taskID;
    int id;
  }

  // States reported for a task (see TaskReport.state).
  enum TaskState {
    NEW,
    SCHEDULED,
    RUNNING,
    SUCCEEDED,
    FAILED,
    KILL_WAIT,
    KILLED
  }

  // Phases reported for a task attempt (see TaskAttemptReport.phase).
  enum Phase {
    STARTING,
    MAP,
    SHUFFLE,
    SORT,
    REDUCE,
    CLEANUP
  }

  // A single named counter and its value.
  record Counter {
    string name;
    string displayName;
    long value;
  }

  // A named group of counters.
  // NOTE(review): this field is spelled "displayname" while Counter uses
  // "displayName"; renaming it would change the schema, so it is left as is.
  record CounterGroup {
    string name;
    string displayname;
    map<Counter> counters;
  }

  // All counter groups.
  record Counters {
    map<CounterGroup> groups;
  }

  // Report for a single task.
  record TaskReport {
    TaskID id;
    TaskState state;
    float progress;
    long startTime;
    long finishTime;
    Counters counters;
    array<TaskAttemptID> runningAttempts;
    // May be null.
    union{TaskAttemptID, null} successfulAttempt;
    array<string> diagnostics;
  }

  // States reported for a task attempt (see TaskAttemptReport.state).
  enum TaskAttemptState {
    NEW,
    UNASSIGNED,
    ASSIGNED,
    RUNNING,
    COMMIT_PENDING,
    SUCCESS_CONTAINER_CLEANUP,
    SUCCEEDED,
    FAIL_CONTAINER_CLEANUP,
    FAIL_TASK_CLEANUP,
    FAILED,
    KILL_CONTAINER_CLEANUP,
    KILL_TASK_CLEANUP,
    KILLED
  }

  // Report for a single task attempt.
  record TaskAttemptReport {
    TaskAttemptID id;
    TaskAttemptState state;
    float progress;
    long startTime;
    long finishTime;
    Counters counters;
    string diagnosticInfo;
    string stateString;
    Phase phase;
  }

  // States reported for a job (see JobReport.state).
  enum JobState {
    NEW,
    INITED,
    RUNNING,
    SUCCEEDED,
    FAILED,
    KILL_WAIT,
    KILLED,
    ERROR
  }

  // Report for a job, with one progress value per stage.
  record JobReport {
    JobID id;
    JobState state;
    float mapProgress;
    float reduceProgress;
    float cleanupProgress;
    float setupProgress;
    long startTime;
    long finishTime;
  }

  // Status carried by a TaskAttemptCompletionEvent.
  enum TaskAttemptCompletionEventStatus {
    FAILED,
    KILLED,
    SUCCEEDED,
    OBSOLETE,
    TIPFAILED
  }

  // Completion event for a task attempt.
  record TaskAttemptCompletionEvent {
    TaskAttemptID attemptId;
    TaskAttemptCompletionEventStatus status;
    string mapOutputServerAddress;
    int attemptRunTime;
    int eventId;
  }

  // ---- Query messages. Every message may throw YarnRemoteException. ----

  JobReport getJobReport(JobID jobID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  TaskReport getTaskReport(TaskID taskID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  Counters getCounters(JobID jobID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  array<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(
      JobID jobID, int fromEventId, int maxEvents)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  array<TaskReport> getTaskReports(JobID jobID, TaskType taskType)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  array<string> getDiagnostics(TaskAttemptID taskAttemptID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  // ---- Kill / fail messages. ----

  void killJob(JobID jobID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  void killTask(TaskID taskID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  void killTaskAttempt(TaskAttemptID taskAttemptID)
      throws org.apache.hadoop.yarn.YarnRemoteException;

  void failTaskAttempt(TaskAttemptID taskAttemptID)
      throws org.apache.hadoop.yarn.YarnRemoteException;
}

View File

@ -1,27 +0,0 @@
// Avro IDL for the AMRMProtocol: messages for registering and finishing an
// ApplicationMaster and for the allocate call.
@namespace("org.apache.hadoop.yarn")
protocol AMRMProtocol {

  // Types used below but not declared here (Resource, Container,
  // ApplicationMaster, ApplicationStatus, YarnRemoteException) are expected
  // to come from this import.
  import idl "yarn/yarn-api/src/main/avro/yarn-types.genavro";

  // ---- Scheduler records ----

  // Wrapper around an integer priority value.
  record Priority {
    int priority;
  }

  // A request for numContainers containers of the given capability on
  // hostName, at the given priority.
  record ResourceRequest {
    Priority priority;
    string hostName;
    Resource capability;
    int numContainers;
  }

  // Result of an allocate call.
  record AMResponse {
    boolean reboot;
    int responseId;
    array<Container> containers;
  }

  // ---- Messages ----

  void registerApplicationMaster(ApplicationMaster applicationMaster)
      throws YarnRemoteException;

  void finishApplicationMaster(ApplicationMaster applicationMaster)
      throws YarnRemoteException;

  // Sends the current status together with the containers being asked for
  // and the containers being released.
  AMResponse allocate(ApplicationStatus status,
                      array<ResourceRequest> ask,
                      array<Container> release)
      throws YarnRemoteException;
}

View File

@ -1,45 +0,0 @@
// Avro IDL for the ClientRMProtocol: messages for obtaining an application
// ID, submitting and finishing an application, and reading cluster metrics.
@namespace("org.apache.hadoop.yarn")
protocol ClientRMProtocol {

  // Types used below but not declared here (ApplicationID, Resource, URL,
  // LocalResource, ApplicationMaster, YarnRemoteException) are expected to
  // come from this import.
  import idl "yarn/yarn-api/src/main/avro/yarn-types.genavro";

  // Wrapper around an integer priority value.
  record Priority {
    int priority;
  }

  // Everything supplied when submitting an application.
  record ApplicationSubmissionContext {
    ApplicationID applicationId;
    union {null, string} applicationName;

    // TODO: Needs RM validation
    Resource masterCapability;

    // All the files required by the container to run the ApplicationMaster.
    //   KEY   -> destination dir name
    //   VALUE -> source path
    map<URL> resources;
    union {null, map<LocalResource>} resources_todo;

    // TODO - Remove fsTokens (url encoded)
    union {null, array<string>} fsTokens;
    union {null, bytes} fsTokens_todo;

    // Env to be set before launching the command for ApplicationMaster.
    //   KEY   -> env variable name
    //   VALUE -> env variable value
    map<string> environment;

    // Command-line of the container that is going to launch the
    // ApplicationMaster.
    array<string> command;

    union {null, string} queue;
    union {null, Priority} priority;

    // TODO: Shouldn't pass it like this.
    string user;
  }

  // Cluster-level metrics; currently only the number of node managers.
  record YarnClusterMetrics {
    int numNodeManagers;
  }

  // ---- Messages ----

  ApplicationID getNewApplicationId()
      throws YarnRemoteException;

  ApplicationMaster getApplicationMaster(ApplicationID applicationId)
      throws YarnRemoteException;

  void submitApplication(ApplicationSubmissionContext context)
      throws YarnRemoteException;

  void finishApplication(ApplicationID applicationId)
      throws YarnRemoteException;

  YarnClusterMetrics getClusterMetrics()
      throws YarnRemoteException;
}

View File

@ -1,37 +0,0 @@
// Avro IDL for the ContainerManager protocol: messages to start, stop,
// clean up and query the status of a container.
@namespace("org.apache.hadoop.yarn")
protocol ContainerManager {

  // Types used below but not declared here (ContainerID, Resource,
  // LocalResource, ContainerState, YarnRemoteException) are expected to come
  // from this import.
  import idl "yarn/yarn-api/src/main/avro/yarn-types.genavro";

  // Everything supplied when starting a container.
  record ContainerLaunchContext {
    ContainerID id;

    // TODO: Shouldn't pass it like this.
    string user;

    // TODO: Needs RM validation
    Resource resource;

    union {null, map<LocalResource>} resources;

    // FileSystem related and other application specific tokens.
    union {null, bytes} containerTokens;

    union {null, map<bytes>} serviceData;

    // Env to be set before launching the command.
    //   KEY   -> env variable name
    //   VALUE -> env variable value
    map<string> env;

    // Commandline to launch the container. All resources are downloaded in
    // the working directory of the command.
    array<string> command;
  }

  // State and exit status of one container.
  record ContainerStatus {
    ContainerID containerID;
    ContainerState state;
    int exitStatus;
  }

  // ---- Messages ----

  void startContainer(ContainerLaunchContext container)
      throws YarnRemoteException;

  void stopContainer(ContainerID containerID)
      throws YarnRemoteException;

  void cleanupContainer(ContainerID containerID)
      throws YarnRemoteException;

  ContainerStatus getContainerStatus(ContainerID containerID)
      throws YarnRemoteException;
}

View File

@ -1,109 +0,0 @@
// Avro IDL declaring common YARN types: identifiers, the remote exception,
// resources, containers, application state/status, URLs and local resources.
// This protocol declares no messages.
@namespace("org.apache.hadoop.yarn")
protocol types {

  // Application identifier: an integer id plus a cluster timestamp.
  record ApplicationID {
    int id;
    long clusterTimeStamp;
  }

  // Container identifier.
  record ContainerID {
    // The application id to which this container belongs.
    ApplicationID appID;
    // NOTE(review): the original comment called this a "unique string for
    // this application", but the field is declared as an int.
    int id;
  }

  // Error type thrown by the protocol messages; may carry a nested cause.
  error YarnRemoteException {
    union { null, string } message;
    // Stack trace.
    union { null, string } trace;
    union { null, YarnRemoteException } cause;
  }

  // Resource capability; only memory is declared.
  record Resource {
    int memory;
    //int diskspace;
  }

  // State of the container on the ContainerManager.
  // NOTE(review): "INTIALIZING" looks like a misspelling of "INITIALIZING";
  // the symbol is part of the schema, so it is left unchanged here.
  enum ContainerState {
    INTIALIZING,
    RUNNING,
    COMPLETE
  }

  // Token attached to a container (see Container.containerToken).
  record ContainerToken {
    bytes identifier;
    bytes password;
    string kind;
    string service;
  }

  // A container: its id, host, resource, state and an optional token.
  record Container {
    ContainerID id;
    string hostName;
    Resource resource;
    ContainerState state;
    // May be null.
    union {ContainerToken, null} containerToken;
  }

  // States reported for an application (see ApplicationMaster.state).
  enum ApplicationState {
    PENDING,
    ALLOCATING,
    ALLOCATED,
    EXPIRED_PENDING,
    LAUNCHING,
    LAUNCHED,
    RUNNING,
    PAUSED,
    CLEANUP,
    COMPLETED,
    KILLED,
    FAILED
  }

  // Status of an application.
  record ApplicationStatus {
    // TODO: This should be renamed as previousResponseID
    int responseID;
    ApplicationID applicationId;
    float progress;
    long lastSeen;
  }

  // Description of an ApplicationMaster: where it is reachable, plus its
  // status and state.
  record ApplicationMaster {
    ApplicationID applicationId;
    union { null, string } host;
    int rpcPort;
    int httpPort;
    ApplicationStatus status;
    ApplicationState state;
    union { null, string } clientToken;
  }

  // A URL split into scheme, optional host, port and file.
  record URL {
    string scheme;
    union { null, string } host;
    int port;
    string file;
  }

  // Who may access a local resource.
  enum LocalResourceVisibility {
    // Accessible to applications from all users.
    PUBLIC,
    // Accessible only to applications from the submitting user.
    PRIVATE,
    // Accessible only to this application.
    APPLICATION
  }

  // How a local resource is treated.
  enum LocalResourceType {
    // An archive to be expanded.
    ARCHIVE,
    // Uninterpreted file.
    FILE
  }

  // A resource to be localized. Note that the visibility is stored in the
  // field named "state".
  record LocalResource {
    URL resource;
    long size;
    long timestamp;
    LocalResourceType type;
    LocalResourceVisibility state;
  }
}

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.security.UserGroupInformation;
* This is the API for the applications comprising of constants that YARN sets * This is the API for the applications comprising of constants that YARN sets
* up for the applications and the containers. * up for the applications and the containers.
* *
* TODO: Should also be defined in avro/pb IDLs
* TODO: Investigate the semantics and security of each cross-boundary refs. * TODO: Investigate the semantics and security of each cross-boundary refs.
*/ */
public interface ApplicationConstants { public interface ApplicationConstants {

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.AvroSpecificRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
/**
 * {@link YarnRPC} implementation that runs on Hadoop RPC, tunnelling an
 * {@link AvroSpecificRpcEngine} over the Hadoop connection.
 * <p>
 * This does not give cross-language wire compatibility, since the Hadoop
 * RPC wire format is non-standard, but it does permit use of Avro's protocol
 * versioning features for inter-Java RPCs.
 */
public class HadoopYarnRPC extends YarnRPC {

  private static final Log LOG = LogFactory.getLog(HadoopYarnRPC.class);

  /**
   * Creates a client proxy for {@code protocol} at {@code addr}.
   * Registers {@link AvroSpecificRpcEngine} as the engine for the protocol
   * in {@code conf} before the proxy is created.
   *
   * @param protocol protocol interface to proxy
   * @param addr     address of the server to connect to
   * @param conf     configuration; its protocol-engine setting is updated
   * @return the proxy returned by {@link RPC#getProxy}
   * @throws YarnException wrapping any {@link IOException} raised while
   *                       creating the proxy
   */
  @Override
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    // Guarded so the message is only concatenated when debug is enabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating a HadoopYarnRpc proxy for protocol " + protocol);
    }
    RPC.setProtocolEngine(conf, protocol, AvroSpecificRpcEngine.class);
    try {
      // The literal 1 is the value this class has always passed as the
      // second argument of RPC.getProxy.
      return RPC.getProxy(protocol, 1, addr, conf);
    } catch (IOException e) {
      throw new YarnException(e);
    }
  }

  /**
   * Stops a proxy by delegating to {@link RPC#stopProxy}.
   *
   * @param proxy proxy to stop
   * @param conf  unused by this implementation
   */
  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RPC.stopProxy(proxy);
  }

  /**
   * Creates a Hadoop RPC server for {@code protocol} backed by
   * {@code instance}. Registers {@link AvroSpecificRpcEngine} as the engine
   * for the protocol in {@code conf} before the server is created.
   *
   * @param protocol      protocol interface served
   * @param instance      object implementing the protocol
   * @param addr          host name and port to bind to
   * @param conf          configuration; its protocol-engine setting is updated
   * @param secretManager secret manager handed to the server
   * @param numHandlers   number of handlers handed to the server
   * @return the server returned by {@link RPC#getServer}
   * @throws YarnException wrapping any {@link IOException} raised while
   *                       creating the server
   */
  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers) {
    // Guarded so the message is only concatenated when debug is enabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating a HadoopYarnRpc server for protocol " + protocol +
          " with " + numHandlers + " handlers");
    }
    RPC.setProtocolEngine(conf, protocol, AvroSpecificRpcEngine.class);
    try {
      return RPC.getServer(protocol, instance, addr.getHostName(),
          addr.getPort(), numHandlers, false, conf, secretManager);
    } catch (IOException e) {
      throw new YarnException(e);
    }
  }
}

View File

@ -58,16 +58,6 @@ public class TestRPC {
private static final String EXCEPTION_CAUSE = "exception cause"; private static final String EXCEPTION_CAUSE = "exception cause";
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
// @Test
// public void testAvroRPC() throws Exception {
// test(AvroYarnRPC.class.getName());
// }
//
// @Test
// public void testHadoopNativeRPC() throws Exception {
// test(HadoopYarnRPC.class.getName());
// }
@Test @Test
public void testUnknownCall() { public void testUnknownCall() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();

View File

@ -1,40 +0,0 @@
// Avro IDL for the ResourceTracker protocol: node manager registration and
// heartbeat messages.
@namespace("org.apache.hadoop.yarn")
protocol ResourceTracker {

  // Types used below but not declared here (Container, ApplicationID,
  // Resource, YarnRemoteException) are expected to come from this import.
  import idl "yarn/yarn-api/src/main/avro/yarn-types.genavro";

  // ---- ResourceTracker records ----

  // Node identifier: a single integer id.
  record NodeID {
    int id;
  }

  // Health flag, optional report text, and the time of the last report.
  record NodeHealthStatus {
    boolean isNodeHealthy;
    // May be null.
    union {string, null} healthReport;
    long lastHealthReportTime;
  }

  // Status carried by a node heartbeat.
  record NodeStatus {
    NodeID nodeId;
    int responseId;
    long lastSeen;
    // Lists of containers keyed by a string.
    // NOTE(review): the meaning of the key is not visible here - confirm.
    map<array<org.apache.hadoop.yarn.Container>> containers;
    NodeHealthStatus nodeHealthStatus;
  }

  // Result of registerNodeManager.
  record RegistrationResponse {
    NodeID nodeID;
    // May be null.
    union {bytes, null} secretKey;
  }

  // Result of nodeHeartbeat.
  // NOTE(review): "appplicationsToCleanup" has three p's; the name is part
  // of the schema, so it is left unchanged here.
  record HeartbeatResponse {
    int responseId;
    boolean reboot;
    array<org.apache.hadoop.yarn.Container> containersToCleanup;
    array<org.apache.hadoop.yarn.ApplicationID> appplicationsToCleanup;
  }

  // ---- Messages ----

  RegistrationResponse registerNodeManager(
      string node, org.apache.hadoop.yarn.Resource resource)
      throws YarnRemoteException;

  HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
      throws YarnRemoteException;
}

View File

@ -1,11 +0,0 @@
// Avro IDL for the LocalizationProtocol: messages reporting that a resource
// was, or was not, localized for a user.
@namespace("org.apache.hadoop.yarn")
protocol LocalizationProtocol {

  // Types used below but not declared here (LocalResource, URL,
  // YarnRemoteException) are expected to come from this import.
  import idl "yarn/yarn-api/src/main/avro/yarn-types.genavro";

  // Reports a successful localization of `resource` for `user` at `path`.
  void successfulLocalization(string user,
                              LocalResource resource,
                              URL path)
      throws YarnRemoteException;

  // Reports a failed localization of `resource` for `user`.
  // NOTE(review): the third parameter is a YarnRemoteException but is named
  // "path", like the URL parameter above; the name is part of the message
  // schema, so it is left unchanged here.
  void failedLocalization(string user,
                          LocalResource resource,
                          YarnRemoteException path)
      throws YarnRemoteException;
}

View File

@ -72,37 +72,6 @@
<artifactId>protobuf-java</artifactId> <artifactId>protobuf-java</artifactId>
<version>2.4.0a</version> <version>2.4.0a</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.5.3</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<artifactId>paranamer-ant</artifactId>
<groupId>com.thoughtworks.paranamer</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>
@ -186,38 +155,6 @@
<artifactId>clover</artifactId> <artifactId>clover</artifactId>
<version>3.0.2</version> <version>3.0.2</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.5.3</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<artifactId>paranamer-ant</artifactId>
<groupId>com.thoughtworks.paranamer</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>com.google.protobuf</groupId> <groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId> <artifactId>protobuf-java</artifactId>
@ -228,10 +165,6 @@
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
<exclusions> <exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion> <exclusion>
<groupId>commons-el</groupId> <groupId>commons-el</groupId>
<artifactId>commons-el</artifactId> <artifactId>commons-el</artifactId>