Merging r1547121 through r1547473 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1547492 13f79535-47bb-0310-9956-ffa450edef68
commit 674d51e62e
Author: Arpit Agarwal
Date: 2013-12-03 17:33:46 +00:00
29 changed files with 1031 additions and 540 deletions


@@ -286,7 +286,7 @@ Trunk (Unreleased)
HADOOP-8589. ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
-Release 2.3.0 - UNRELEASED
+Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -455,7 +455,7 @@ Release 2.3.0 - UNRELEASED
HADOOP-10135 writes to swift fs over partition size leave temp files and
empty output file (David Dobbins via stevel)
-Release 2.2.1 - UNRELEASED
+Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -519,6 +519,9 @@ Release 2.2.1 - UNRELEASED
HADOOP-10130. RawLocalFS::LocalFSFileInputStream.pread does not track
FS::Statistics (Binglin Chang via Colin Patrick McCabe)
+HDFS-5560. Trash configuration log statements prints incorrect units.
+(Josh Elser via Andrew Wang)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES


@@ -90,8 +90,8 @@ public class TrashPolicyDefault extends TrashPolicy {
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
LOG.info("Namenode trash configuration: Deletion interval = " +
-this.deletionInterval + " minutes, Emptier interval = " +
-this.emptierInterval + " minutes.");
+(this.deletionInterval / MSECS_PER_MINUTE) + " minutes, Emptier interval = " +
+(this.emptierInterval / MSECS_PER_MINUTE) + " minutes.");
}
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
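The fix above is purely a units problem: both intervals are held internally in milliseconds, so printing the raw values under a "minutes" label overstates them by a factor of 60,000. A minimal standalone sketch of the corrected arithmetic (hypothetical values, not the real configuration path):

    // Sketch only: the intervals are stored in milliseconds, so they must be
    // divided by MSECS_PER_MINUTE before being reported as minutes (HDFS-5560).
    public class TrashIntervalLogDemo {
      private static final long MSECS_PER_MINUTE = 60 * 1000L;

      public static void main(String[] args) {
        long deletionInterval = 360 * MSECS_PER_MINUTE; // hypothetical: 360 minutes
        long emptierInterval = 60 * MSECS_PER_MINUTE;   // hypothetical: 60 minutes
        // Before the fix, the raw millisecond values were printed with this label.
        System.out.println("Deletion interval = "
            + (deletionInterval / MSECS_PER_MINUTE) + " minutes, Emptier interval = "
            + (emptierInterval / MSECS_PER_MINUTE) + " minutes.");
      }
    }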


@@ -421,7 +421,7 @@ Trunk (Unreleased)
HDFS-5562. TestCacheDirectives and TestFsDatasetCache should stub out
native mlock. (Colin McCabe and Akira Ajisaka via wang)
-Release 2.3.0 - UNRELEASED
+Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -643,7 +643,7 @@ Release 2.3.0 - UNRELEASED
HDFS-5533. Symlink delete/create should be treated as DELETE/CREATE in snapshot diff
report. (Binglin Chang via jing9)
-Release 2.2.1 - UNRELEASED
+Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -4060,6 +4060,12 @@ Release 0.23.10 - UNRELEASED
HDFS-5526. Datanode cannot roll back to previous layout version (kihwal)
+HDFS-5557. Write pipeline recovery for the last packet in the block may
+cause rejection of valid replicas. (kihwal)
+HDFS-5558. LeaseManager monitor thread can crash if the last block is
+complete but another block is not. (kihwal)
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES


@@ -844,7 +844,6 @@ public class DFSOutputStream extends FSOutputSummer
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
// a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) {
-assert dataQueue.size() == 1;
Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
assert endOfBlockPacket.lastPacketInBlock;
assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
@@ -1056,7 +1055,7 @@ public class DFSOutputStream extends FSOutputSummer
// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
-success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+success = createBlockOutputStream(nodes, newGS, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,


@@ -235,6 +235,8 @@ public class BlockInfoUnderConstruction extends BlockInfo {
* @param genStamp The final generation stamp for the block.
*/
public void setGenerationStampAndVerifyReplicas(long genStamp) {
+// Set the generation stamp for the block.
+setGenerationStamp(genStamp);
if (replicas == null)
return;
@@ -244,12 +246,9 @@ public class BlockInfoUnderConstruction extends BlockInfo {
if (genStamp != r.getGenerationStamp()) {
r.getExpectedStorageLocation().removeBlock(this);
NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
-+ "from location: " + r);
++ "from location: " + r.getExpectedStorageLocation());
}
}
-// Set the generation stamp for the block.
-setGenerationStamp(genStamp);
}
/**
@@ -264,6 +263,8 @@ public class BlockInfoUnderConstruction extends BlockInfo {
+ block.getBlockId() + ", expected id = " + getBlockId());
blockUCState = BlockUCState.COMMITTED;
this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+// Sort out invalid replicas.
+setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
}
/**


@@ -1555,13 +1555,15 @@ public class BlockManager {
* Besides the block in question, it provides the ReplicaState
* reported by the datanode in the block report.
*/
-private static class StatefulBlockInfo {
+static class StatefulBlockInfo {
final BlockInfoUnderConstruction storedBlock;
+final Block reportedBlock;
final ReplicaState reportedState;
StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
-ReplicaState reportedState) {
+Block reportedBlock, ReplicaState reportedState) {
this.storedBlock = storedBlock;
+this.reportedBlock = reportedBlock;
this.reportedState = reportedState;
}
}
@@ -1719,8 +1721,7 @@ public class BlockManager {
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
-addStoredBlockUnderConstruction(b.storedBlock, node,
-storage.getStorageID(), b.reportedState);
+addStoredBlockUnderConstruction(b, node, storage.getStorageID());
}
for (Block b : toRemove) {
removeStoredBlock(b, node);
@@ -1950,7 +1951,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
toUC.add(new StatefulBlockInfo(
-(BlockInfoUnderConstruction)storedBlock, reportedState));
+(BlockInfoUnderConstruction)storedBlock, block, reportedState));
return storedBlock;
}
@@ -2120,18 +2121,18 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
return false;
}
}
-void addStoredBlockUnderConstruction(
-BlockInfoUnderConstruction block,
-DatanodeDescriptor node, String storageID,
-ReplicaState reportedState)
-throws IOException {
-block.addReplicaIfNotPresent(node.getStorageInfo(storageID), block, reportedState);
-if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
+void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
+DatanodeDescriptor node, String storageID) throws IOException {
+BlockInfoUnderConstruction block = ucBlock.storedBlock;
+block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
+ucBlock.reportedBlock, ucBlock.reportedState);
+if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
addStoredBlock(block, node, storageID, null, true);
}
}
}
/**
* Faster version of
* {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
@@ -2702,7 +2703,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
: "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) {
-addStoredBlockUnderConstruction(b.storedBlock, node, storageID, b.reportedState);
+addStoredBlockUnderConstruction(b, node, storageID);
}
long numBlocksLogged = 0;
for (BlockInfo b : toAdd) {


@@ -2923,6 +2923,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
throw lee;
}
+// Check the state of the penultimate block. It should be completed
+// before attempting to complete the last one.
+if (!checkFileProgress(pendingFile, false)) {
+return false;
+}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, last);
@@ -2991,7 +2997,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
//
BlockInfo b = v.getPenultimateBlock();
if (b != null && !b.isComplete()) {
-LOG.info("BLOCK* checkFileProgress: " + b
+LOG.warn("BLOCK* checkFileProgress: " + b
+ " has not reached minimal replication "
+ blockManager.minReplication);
return false;
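The new guard encodes an ordering rule: the last block of a file may only be committed once the penultimate block has reached minimal replication, otherwise a close() could succeed while an earlier block is still unrecoverable. A hedged, self-contained sketch of that rule (toy block model, not the FSNamesystem API):

    // Sketch only: mirrors checkFileProgress(pendingFile, false), which inspects
    // just the penultimate block rather than every block in the file.
    import java.util.List;

    class FileProgressDemo {
      static boolean checkFileProgress(List<Boolean> blocksComplete, boolean checkAll) {
        int n = blocksComplete.size();
        if (checkAll) {
          for (boolean complete : blocksComplete) {
            if (!complete) return false;
          }
          return true;
        }
        return n < 2 || blocksComplete.get(n - 2); // penultimate block only
      }

      public static void main(String[] args) {
        // Penultimate block still replicating: completing the last block must wait.
        System.out.println(checkFileProgress(List.of(true, false, true), false)); // false
        System.out.println(checkFileProgress(List.of(true, true, true), false));  // true
      }
    }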


@@ -139,16 +139,10 @@ public class TestClientProtocolForPipelineRecovery {
Path file = new Path("dataprotocol1.dat");
Mockito.when(faultInjector.failPacket()).thenReturn(true);
-try {
-DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
-} catch (IOException e) {
-// completeFile() should fail.
-Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
-return;
-}
+DFSTestUtil.createFile(fileSys, file, 68000000L, (short)numDataNodes, 0L);
-// At this point, NN let data corruption to happen.
-// Before failing test, try reading the file. It should fail.
+// At this point, NN should have accepted only valid replicas.
+// Read should succeed.
FSDataInputStream in = fileSys.open(file);
try {
int c = in.read();
@@ -158,8 +152,6 @@ public class TestClientProtocolForPipelineRecovery {
Assert.fail("Block is missing because the file was closed with"
+ " corrupt replicas.");
}
-Assert.fail("The file was closed with corrupt replicas, but read still"
-+ " works!");
} finally {
DFSClientFaultInjector.instance = oldInjector;
if (cluster != null) {


@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -1119,8 +1120,9 @@ public class TestReplicationPolicy {
// Adding this block will increase its current replication, and that will
// remove it from the queue.
-bm.addStoredBlockUnderConstruction(info,
-TestReplicationPolicy.dataNodes[0], "STORAGE", ReplicaState.FINALIZED);
+bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
+ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
+"STORAGE");
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.


@@ -137,7 +137,7 @@ Trunk (Unreleased)
MAPREDUCE-5191. TestQueue#testQueue fails with timeout on Windows. (Ivan
Mitic via hitesh)
-Release 2.3.0 - UNRELEASED
+Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -226,7 +226,7 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5631. TestJobEndNotifier.testNotifyRetries fails with Should
have taken more than 5 seconds in jdk7 (Jonathan Eagles via jlowe)
-Release 2.2.1 - UNRELEASED
+Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -1512,6 +1512,9 @@ Release 0.23.10 - UNRELEASED
IMPROVEMENTS
+MAPREDUCE-5640. Rename TestLineRecordReader in jobclient module (Jason Lowe
+via jeagles)
OPTIMIZATIONS
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus


@@ -20,17 +20,21 @@ package org.apache.hadoop.mapreduce.v2.hs.webapp;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@@ -55,11 +59,13 @@ import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
@Path("/ws/v1/history")
@@ -78,11 +84,31 @@ public class HsWebServices {
this.webapp = webapp;
}
+private boolean hasAccess(Job job, HttpServletRequest request) {
+String remoteUser = request.getRemoteUser();
+if (remoteUser != null) {
+return job.checkAccess(UserGroupInformation.createRemoteUser(remoteUser),
+JobACL.VIEW_JOB);
+}
+return true;
+}
+private void checkAccess(Job job, HttpServletRequest request) {
+if (!hasAccess(job, request)) {
+throw new WebApplicationException(Status.UNAUTHORIZED);
+}
+}
private void init() {
//clear content type
response.setContentType(null);
}
+@VisibleForTesting
+void setResponse(HttpServletResponse response) {
+this.response = response;
+}
@GET
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public HistoryInfo get() {
@@ -190,10 +216,12 @@ public class HsWebServices {
@GET
@Path("/mapreduce/jobs/{jobid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-public JobInfo getJob(@PathParam("jobid") String jid) {
+public JobInfo getJob(@Context HttpServletRequest hsr,
+@PathParam("jobid") String jid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
return new JobInfo(job);
}
@@ -217,20 +245,24 @@ public class HsWebServices {
@GET
@Path("/mapreduce/jobs/{jobid}/counters")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) {
+public JobCounterInfo getJobCounters(@Context HttpServletRequest hsr,
+@PathParam("jobid") String jid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
return new JobCounterInfo(this.ctx, job);
}
@GET
@Path("/mapreduce/jobs/{jobid}/conf")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-public ConfInfo getJobConf(@PathParam("jobid") String jid) {
+public ConfInfo getJobConf(@Context HttpServletRequest hsr,
+@PathParam("jobid") String jid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
ConfInfo info;
try {
info = new ConfInfo(job);
@@ -244,11 +276,12 @@ public class HsWebServices {
@GET
@Path("/mapreduce/jobs/{jobid}/tasks")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-public TasksInfo getJobTasks(@PathParam("jobid") String jid,
-@QueryParam("type") String type) {
+public TasksInfo getJobTasks(@Context HttpServletRequest hsr,
+@PathParam("jobid") String jid, @QueryParam("type") String type) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
TasksInfo allTasks = new TasksInfo();
for (Task task : job.getTasks().values()) {
TaskType ttype = null;
@@ -270,11 +303,12 @@ public class HsWebServices {
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-public TaskInfo getJobTask(@PathParam("jobid") String jid,
-@PathParam("taskid") String tid) {
+public TaskInfo getJobTask(@Context HttpServletRequest hsr,
+@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
return new TaskInfo(task);
@@ -284,10 +318,12 @@ public class HsWebServices {
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/counters")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobTaskCounterInfo getSingleTaskCounters(
-@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
+@Context HttpServletRequest hsr, @PathParam("jobid") String jid,
+@PathParam("taskid") String tid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
TaskId taskID = MRApps.toTaskID(tid);
if (taskID == null) {
throw new NotFoundException("taskid " + tid + " not found or invalid");
@@ -302,12 +338,13 @@ public class HsWebServices {
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-public TaskAttemptsInfo getJobTaskAttempts(@PathParam("jobid") String jid,
-@PathParam("taskid") String tid) {
+public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
+@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
init();
TaskAttemptsInfo attempts = new TaskAttemptsInfo();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
@@ -324,11 +361,13 @@ public class HsWebServices {
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-public TaskAttemptInfo getJobTaskAttemptId(@PathParam("jobid") String jid,
-@PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
+public TaskAttemptInfo getJobTaskAttemptId(@Context HttpServletRequest hsr,
+@PathParam("jobid") String jid, @PathParam("taskid") String tid,
+@PathParam("attemptid") String attId) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
@@ -343,11 +382,12 @@ public class HsWebServices {
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/counters")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters(
-@PathParam("jobid") String jid, @PathParam("taskid") String tid,
-@PathParam("attemptid") String attId) {
+@Context HttpServletRequest hsr, @PathParam("jobid") String jid,
+@PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
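All of the handlers above follow the same pattern: resolve the job, then gate it on the caller's identity before building the response. A hedged sketch of just that access check (servlet API only; JobAcl is a stand-in for the real Job/JobACL types):

    // Sketch only: a null remote user (e.g. security off) is allowed through;
    // otherwise the job's VIEW_JOB ACL decides, and a denial becomes HTTP 401.
    import javax.servlet.http.HttpServletRequest;

    class AclCheckDemo {
      interface JobAcl {
        boolean canView(String user);
      }

      static boolean hasAccess(JobAcl acl, HttpServletRequest request) {
        String remoteUser = request.getRemoteUser();
        return remoteUser == null || acl.canView(remoteUser);
      }
    }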


@@ -0,0 +1,419 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryContext;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.junit.Before;
import org.junit.Test;
public class TestHsWebServicesAcls {
private static String FRIENDLY_USER = "friendly";
private static String ENEMY_USER = "enemy";
private JobConf conf;
private HistoryContext ctx;
private String jobIdStr;
private String taskIdStr;
private String taskAttemptIdStr;
private HsWebServices hsWebServices;
@Before
public void setup() throws IOException {
this.conf = new JobConf();
this.conf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
NullGroupsProvider.class.getName());
this.conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
Groups.getUserToGroupsMappingService(conf);
this.ctx = buildHistoryContext(this.conf);
WebApp webApp = mock(HsWebApp.class);
when(webApp.name()).thenReturn("hsmockwebapp");
this.hsWebServices= new HsWebServices(ctx, conf, webApp);
this.hsWebServices.setResponse(mock(HttpServletResponse.class));
Job job = ctx.getAllJobs().values().iterator().next();
this.jobIdStr = job.getID().toString();
Task task = job.getTasks().values().iterator().next();
this.taskIdStr = task.getID().toString();
this.taskAttemptIdStr =
task.getAttempts().keySet().iterator().next().toString();
}
@Test
public void testGetJobAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJob(hsr, jobIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJob(hsr, jobIdStr);
}
@Test
public void testGetJobCountersAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJobCounters(hsr, jobIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJobCounters(hsr, jobIdStr);
}
@Test
public void testGetJobConfAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJobConf(hsr, jobIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJobConf(hsr, jobIdStr);
}
@Test
public void testGetJobTasksAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJobTasks(hsr, jobIdStr, "m");
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJobTasks(hsr, jobIdStr, "m");
}
@Test
public void testGetJobTaskAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJobTask(hsr, jobIdStr, this.taskIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJobTask(hsr, this.jobIdStr, this.taskIdStr);
}
@Test
public void testGetSingleTaskCountersAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getSingleTaskCounters(hsr, this.jobIdStr, this.taskIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getSingleTaskCounters(hsr, this.jobIdStr, this.taskIdStr);
}
@Test
public void testGetJobTaskAttemptsAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJobTaskAttempts(hsr, this.jobIdStr, this.taskIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJobTaskAttempts(hsr, this.jobIdStr, this.taskIdStr);
}
@Test
public void testGetJobTaskAttemptIdAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJobTaskAttemptId(hsr, this.jobIdStr, this.taskIdStr,
this.taskAttemptIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJobTaskAttemptId(hsr, this.jobIdStr, this.taskIdStr,
this.taskAttemptIdStr);
}
@Test
public void testGetJobTaskAttemptIdCountersAcls() {
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(ENEMY_USER);
try {
hsWebServices.getJobTaskAttemptIdCounters(hsr, this.jobIdStr,
this.taskIdStr, this.taskAttemptIdStr);
fail("enemy can access job");
} catch (WebApplicationException e) {
assertEquals(Status.UNAUTHORIZED,
Status.fromStatusCode(e.getResponse().getStatus()));
}
when(hsr.getRemoteUser()).thenReturn(FRIENDLY_USER);
hsWebServices.getJobTaskAttemptIdCounters(hsr, this.jobIdStr,
this.taskIdStr, this.taskAttemptIdStr);
}
private static HistoryContext buildHistoryContext(final Configuration conf)
throws IOException {
HistoryContext ctx = new MockHistoryContext(1, 1, 1);
Map<JobId, Job> jobs = ctx.getAllJobs();
JobId jobId = jobs.keySet().iterator().next();
Job mockJob = new MockJobForAcls(jobs.get(jobId), conf);
jobs.put(jobId, mockJob);
return ctx;
}
private static class NullGroupsProvider
implements GroupMappingServiceProvider {
@Override
public List<String> getGroups(String user) throws IOException {
return Collections.emptyList();
}
@Override
public void cacheGroupsRefresh() throws IOException {
}
@Override
public void cacheGroupsAdd(List<String> groups) throws IOException {
}
}
private static class MockJobForAcls implements Job {
private Job mockJob;
private Configuration conf;
private Map<JobACL, AccessControlList> jobAcls;
private JobACLsManager aclsMgr;
public MockJobForAcls(Job mockJob, Configuration conf) {
this.mockJob = mockJob;
this.conf = conf;
AccessControlList viewAcl = new AccessControlList(FRIENDLY_USER);
this.jobAcls = new HashMap<JobACL, AccessControlList>();
this.jobAcls.put(JobACL.VIEW_JOB, viewAcl);
this.aclsMgr = new JobACLsManager(conf);
}
@Override
public JobId getID() {
return mockJob.getID();
}
@Override
public String getName() {
return mockJob.getName();
}
@Override
public JobState getState() {
return mockJob.getState();
}
@Override
public JobReport getReport() {
return mockJob.getReport();
}
@Override
public Counters getAllCounters() {
return mockJob.getAllCounters();
}
@Override
public Map<TaskId, Task> getTasks() {
return mockJob.getTasks();
}
@Override
public Map<TaskId, Task> getTasks(TaskType taskType) {
return mockJob.getTasks(taskType);
}
@Override
public Task getTask(TaskId taskID) {
return mockJob.getTask(taskID);
}
@Override
public List<String> getDiagnostics() {
return mockJob.getDiagnostics();
}
@Override
public int getTotalMaps() {
return mockJob.getTotalMaps();
}
@Override
public int getTotalReduces() {
return mockJob.getTotalReduces();
}
@Override
public int getCompletedMaps() {
return mockJob.getCompletedMaps();
}
@Override
public int getCompletedReduces() {
return mockJob.getCompletedReduces();
}
@Override
public float getProgress() {
return mockJob.getProgress();
}
@Override
public boolean isUber() {
return mockJob.isUber();
}
@Override
public String getUserName() {
return mockJob.getUserName();
}
@Override
public String getQueueName() {
return mockJob.getQueueName();
}
@Override
public Path getConfFile() {
return new Path("/some/path/to/conf");
}
@Override
public Configuration loadConfFile() throws IOException {
return conf;
}
@Override
public Map<JobACL, AccessControlList> getJobACLs() {
return jobAcls;
}
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
return mockJob.getTaskAttemptCompletionEvents(fromEventId, maxEvents);
}
@Override
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
return mockJob.getMapAttemptCompletionEvents(startIndex, maxEvents);
}
@Override
public List<AMInfo> getAMInfos() {
return mockJob.getAMInfos();
}
@Override
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation) {
return aclsMgr.checkAccess(callerUGI, jobOperation,
this.getUserName(), jobAcls.get(jobOperation));
}
}
}


@@ -17,22 +17,20 @@
package org.apache.hadoop.mapred;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
-import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.junit.Test;
-public class TestLineRecordReader extends TestCase {
+public class TestLineRecordReaderJobs {
private static Path workDir = new Path(new Path(System.getProperty(
"test.build.data", "."), "data"), "TestTextInputFormat");
@@ -77,7 +75,7 @@ public class TestLineRecordReader extends TestCase {
public void createAndRunJob(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf job = new JobConf(conf);
-job.setJarByClass(TestLineRecordReader.class);
+job.setJarByClass(TestLineRecordReaderJobs.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
FileInputFormat.addInputPath(job, inputDir);
@@ -106,7 +104,7 @@ public class TestLineRecordReader extends TestCase {
createInputFile(conf);
createAndRunJob(conf);
String expected = "0\tabc\ndef\n9\tghi\njkl\n";
-this.assertEquals(expected, readOutputFile(conf));
+assertEquals(expected, readOutputFile(conf));
}
/**
@@ -128,7 +126,7 @@ public class TestLineRecordReader extends TestCase {
createInputFile(conf);
createAndRunJob(conf);
String expected = "0\tabc\n4\tdef\t\n9\tghi\n13\tjkl\n";
-this.assertEquals(expected, readOutputFile(conf));
+assertEquals(expected, readOutputFile(conf));
}
}


@@ -18,12 +18,12 @@
package org.apache.hadoop.mapreduce.lib.input;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
-import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-public class TestLineRecordReader extends TestCase {
+public class TestLineRecordReaderJobs {
private static Path workDir = new Path(new Path(System.getProperty(
"test.build.data", "."), "data"), "TestTextInputFormat");
@@ -79,7 +79,7 @@ public class TestLineRecordReader extends TestCase {
public void createAndRunJob(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(conf);
-job.setJarByClass(TestLineRecordReader.class);
+job.setJarByClass(TestLineRecordReaderJobs.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
FileInputFormat.addInputPath(job, inputDir);
@@ -107,7 +107,7 @@ public class TestLineRecordReader extends TestCase {
createInputFile(conf);
createAndRunJob(conf);
String expected = "0\tabc\ndef\n9\tghi\njkl\n";
-this.assertEquals(expected, readOutputFile(conf));
+assertEquals(expected, readOutputFile(conf));
}
/**
@@ -129,7 +129,7 @@ public class TestLineRecordReader extends TestCase {
createInputFile(conf);
createAndRunJob(conf);
String expected = "0\tabc\n4\tdef\t\n9\tghi\n13\tjkl\n";
-this.assertEquals(expected, readOutputFile(conf));
+assertEquals(expected, readOutputFile(conf));
}
}


@@ -15,7 +15,7 @@ Trunk - Unreleased
YARN-524 TestYarnVersionInfo failing if generated properties doesn't
include an SVN URL. (stevel)
-Release 2.3.0 - UNRELEASED
+Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -129,6 +129,9 @@ Release 2.3.0 - UNRELEASED
YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf
queues. (Sandy Ryza)
+YARN-1318. Promoted AdminService to an Always-On service and merged it into
+RMHAProtocolService. (Karthik Kambatla via vinodkv)
OPTIMIZATIONS
BUG FIXES
@@ -188,7 +191,7 @@ Release 2.3.0 - UNRELEASED
YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some
tests. (Jian He via vinodkv)
-Release 2.2.1 - UNRELEASED
+Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES


@@ -285,18 +285,6 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
-@org.apache.hadoop.classification.InterfaceAudience.Private
-// TODO Remove after YARN-1318
-public static final String RM_HA_ADMIN_ADDRESS =
-RM_HA_PREFIX + "admin.address";
-public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034;
-public static String DEFAULT_RM_HA_ADMIN_ADDRESS =
-"0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT;
-public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT =
-RM_HA_PREFIX + "admin.client.thread-count";
-public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
-// end @Private
public static final List<String> RM_RPC_ADDRESS_CONF_KEYS =
Collections.unmodifiableList(Arrays.asList(
RM_ADDRESS,
@@ -304,9 +292,7 @@ public class YarnConfiguration extends Configuration {
RM_ADMIN_ADDRESS,
RM_RESOURCE_TRACKER_ADDRESS,
RM_WEBAPP_ADDRESS,
-RM_WEBAPP_HTTPS_ADDRESS,
-// TODO Remove after YARN-1318
-RM_HA_ADMIN_ADDRESS));
+RM_WEBAPP_HTTPS_ADDRESS));
////////////////////////////////
// RM state store configs
@@ -786,11 +772,6 @@ public class YarnConfiguration extends Configuration {
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl";
-@org.apache.hadoop.classification.InterfaceAudience.Private
-// TODO Remove after YARN-1318
-public static final String
-YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL =
-CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL;
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */


@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Exception to be thrown when an Active-Only operation is attempted on a
* ResourceManager that is not Active.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RMNotYetActiveException extends YarnException {
private static final long serialVersionUID = 1L;
public RMNotYetActiveException() {
super("ResourceManager is not yet Active!");
}
}
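The exception exists so that active-only admin operations can fail fast on a standby ResourceManager instead of half-applying. A hedged sketch of the guard it enables (the isRMActive flag is a stand-in for the AdminService check shown later in this commit):

    import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
    import org.apache.hadoop.yarn.exceptions.YarnException;

    class ActiveOnlyGuardDemo {
      private volatile boolean isRMActive = false; // stand-in for AdminService.isRMActive()

      void refreshQueues() throws YarnException {
        if (!isRMActive) {
          // Reject before touching any active-only state.
          throw new RMNotYetActiveException();
        }
        // ... reinitialize the scheduler on an active RM ...
      }
    }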


@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
@@ -45,25 +46,25 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
@Public
@Stable
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
-throws YarnException, IOException;
+throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
-throws YarnException, IOException;
+throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
-throws YarnException, IOException;
+throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
-throws YarnException, IOException;
+throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable


@@ -32,9 +32,9 @@ public class RMHAServiceTarget extends HAServiceTarget {
public RMHAServiceTarget(YarnConfiguration conf)
throws IOException {
haAdminServiceAddress = conf.getSocketAddr(
-YarnConfiguration.RM_HA_ADMIN_ADDRESS,
-YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
-YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
+YarnConfiguration.RM_ADMIN_ADDRESS,
+YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
}
@Override
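With the dedicated HA admin endpoint gone, the HA service target simply resolves the regular RM admin address. A minimal sketch of that lookup (assuming this release line's default of 0.0.0.0:8033 for yarn.resourcemanager.admin.address; verify against your version):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    class AdminAddrDemo {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // The same address now serves both regular admin RPCs and HA transitions.
        InetSocketAddress addr = conf.getSocketAddr(
            YarnConfiguration.RM_ADMIN_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
        System.out.println("RM admin (and HA) endpoint: " + addr);
      }
    }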


@@ -21,18 +21,31 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -51,22 +64,20 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
-public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol {
+public class AdminService extends AbstractService implements
+HAServiceProtocol, ResourceManagerAdministrationProtocol {
private static final Log LOG = LogFactory.getLog(AdminService.class);
-private final Configuration conf;
-private final ResourceScheduler scheduler;
private final RMContext rmContext;
-private final NodesListManager nodesListManager;
-private final ClientRMService clientRMService;
-private final ApplicationMasterService applicationMasterService;
-private final ResourceTrackerService resourceTrackerService;
+private final ResourceManager rm;
+@VisibleForTesting
+protected HAServiceProtocol.HAServiceState
+haState = HAServiceProtocol.HAServiceState.INITIALIZING;
+boolean haEnabled;
private Server server;
private InetSocketAddress masterServiceAddress;
private AccessControlList adminAcl;
@@ -74,23 +85,21 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
-public AdminService(Configuration conf, ResourceScheduler scheduler,
-RMContext rmContext, NodesListManager nodesListManager,
-ClientRMService clientRMService,
-ApplicationMasterService applicationMasterService,
-ResourceTrackerService resourceTrackerService) {
+public AdminService(ResourceManager rm, RMContext rmContext) {
super(AdminService.class.getName());
-this.conf = conf;
-this.scheduler = scheduler;
+this.rm = rm;
this.rmContext = rmContext;
-this.nodesListManager = nodesListManager;
-this.clientRMService = clientRMService;
-this.applicationMasterService = applicationMasterService;
-this.resourceTrackerService = resourceTrackerService;
}
@Override
-public void serviceInit(Configuration conf) throws Exception {
+public synchronized void serviceInit(Configuration conf) throws Exception {
+haEnabled = HAUtil.isHAEnabled(conf);
+if (haEnabled) {
+HAUtil.verifyAndSetConfiguration(conf);
+rm.setConf(conf);
+}
+rm.createAndInitActiveServices();
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -102,50 +111,185 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
protected synchronized void serviceStart() throws Exception {
if (haEnabled) {
transitionToStandby(true);
} else {
transitionToActive();
}
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
server.getListenerAddress());
startServer();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
protected synchronized void serviceStop() throws Exception {
stopServer();
transitionToStandby(false);
haState = HAServiceState.STOPPING;
super.serviceStop();
}
protected void startServer() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server = (Server) rpc.getServer(
ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
}
if (haEnabled) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService =
HAServiceProtocolProtos.HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
HAServiceProtocol.class, haPbService);
}
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
server.getListenerAddress());
}
protected void stopServer() throws Exception {
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
}
private UserGroupInformation checkAcls(String method) throws YarnException {
try {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
return checkAccess(method);
} catch (IOException ioe) {
throw RPCUtil.getRemoteException(ioe);
}
}
private synchronized boolean isRMActive() {
return HAServiceState.ACTIVE == haState;
}
@Override
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
}
}
synchronized void transitionToActive() throws Exception {
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active");
rm.startActiveServices();
haState = HAServiceProtocol.HAServiceState.ACTIVE;
LOG.info("Transitioned to active");
}
@Override
public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
}
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby");
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
rm.stopActiveServices();
if (initialize) {
rm.createAndInitActiveServices();
}
}
haState = HAServiceProtocol.HAServiceState.STANDBY;
LOG.info("Transitioned to standby");
}
@Override
public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
}
}
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceStatus ret = new HAServiceStatus(haState);
if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshQueues");
if (!isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh queues.");
throw new RMNotYetActiveException();
}
try {
scheduler.reinitialize(conf, this.rmContext);
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues",
"AdminService");
return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
@@ -162,8 +306,16 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshNodes");
+if (!isRMActive()) {
+RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
+adminAcl.toString(), "AdminService",
+"ResourceManager is not active. Can not refresh nodes.");
+throw new RMNotYetActiveException();
+}
try {
-this.nodesListManager.refreshNodes(new YarnConfiguration());
+rmContext.getNodesListManager().refreshNodes(new YarnConfiguration());
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes",
"AdminService");
return recordFactory.newRecordInstance(RefreshNodesResponse.class);
@@ -180,7 +332,16 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
RefreshSuperUserGroupsConfigurationRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration");
+// TODO (YARN-1459): Revisit handling super-user-groups on Standby RM
+if (!isRMActive()) {
+RMAuditLogger.logFailure(user.getShortUserName(),
+"refreshSuperUserGroupsConfiguration",
+adminAcl.toString(), "AdminService",
+"ResourceManager is not active. Can not refresh super-user-groups.");
+throw new RMNotYetActiveException();
+}
ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration());
RMAuditLogger.logSuccess(user.getShortUserName(),
"refreshSuperUserGroupsConfiguration", "AdminService");
@@ -193,7 +354,16 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request) throws YarnException {
UserGroupInformation user = checkAcls("refreshUserToGroupsMappings");
+// TODO (YARN-1459): Revisit handling user-groups on Standby RM
+if (!isRMActive()) {
+RMAuditLogger.logFailure(user.getShortUserName(),
+"refreshUserToGroupsMapping",
+adminAcl.toString(), "AdminService",
+"ResourceManager is not active. Can not refresh user-groups.");
+throw new RMNotYetActiveException();
+}
Groups.getUserToGroupsMappingService().refresh();
RMAuditLogger.logSuccess(user.getShortUserName(),
"refreshUserToGroupsMappings", "AdminService");
@@ -233,9 +403,16 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
PolicyProvider policyProvider = new RMPolicyProvider();
refreshServiceAcls(conf, policyProvider);
-clientRMService.refreshServiceAcls(conf, policyProvider);
-applicationMasterService.refreshServiceAcls(conf, policyProvider);
-resourceTrackerService.refreshServiceAcls(conf, policyProvider);
+if (isRMActive()) {
+rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
+rmContext.getApplicationMasterService().refreshServiceAcls(
+conf, policyProvider);
+rmContext.getResourceTrackerService().refreshServiceAcls(
+conf, policyProvider);
+} else {
+LOG.warn("ResourceManager is not active. Not refreshing ACLs for " +
+"Clients, ApplicationMasters and NodeManagers");
+}
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
@@ -249,5 +426,4 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
public String[] getGroupsForUser(String user) throws IOException {
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
}
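The transition methods above are the core of YARN-1318: AdminService itself now implements HAServiceProtocol, guarding each state change with an idempotence check before starting or stopping the active services. A hedged, self-contained sketch of that state machine (HAServiceState is the real hadoop-common enum; the service start/stop bodies are stand-ins):

    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;

    class HaTransitionDemo {
      private HAServiceState haState = HAServiceState.INITIALIZING;

      synchronized void transitionToActive() {
        if (haState == HAServiceState.ACTIVE) {
          return; // already active -- mirrors the early return above
        }
        // rm.startActiveServices() would run here in the real AdminService
        haState = HAServiceState.ACTIVE;
      }

      synchronized void transitionToStandby() {
        if (haState == HAServiceState.STANDBY) {
          return;
        }
        // leaving ACTIVE: stop the active services and re-initialize them
        haState = HAServiceState.STANDBY;
      }
    }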


@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -64,12 +65,22 @@ public interface RMContext {
NMTokenSecretManagerInRM getNMTokenSecretManager();
+ResourceScheduler getScheduler();
+NodesListManager getNodesListManager();
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
-void setClientRMService(ClientRMService clientRMService);
+AdminService getRMAdminService();
ClientRMService getClientRMService();
+ApplicationMasterService getApplicationMasterService();
+ResourceTrackerService getResourceTrackerService();
+void setClientRMService(ClientRMService clientRMService);
RMDelegationTokenSecretManager getRMDelegationTokenSecretManager();
void setRMDelegationTokenSecretManager(


@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -42,7 +43,7 @@ import com.google.common.annotations.VisibleForTesting;
public class RMContextImpl implements RMContext {
-private final Dispatcher rmDispatcher;
+private Dispatcher rmDispatcher;
private final ConcurrentMap<ApplicationId, RMApp> applications
= new ConcurrentHashMap<ApplicationId, RMApp>();
@@ -57,34 +58,25 @@ public class RMContextImpl implements RMContext {
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
-private final DelegationTokenRenewer delegationTokenRenewer;
-private final AMRMTokenSecretManager amRMTokenSecretManager;
-private final RMContainerTokenSecretManager containerTokenSecretManager;
-private final NMTokenSecretManagerInRM nmTokenSecretManager;
-private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+private DelegationTokenRenewer delegationTokenRenewer;
+private AMRMTokenSecretManager amRMTokenSecretManager;
+private RMContainerTokenSecretManager containerTokenSecretManager;
+private NMTokenSecretManagerInRM nmTokenSecretManager;
+private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+private AdminService adminService;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
+private ResourceScheduler scheduler;
+private NodesListManager nodesListManager;
+private ResourceTrackerService resourceTrackerService;
+private ApplicationMasterService applicationMasterService;
+/**
+* Default constructor. To be used in conjunction with setter methods for
+* individual fields.
+*/
+public RMContextImpl() {
+}
-public RMContextImpl(Dispatcher rmDispatcher,
-RMStateStore store,
-ContainerAllocationExpirer containerAllocationExpirer,
-AMLivelinessMonitor amLivelinessMonitor,
-AMLivelinessMonitor amFinishingMonitor,
-DelegationTokenRenewer delegationTokenRenewer,
-AMRMTokenSecretManager amRMTokenSecretManager,
-RMContainerTokenSecretManager containerTokenSecretManager,
-NMTokenSecretManagerInRM nmTokenSecretManager,
-ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
-this.rmDispatcher = rmDispatcher;
-this.stateStore = store;
-this.containerAllocationExpirer = containerAllocationExpirer;
-this.amLivelinessMonitor = amLivelinessMonitor;
-this.amFinishingMonitor = amFinishingMonitor;
-this.delegationTokenRenewer = delegationTokenRenewer;
-this.amRMTokenSecretManager = amRMTokenSecretManager;
-this.containerTokenSecretManager = containerTokenSecretManager;
-this.nmTokenSecretManager = nmTokenSecretManager;
-this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
-}
@VisibleForTesting
@@ -98,10 +90,17 @@ public class RMContextImpl implements RMContext {
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager);
this();
this.setDispatcher(rmDispatcher);
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
this.setAMFinishingMonitor(amFinishingMonitor);
this.setDelegationTokenRenewer(delegationTokenRenewer);
this.setAMRMTokenSecretManager(appTokenSecretManager);
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
@@ -171,12 +170,27 @@ public class RMContextImpl implements RMContext {
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
return this.nmTokenSecretManager;
}
@Override
public ResourceScheduler getScheduler() {
return this.scheduler;
}
@Override
public NodesListManager getNodesListManager() {
return this.nodesListManager;
}
@Override
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
@Override
public AdminService getRMAdminService() {
return this.adminService;
}
@VisibleForTesting
public void setStateStore(RMStateStore store) {
stateStore = store;
@@ -186,7 +200,25 @@ public class RMContextImpl implements RMContext {
public ClientRMService getClientRMService() {
return this.clientRMService;
}
@Override
public ApplicationMasterService getApplicationMasterService() {
return applicationMasterService;
}
@Override
public ResourceTrackerService getResourceTrackerService() {
return resourceTrackerService;
}
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
void setRMAdminService(AdminService adminService) {
this.adminService = adminService;
}
@Override
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
@@ -202,4 +234,60 @@ public class RMContextImpl implements RMContext {
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) {
this.containerAllocationExpirer = containerAllocationExpirer;
}
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
this.amLivelinessMonitor = amLivelinessMonitor;
}
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
this.amFinishingMonitor = amFinishingMonitor;
}
void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
}
void setNMTokenSecretManager(
NMTokenSecretManagerInRM nmTokenSecretManager) {
this.nmTokenSecretManager = nmTokenSecretManager;
}
void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler;
}
void setDelegationTokenRenewer(
DelegationTokenRenewer delegationTokenRenewer) {
this.delegationTokenRenewer = delegationTokenRenewer;
}
void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
}
void setAMRMTokenSecretManager(
AMRMTokenSecretManager amRMTokenSecretManager) {
this.amRMTokenSecretManager = amRMTokenSecretManager;
}
void setNodesListManager(NodesListManager nodesListManager) {
this.nodesListManager = nodesListManager;
}
void setApplicationMasterService(
ApplicationMasterService applicationMasterService) {
this.applicationMasterService = applicationMasterService;
}
void setResourceTrackerService(
ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
}
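
The hunks above replace RMContextImpl's single all-args construction path with a default constructor plus package-private setters, letting the owning ResourceManager wire each component into the context as it is created; the @VisibleForTesting constructor keeps the old shape by delegating to the setters. A minimal sketch of the same idiom, under simplified assumptions (Context, Dispatcher and Scheduler are illustrative stand-ins, not YARN's classes):

// A condensed sketch of the setter-based wiring idiom adopted above.
// Context, Dispatcher and Scheduler are hypothetical stand-ins.
public class Context {
  interface Dispatcher { }
  interface Scheduler { }

  private Dispatcher dispatcher;  // no longer final: wired after construction
  private Scheduler scheduler;

  public Context() { }  // default constructor; fields are set later

  void setDispatcher(Dispatcher d) { this.dispatcher = d; }  // package-private:
  void setScheduler(Scheduler s) { this.scheduler = s; }     // only the owner wires

  public Dispatcher getDispatcher() { return dispatcher; }
  public Scheduler getScheduler() { return scheduler; }

  public static void main(String[] args) {
    // The owner creates a component, registers it, then publishes it on
    // the context, mirroring what RMActiveServices does further down.
    Context ctx = new Context();
    ctx.setDispatcher(new Dispatcher() { });
    ctx.setScheduler(new Scheduler() { });
    System.out.println(ctx.getScheduler() != null);  // true once wired
  }
}

The visible trade-off is that the fields lose their final modifiers, so nothing statically guarantees the context is fully wired before a getter is called.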

View File

@@ -1,264 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* Internal class to handle HA related aspects of the {@link ResourceManager}.
*
* TODO (YARN-1318): Some/all of this functionality should be merged with
* {@link AdminService}. Currently, marking this as Private and Unstable for
* those reasons.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMHAProtocolService extends AbstractService implements
HAServiceProtocol {
private static final Log LOG = LogFactory.getLog(RMHAProtocolService.class);
private Configuration conf;
private ResourceManager rm;
@VisibleForTesting
protected HAServiceState haState = HAServiceState.INITIALIZING;
private AccessControlList adminAcl;
private Server haAdminServer;
@InterfaceAudience.Private
boolean haEnabled;
public RMHAProtocolService(ResourceManager resourceManager) {
super("RMHAProtocolService");
this.rm = resourceManager;
}
@Override
protected synchronized void serviceInit(Configuration conf) throws
Exception {
this.conf = conf;
haEnabled = HAUtil.isHAEnabled(this.conf);
if (haEnabled) {
HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(this.conf);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
}
rm.createAndInitActiveServices();
super.serviceInit(this.conf);
}
@Override
protected synchronized void serviceStart() throws Exception {
if (haEnabled) {
transitionToStandby(true);
startHAAdminServer();
} else {
transitionToActive();
}
super.serviceStart();
}
@Override
protected synchronized void serviceStop() throws Exception {
if (haEnabled) {
stopHAAdminServer();
}
transitionToStandby(false);
haState = HAServiceState.STOPPING;
super.serviceStop();
}
protected void startHAAdminServer() throws Exception {
InetSocketAddress haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService =
HAServiceProtocolProtos.HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
WritableRpcEngine.ensureInitialized();
String bindHost = haAdminServiceAddress.getHostName();
int serviceHandlerCount = conf.getInt(
YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT);
haAdminServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
.setBindAddress(bindHost)
.setPort(haAdminServiceAddress.getPort())
.setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.build();
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider());
}
haAdminServer.start();
conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS,
haAdminServer.getListenerAddress());
}
private void stopHAAdminServer() throws Exception {
if (haAdminServer != null) {
haAdminServer.stop();
haAdminServer.join();
haAdminServer = null;
}
}
@Override
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
}
}
@InterfaceAudience.Private
synchronized void transitionToActive() throws Exception {
if (haState == HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active");
rm.startActiveServices();
haState = HAServiceState.ACTIVE;
LOG.info("Transitioned to active");
}
@Override
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
}
@InterfaceAudience.Private
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (haState == HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby");
if (haState == HAServiceState.ACTIVE) {
rm.stopActiveServices();
if (initialize) {
rm.createAndInitActiveServices();
}
}
haState = HAServiceState.STANDBY;
LOG.info("Transitioned to standby");
}
@Override
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
}
}
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceStatus ret = new HAServiceStatus(haState);
if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
}
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
}
}
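
The deleted RMHAProtocolService above is, at its core, a small state machine over HAServiceState (INITIALIZING, then STANDBY and ACTIVE with idempotent transitions, then STOPPING on shutdown), and per its TODO the behavior is absorbed by AdminService elsewhere in this commit. A condensed sketch of that state machine under simplified assumptions (HaStateMachine and its fields are illustrative, not the real classes):

// Sketch of the HA state machine the deleted service implemented.
// State mirrors HAServiceState; activeServicesRunning stands in for
// the RM's start/stopActiveServices() calls.
public class HaStateMachine {
  enum State { INITIALIZING, ACTIVE, STANDBY, STOPPING }
  private State state = State.INITIALIZING;
  private boolean activeServicesRunning = false;

  synchronized void transitionToActive() {
    if (state == State.ACTIVE) return;  // idempotent no-op
    activeServicesRunning = true;       // rm.startActiveServices()
    state = State.ACTIVE;
  }

  synchronized void transitionToStandby(boolean reinit) {
    if (state == State.STANDBY) return;  // idempotent no-op
    if (state == State.ACTIVE) {
      activeServicesRunning = false;     // rm.stopActiveServices()
      // with reinit == true the active services are re-created, so a
      // later transitionToActive() starts from a clean slate
    }
    state = State.STANDBY;
  }

  public static void main(String[] args) {
    HaStateMachine rm = new HaStateMachine();
    rm.transitionToStandby(true);  // serviceStart() when HA is enabled
    rm.transitionToActive();       // e.g. an admin-requested failover
    rm.transitionToStandby(true);  // the fencing path demotes the same way
    System.out.println(rm.state);  // STANDBY
  }
}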

View File

@@ -118,7 +118,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
* the HA state of the RM.
*/
@VisibleForTesting
protected RMHAProtocolService haService;
protected RMContextImpl rmContext;
@VisibleForTesting
protected AdminService adminService;
/**
* "Active" services. Services that need to run only on the Active RM.
@@ -129,8 +131,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* in Active state.
*/
protected RMActiveServices activeServices;
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager =
new ClientToAMTokenSecretManagerInRM();
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
protected RMContainerTokenSecretManager containerTokenSecretManager;
protected NMTokenSecretManagerInRM nmTokenSecretManager;
@@ -143,7 +144,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ClientRMService clientRM;
protected ApplicationMasterService masterService;
private ApplicationMasterLauncher applicationMasterLauncher;
private AdminService adminService;
private ContainerAllocationExpirer containerAllocationExpirer;
protected NMLivelinessMonitor nmLivelinessMonitor;
protected NodesListManager nodesListManager;
@@ -154,7 +154,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMDelegationTokenSecretManager rmDTSecretManager;
private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp;
protected RMContext rmContext;
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
@@ -166,10 +165,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
super("ResourceManager");
}
public RMHAProtocolService getHAService() {
return this.haService;
}
public RMContext getRMContext() {
return this.rmContext;
}
@@ -187,9 +182,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected void serviceInit(Configuration conf) throws Exception {
validateConfigs(conf);
this.conf = conf;
this.rmContext = new RMContextImpl();
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);
haService = createRMHAProtocolService();
addService(haService);
super.serviceInit(conf);
}
@@ -201,11 +199,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@VisibleForTesting
protected void setRMStateStore(RMStateStore rmStore) {
rmStore.setRMDispatcher(rmDispatcher);
((RMContextImpl) rmContext).setStateStore(rmStore);
}
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this);
rmContext.setStateStore(rmStore);
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
@@ -224,7 +218,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMStateStoreOperationFailedEventDispatcher
createRMStateStoreOperationFailedEventDispatcher() {
return new RMStateStoreOperationFailedEventDispatcher(haService);
return new RMStateStoreOperationFailedEventDispatcher(
rmContext.getRMAdminService());
}
protected Dispatcher createDispatcher() {
@@ -319,20 +314,31 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmDispatcher = createDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM();
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
addService(containerAllocationExpirer);
rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
containerTokenSecretManager = createContainerTokenSecretManager(conf);
rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
nmTokenSecretManager = createNMTokenSecretManager(conf);
rmContext.setNMTokenSecretManager(nmTokenSecretManager);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
@@ -358,24 +364,23 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.error("Failed to init state store", e);
ExitUtil.terminate(1, e);
}
rmContext.setStateStore(rmStore);
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenRenewer = createDelegationTokenRenewer();
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
rmContext = new RMContextImpl(
rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
clientToAMSecretManager);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
addService(nodesListManager);
rmContext.setNodesListManager(nodesListManager);
// Initialize the scheduler
scheduler = createScheduler();
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
@@ -397,6 +402,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
resourceTracker = createResourceTrackerService();
addService(resourceTracker);
rmContext.setResourceTrackerService(resourceTracker);
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
@@ -412,6 +418,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
masterService = createApplicationMasterService();
addService(masterService);
rmContext.setApplicationMasterService(masterService);
applicationACLsManager = new ApplicationACLsManager(conf);
@@ -422,12 +429,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
addService(clientRM);
adminService = createAdminService(clientRM, masterService, resourceTracker);
addService(adminService);
rmContext.setClientRMService(clientRM);
applicationMasterLauncher = createAMLauncher();
rmDispatcher.register(AMLauncherEventType.class,
@@ -649,11 +655,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Private
public static class RMStateStoreOperationFailedEventDispatcher implements
EventHandler<RMStateStoreOperationFailedEvent> {
private final RMHAProtocolService haService;
private final AdminService adminService;
public RMStateStoreOperationFailedEventDispatcher(
RMHAProtocolService haService) {
this.haService = haService;
AdminService adminService) {
this.adminService = adminService;
}
@Override
@@ -665,12 +671,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
LOG.info("RMStateStore has been fenced");
synchronized(haService) {
if (haService.haEnabled) {
synchronized(adminService) {
if (adminService.haEnabled) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
haService.transitionToStandby(true);
adminService.transitionToStandby(true);
return;
} catch (Exception e) {
LOG.error("Failed to transition RM to Standby mode.");
@@ -853,6 +859,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (activeServices != null) {
activeServices.stop();
activeServices = null;
rmContext.getRMNodes().clear();
rmContext.getInactiveRMNodes().clear();
rmContext.getRMApps().clear();
}
}
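
The clear() calls added above matter because the context now outlives the active services: after a demotion (for example the fencing transition a few hunks earlier), a standby RM must not keep answering from node and application maps populated during its previous active tenure. A minimal sketch of the clear-on-stop idiom, with hypothetical names (the real maps live on RMContext):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the clear-on-stop idiom: the long-lived holder purges its
// mutable maps when the services that populate them stop.
public class ActiveStateHolder {
  final ConcurrentMap<String, Object> rmNodes =
      new ConcurrentHashMap<String, Object>();
  final ConcurrentMap<String, Object> rmApps =
      new ConcurrentHashMap<String, Object>();

  void stopActiveServices() {
    // ... stop the services themselves first ...
    rmNodes.clear();  // mirrors rmContext.getRMNodes().clear()
    rmApps.clear();   // mirrors rmContext.getRMApps().clear()
  }

  public static void main(String[] args) {
    ActiveStateHolder holder = new ActiveStateHolder();
    holder.rmApps.put("app_0001", new Object());
    holder.stopActiveServices();
    System.out.println(holder.rmApps.isEmpty());  // true: no stale state
  }
}
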
@@ -913,13 +922,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new ApplicationMasterService(this.rmContext, scheduler);
}
protected AdminService createAdminService(
ClientRMService clientRMService,
ApplicationMasterService applicationMasterService,
ResourceTrackerService resourceTrackerService) {
return new AdminService(this.conf, scheduler, rmContext,
this.nodesListManager, clientRMService, applicationMasterService,
resourceTrackerService);
protected AdminService createAdminService() {
return new AdminService(this, rmContext);
}
@Private

View File

@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.server.api.ResourceTrackerPB;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMPolicyProvider extends PolicyProvider {
private static final Service[] resourceManagerServices =
new Service[] {
new Service(
@@ -53,9 +53,6 @@ public class RMPolicyProvider extends PolicyProvider {
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL,
ContainerManagementProtocolPB.class),
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL,
HAServiceProtocol.class),
};
@Override

View File

@@ -306,16 +306,6 @@ public class MockRM extends ResourceManager {
.handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed"));
}
@Override
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this) {
@Override
protected void startHAAdminServer() {
// do nothing
}
};
}
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
@@ -391,19 +381,15 @@ }
}
@Override
protected AdminService createAdminService(ClientRMService clientRMService,
ApplicationMasterService applicationMasterService,
ResourceTrackerService resourceTrackerService) {
return new AdminService(getConfig(), scheduler, getRMContext(),
this.nodesListManager, clientRMService, applicationMasterService,
resourceTrackerService) {
protected AdminService createAdminService() {
return new AdminService(this, getRMContext()) {
@Override
protected void serviceStart() {
protected void startServer() {
// override to not start rpc handler
}
@Override
protected void serviceStop() {
protected void stopServer() {
// don't do anything
}
};
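
The rewritten override above also shows why AdminService grew the startServer()/stopServer() hooks: a test subclass can skip binding the RPC port while the rest of the service lifecycle runs unchanged, instead of stubbing all of serviceStart()/serviceStop(). The same hook-override pattern in miniature (names are illustrative, not YARN's):

// Sketch of the template-method hook pattern MockRM relies on: tests
// override only the RPC-facing hooks, keeping the rest of the lifecycle.
public class StubbableService {
  protected void startServer() { /* binds an RPC server in production */ }
  protected void stopServer() { /* stops and joins it */ }

  public void start() { startServer(); }
  public void stop() { stopServer(); }

  public static void main(String[] args) {
    StubbableService testInstance = new StubbableService() {
      @Override protected void startServer() { /* no-op: no port bound */ }
      @Override protected void stopServer() { /* no-op */ }
    };
    testInstance.start();
    testInstance.stop();
    System.out.println("lifecycle ran without binding a port");
  }
}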

View File

@@ -62,7 +62,7 @@ public class TestRMHA {
private void checkMonitorHealth() throws IOException {
try {
rm.haService.monitorHealth();
rm.adminService.monitorHealth();
} catch (HealthCheckFailedException e) {
fail("The RM is in bad health: it is Active, but the active services " +
"are not running");
@@ -71,20 +71,20 @@ public class TestRMHA {
private void checkStandbyRMFunctionality() throws IOException {
assertEquals(STATE_ERR, HAServiceState.STANDBY,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertFalse("Active RM services are started",
rm.areActiveServicesRunning());
assertTrue("RM is not ready to become active",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
}
private void checkActiveRMFunctionality() throws IOException {
assertEquals(STATE_ERR, HAServiceState.ACTIVE,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertTrue("Active RM services aren't started",
rm.areActiveServicesRunning());
assertTrue("RM is not ready to become active",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
try {
rm.getNewAppId();
@@ -113,9 +113,9 @@ public class TestRMHA {
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertFalse("RM is ready to become active before being started",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
checkMonitorHealth();
rm.start();
@@ -123,27 +123,27 @@ public class TestRMHA {
checkStandbyRMFunctionality();
// 1. Transition to Standby - must be a no-op
rm.haService.transitionToStandby(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 2. Transition to active
rm.haService.transitionToActive(requestInfo);
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
// 3. Transition to active - no-op
rm.haService.transitionToActive(requestInfo);
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
// 4. Transition to standby
rm.haService.transitionToStandby(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 5. Transition to active to check Active->Standby->Active works
rm.haService.transitionToActive(requestInfo);
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
@@ -151,9 +151,9 @@ public class TestRMHA {
// become active
rm.stop();
assertEquals(STATE_ERR, HAServiceState.STOPPING,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertFalse("RM is ready to become active even after it is stopped",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
assertFalse("Active RM services are started",
rm.areActiveServicesRunning());
checkMonitorHealth();

View File

@@ -129,7 +129,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
for (String rpcAddress : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
}
conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
"localhost:" + adminPort);
return conf;
}
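
The suffixing above is what lets one configuration serve both RMs: every per-RM key gains the RM id, e.g. yarn.resourcemanager.admin.address.rm1, and "localhost:0" lets the test bind ephemeral RPC ports. A minimal sketch of the resulting HA configuration, assuming the standard keys on this branch (RM_HA_ENABLED, RM_HA_IDS); ids and ports are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch of a two-node RM HA configuration; values are illustrative.
public class HaConfSketch {
  public static Configuration makeHaConf() {
    Configuration conf = new Configuration();
    // yarn.resourcemanager.ha.enabled
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    // yarn.resourcemanager.ha.rm-ids
    conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
    // Each RM-facing address is suffixed with the id it belongs to,
    // e.g. yarn.resourcemanager.admin.address.rm1
    conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, "rm1"),
        "localhost:1234");
    conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, "rm2"),
        "localhost:5678");
    return conf;
  }
}
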
@@ -143,23 +144,23 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
ResourceManager rm1 = new ResourceManager();
rm1.init(conf1);
rm1.start();
rm1.getHAService().transitionToActive(req);
rm1.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm1.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm1.getHAService().getServiceStatus().getState());
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
ResourceManager rm2 = new ResourceManager();
rm2.init(conf2);
rm2.start();
rm2.getHAService().transitionToActive(req);
rm2.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm2.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getHAService().getServiceStatus().getState());
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
// Submitting an application to RM1 to trigger a state store operation.
// RM1 should realize that it got fenced and is not the Active RM anymore.
@@ -181,16 +182,16 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
for (int i = 0; i < 30; i++) {
if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getHAService()
.getServiceStatus().getState()) {
if (HAServiceProtocol.HAServiceState.ACTIVE ==
rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
Thread.sleep(100);
}
}
assertEquals("RM should have been fenced",
HAServiceProtocol.HAServiceState.STANDBY,
rm1.getHAService().getServiceStatus().getState());
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getHAService().getServiceStatus().getState());
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
}
}

View File

@@ -179,12 +179,13 @@ public class TestRMAppTransitions {
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
this.rmContext =
new RMContextImpl(rmDispatcher, store,
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM());
((RMContextImpl)rmContext).setStateStore(store);
rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher(this.rmContext));