From fd5a762df0ec42a1ffae2990f26864c70ceefd66 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 12 Jul 2011 17:10:59 +0000 Subject: [PATCH] MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a ServiceProvider for the actual implementation. Contributed by Tom White. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1145679 13f79535-47bb-0310-9956-ffa450edef68 --- mapreduce/CHANGES.txt | 3 + mapreduce/build.xml | 1 + ....mapreduce.protocol.ClientProtocolProvider | 15 +++++ .../JobTrackerClientProtocolProvider.java | 67 +++++++++++++++++++ .../mapred/LocalClientProtocolProvider.java | 57 ++++++++++++++++ .../org/apache/hadoop/mapreduce/Cluster.java | 51 +++++++------- .../org/apache/hadoop/mapreduce/MRConfig.java | 2 + .../protocol/ClientProtocolProvider.java | 37 ++++++++++ 8 files changed, 205 insertions(+), 28 deletions(-) create mode 100644 mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider create mode 100644 mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java create mode 100644 mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java create mode 100644 mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt index 28d4619129c..ee6fba9e05c 100644 --- a/mapreduce/CHANGES.txt +++ b/mapreduce/CHANGES.txt @@ -37,6 +37,9 @@ Trunk (unreleased changes) IMPROVEMENTS + MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a + ServiceProvider for the actual implementation. (tomwhite via acmurthy) + MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk) MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to diff --git a/mapreduce/build.xml b/mapreduce/build.xml index 8474e795a59..c2ad71ffdd0 100644 --- a/mapreduce/build.xml +++ b/mapreduce/build.xml @@ -416,6 +416,7 @@ + diff --git a/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider new file mode 100644 index 00000000000..1a54e30048d --- /dev/null +++ b/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider @@ -0,0 +1,15 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.hadoop.mapred.JobTrackerClientProtocolProvider +org.apache.hadoop.mapred.LocalClientProtocolProvider diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java b/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java new file mode 100644 index 00000000000..42c958d77c1 --- /dev/null +++ b/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +@InterfaceAudience.Private +public class JobTrackerClientProtocolProvider extends ClientProtocolProvider { + + @Override + public ClientProtocol create(Configuration conf) throws IOException { + String framework = conf.get(MRConfig.FRAMEWORK_NAME); + if (framework != null && !framework.equals("classic")) { + return null; + } + String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local"); + if (!"local".equals(tracker)) { + return createRPCProxy(JobTracker.getAddress(conf), conf); + } + return null; + } + + @Override + public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { + return createRPCProxy(addr, conf); + } + + private ClientProtocol createRPCProxy(InetSocketAddress addr, + Configuration conf) throws IOException { + return (ClientProtocol) RPC.getProxy(ClientProtocol.class, + ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), + conf, NetUtils.getSocketFactory(conf, ClientProtocol.class)); + } + + @Override + public void close(ClientProtocol clientProtocol) throws IOException { + RPC.stopProxy(clientProtocol); + } + +} diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java b/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java new file mode 100644 index 00000000000..68d10bc4d00 --- /dev/null +++ b/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; + +@InterfaceAudience.Private +public class LocalClientProtocolProvider extends ClientProtocolProvider { + + @Override + public ClientProtocol create(Configuration conf) throws IOException { + String framework = conf.get(MRConfig.FRAMEWORK_NAME); + if (framework != null && !framework.equals("local")) { + return null; + } + if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) { + conf.setInt("mapreduce.job.maps", 1); + return new LocalJobRunner(conf); + } + return null; + } + + @Override + public ClientProtocol create(InetSocketAddress addr, Configuration conf) { + return null; // LocalJobRunner doesn't use a socket + } + + @Override + public void close(ClientProtocol clientProtocol) { + // no clean up required + } + +} diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java index 1f4067b9e6f..fcc0ee1c638 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; +import java.util.ServiceLoader; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -30,14 +31,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobTracker; -import org.apache.hadoop.mapred.LocalJobRunner; import org.apache.hadoop.mapreduce.jobhistory.JobHistory; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.server.jobtracker.State; import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.hadoop.net.NetUtils; @@ -56,6 +56,7 @@ public class Cluster { @InterfaceStability.Evolving public static enum JobTrackerStatus {INITIALIZING, RUNNING}; + private ClientProtocolProvider clientProtocolProvider; private ClientProtocol client; private UserGroupInformation ugi; private Configuration conf; @@ -71,35 +72,30 @@ public class Cluster { public Cluster(Configuration conf) throws IOException { this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); - client = createClient(conf); + for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) { + ClientProtocol clientProtocol = provider.create(conf); + if (clientProtocol != null) { + clientProtocolProvider = provider; + client = clientProtocol; + break; + } + } } public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); - client = createRPCProxy(jobTrackAddr, conf); - } - - private ClientProtocol createRPCProxy(InetSocketAddress addr, - Configuration conf) throws IOException { - return (ClientProtocol) RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, addr, ugi, conf, - NetUtils.getSocketFactory(conf, ClientProtocol.class)); - } - - private ClientProtocol createClient(Configuration conf) throws IOException { - ClientProtocol client; - String tracker = conf.get("mapreduce.jobtracker.address", "local"); - if ("local".equals(tracker)) { - conf.setInt("mapreduce.job.maps", 1); - client = new LocalJobRunner(conf); - } else { - client = createRPCProxy(JobTracker.getAddress(conf), conf); + for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) { + ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf); + if (clientProtocol != null) { + clientProtocolProvider = provider; + client = clientProtocol; + break; + } } - return client; } - + ClientProtocol getClient() { return client; } @@ -112,9 +108,7 @@ public class Cluster { * Close the Cluster. */ public synchronized void close() throws IOException { - if (!(client instanceof LocalJobRunner)) { - RPC.stopProxy(client); - } + clientProtocolProvider.close(client); } private Job[] getJobs(JobStatus[] stats) throws IOException { @@ -353,7 +347,8 @@ public class Cluster { getDelegationToken(Text renewer) throws IOException, InterruptedException{ Token result = client.getDelegationToken(renewer); - InetSocketAddress addr = JobTracker.getAddress(conf); + InetSocketAddress addr = NetUtils.createSocketAddr( + conf.get(JTConfig.JT_IPC_ADDRESS, "localhost:8012")); StringBuilder service = new StringBuilder(); service.append(NetUtils.normalizeHostName(addr.getAddress(). getHostAddress())); diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java index 39df8b8e05b..9e88ea182ea 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java @@ -57,4 +57,6 @@ public interface MRConfig { "mapreduce.cluster.delegation.token.max-lifetime"; public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days + + public static final String FRAMEWORK_NAME = "mapreduce.framework.name"; } diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java new file mode 100644 index 00000000000..a9b4233448e --- /dev/null +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.protocol; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +@InterfaceAudience.Private +public abstract class ClientProtocolProvider { + + public abstract ClientProtocol create(Configuration conf) throws IOException; + + public abstract ClientProtocol create(InetSocketAddress addr, + Configuration conf) throws IOException; + + public abstract void close(ClientProtocol clientProtocol) throws IOException; + +}