MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a ServiceProvider for the actual implementation. Contributed by Tom White.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1145679 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-07-12 17:10:59 +00:00
parent 224972e055
commit fd5a762df0
8 changed files with 205 additions and 28 deletions

View File

@ -37,6 +37,9 @@ Trunk (unreleased changes)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a
ServiceProvider for the actual implementation. (tomwhite via acmurthy)
MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk) MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk)
MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to

View File

@ -416,6 +416,7 @@
<copy todir="${build.classes}"> <copy todir="${build.classes}">
<fileset dir="${mapred.src.dir}" includes="**/*.properties"/> <fileset dir="${mapred.src.dir}" includes="**/*.properties"/>
<fileset dir="${mapred.src.dir}" includes="**/META-INF/services/*"/>
<fileset dir="${mapred.src.dir}" includes="mapred-default.xml"/> <fileset dir="${mapred.src.dir}" includes="mapred-default.xml"/>
<fileset dir="${mapred.src.dir}" includes="mapred-queues-default.xml"/> <fileset dir="${mapred.src.dir}" includes="mapred-queues-default.xml"/>
</copy> </copy>

View File

@ -0,0 +1,15 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.mapred.JobTrackerClientProtocolProvider
org.apache.hadoop.mapred.LocalClientProtocolProvider

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@InterfaceAudience.Private
public class JobTrackerClientProtocolProvider extends ClientProtocolProvider {
@Override
public ClientProtocol create(Configuration conf) throws IOException {
String framework = conf.get(MRConfig.FRAMEWORK_NAME);
if (framework != null && !framework.equals("classic")) {
return null;
}
String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
if (!"local".equals(tracker)) {
return createRPCProxy(JobTracker.getAddress(conf), conf);
}
return null;
}
@Override
public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
return createRPCProxy(addr, conf);
}
private ClientProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
}
@Override
public void close(ClientProtocol clientProtocol) throws IOException {
RPC.stopProxy(clientProtocol);
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@InterfaceAudience.Private
public class LocalClientProtocolProvider extends ClientProtocolProvider {
@Override
public ClientProtocol create(Configuration conf) throws IOException {
String framework = conf.get(MRConfig.FRAMEWORK_NAME);
if (framework != null && !framework.equals("local")) {
return null;
}
if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
conf.setInt("mapreduce.job.maps", 1);
return new LocalJobRunner(conf);
}
return null;
}
@Override
public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
return null; // LocalJobRunner doesn't use a socket
}
@Override
public void close(ClientProtocol clientProtocol) {
// no clean up required
}
}

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.ServiceLoader;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -30,14 +31,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory; import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.State; import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -56,6 +56,7 @@ public class Cluster {
@InterfaceStability.Evolving @InterfaceStability.Evolving
public static enum JobTrackerStatus {INITIALIZING, RUNNING}; public static enum JobTrackerStatus {INITIALIZING, RUNNING};
private ClientProtocolProvider clientProtocolProvider;
private ClientProtocol client; private ClientProtocol client;
private UserGroupInformation ugi; private UserGroupInformation ugi;
private Configuration conf; private Configuration conf;
@ -71,35 +72,30 @@ public class Cluster {
public Cluster(Configuration conf) throws IOException { public Cluster(Configuration conf) throws IOException {
this.conf = conf; this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser(); this.ugi = UserGroupInformation.getCurrentUser();
client = createClient(conf); for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
ClientProtocol clientProtocol = provider.create(conf);
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
break;
}
}
} }
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException { throws IOException {
this.conf = conf; this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser(); this.ugi = UserGroupInformation.getCurrentUser();
client = createRPCProxy(jobTrackAddr, conf); for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
} ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
if (clientProtocol != null) {
private ClientProtocol createRPCProxy(InetSocketAddress addr, clientProtocolProvider = provider;
Configuration conf) throws IOException { client = clientProtocol;
return (ClientProtocol) RPC.getProxy(ClientProtocol.class, break;
ClientProtocol.versionID, addr, ugi, conf, }
NetUtils.getSocketFactory(conf, ClientProtocol.class));
}
private ClientProtocol createClient(Configuration conf) throws IOException {
ClientProtocol client;
String tracker = conf.get("mapreduce.jobtracker.address", "local");
if ("local".equals(tracker)) {
conf.setInt("mapreduce.job.maps", 1);
client = new LocalJobRunner(conf);
} else {
client = createRPCProxy(JobTracker.getAddress(conf), conf);
} }
return client;
} }
ClientProtocol getClient() { ClientProtocol getClient() {
return client; return client;
} }
@ -112,9 +108,7 @@ public class Cluster {
* Close the <code>Cluster</code>. * Close the <code>Cluster</code>.
*/ */
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (!(client instanceof LocalJobRunner)) { clientProtocolProvider.close(client);
RPC.stopProxy(client);
}
} }
private Job[] getJobs(JobStatus[] stats) throws IOException { private Job[] getJobs(JobStatus[] stats) throws IOException {
@ -353,7 +347,8 @@ public class Cluster {
getDelegationToken(Text renewer) throws IOException, InterruptedException{ getDelegationToken(Text renewer) throws IOException, InterruptedException{
Token<DelegationTokenIdentifier> result = Token<DelegationTokenIdentifier> result =
client.getDelegationToken(renewer); client.getDelegationToken(renewer);
InetSocketAddress addr = JobTracker.getAddress(conf); InetSocketAddress addr = NetUtils.createSocketAddr(
conf.get(JTConfig.JT_IPC_ADDRESS, "localhost:8012"));
StringBuilder service = new StringBuilder(); StringBuilder service = new StringBuilder();
service.append(NetUtils.normalizeHostName(addr.getAddress(). service.append(NetUtils.normalizeHostName(addr.getAddress().
getHostAddress())); getHostAddress()));

View File

@ -57,4 +57,6 @@ public interface MRConfig {
"mapreduce.cluster.delegation.token.max-lifetime"; "mapreduce.cluster.delegation.token.max-lifetime";
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days 7*24*60*60*1000; // 7 days
public static final String FRAMEWORK_NAME = "mapreduce.framework.name";
} }

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.protocol;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@InterfaceAudience.Private
public abstract class ClientProtocolProvider {
public abstract ClientProtocol create(Configuration conf) throws IOException;
public abstract ClientProtocol create(InetSocketAddress addr,
Configuration conf) throws IOException;
public abstract void close(ClientProtocol clientProtocol) throws IOException;
}