diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bdc31db4d1b..7f0628da707 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -400,6 +400,9 @@ Release 2.7.0 - UNRELEASED YARN-3078. LogCLIHelpers lacks of a blank space before string 'does not exist'. (Sam Liu via ozawa) + YARN-3082. Non thread safe access to systemCredentials in NodeHeartbeatResponse + processing. (Anubhav Dhoot via ozawa) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 1e915143842..630a5bf5889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -104,7 +104,8 @@ public class NodeHeartbeatResponsePBImpl extends for (Map.Entry entry : systemCredentials.entrySet()) { builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue()))); + .setCredentialsForApp(ProtoUtils.convertToProtoFormat( + entry.getValue().duplicate()))); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 46d7b101a63..71a420eba45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,7 +38,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -74,12 +78,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -1463,6 +1469,63 @@ public class TestNodeStatusUpdater { nm.stop(); } + @Test + public void testConcurrentAccessToSystemCredentials(){ + final Map testCredentials = new HashMap<>(); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]); + ApplicationId applicationId = ApplicationId.newInstance(123456, 120); + testCredentials.put(applicationId, byteBuffer); + + final List exceptions = Collections.synchronizedList(new + ArrayList()); + + final int NUM_THREADS = 10; + final CountDownLatch allDone = new CountDownLatch(NUM_THREADS); + final ExecutorService threadPool = Executors.newFixedThreadPool( + NUM_THREADS); + + final AtomicBoolean stop = new AtomicBoolean(false); + + try { + for (int i = 0; i < NUM_THREADS; i++) { + threadPool.submit(new Runnable() { + @Override + public void run() { + try { + for (int i = 0; i < 100 && !stop.get(); i++) { + NodeHeartbeatResponse nodeHeartBeatResponse = + newNodeHeartbeatResponse(0, NodeAction.NORMAL, + null, null, null, null, 0); + nodeHeartBeatResponse.setSystemCredentialsForApps( + testCredentials); + NodeHeartbeatResponseProto proto = + ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse) + .getProto(); + Assert.assertNotNull(proto); + } + } catch (Throwable t) { + exceptions.add(t); + stop.set(true); + } finally { + allDone.countDown(); + } + } + }); + } + + int testTimeout = 2; + Assert.assertTrue("Timeout waiting for more than " + testTimeout + " " + + "seconds", + allDone.await(testTimeout, TimeUnit.SECONDS)); + } catch (InterruptedException ie) { + exceptions.add(ie); + } finally { + threadPool.shutdownNow(); + } + Assert.assertTrue("Test failed with exception(s)" + exceptions, + exceptions.isEmpty()); + } + // Add new containers info into NM context each time node heart beats. private class MyNMContext extends NMContext {