YARN-3082. Non thread safe access to systemCredentials in NodeHeartbeatResponse processing. Contributed by Anubhav Dhoot.

(cherry picked from commit 3aab354e66)
This commit is contained in:
Tsuyoshi Ozawa 2015-01-23 16:04:18 +09:00
parent 13067cf4b1
commit ff627d94e7
3 changed files with 68 additions and 1 deletions

View File

@ -366,6 +366,9 @@ Release 2.7.0 - UNRELEASED
YARN-3078. LogCLIHelpers lacks of a blank space before string 'does not exist'.
(Sam Liu via ozawa)
YARN-3082. Non thread safe access to systemCredentials in NodeHeartbeatResponse
processing. (Anubhav Dhoot via ozawa)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -104,7 +104,8 @@ public class NodeHeartbeatResponsePBImpl extends
for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
.setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue())));
.setCredentialsForApp(ProtoUtils.convertToProtoFormat(
entry.getValue().duplicate())));
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -37,7 +38,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -74,12 +78,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -1463,6 +1469,63 @@ public class TestNodeStatusUpdater {
nm.stop();
}
@Test
public void testConcurrentAccessToSystemCredentials(){
final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>();
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]);
ApplicationId applicationId = ApplicationId.newInstance(123456, 120);
testCredentials.put(applicationId, byteBuffer);
final List<Throwable> exceptions = Collections.synchronizedList(new
ArrayList<Throwable>());
final int NUM_THREADS = 10;
final CountDownLatch allDone = new CountDownLatch(NUM_THREADS);
final ExecutorService threadPool = Executors.newFixedThreadPool(
NUM_THREADS);
final AtomicBoolean stop = new AtomicBoolean(false);
try {
for (int i = 0; i < NUM_THREADS; i++) {
threadPool.submit(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 100 && !stop.get(); i++) {
NodeHeartbeatResponse nodeHeartBeatResponse =
newNodeHeartbeatResponse(0, NodeAction.NORMAL,
null, null, null, null, 0);
nodeHeartBeatResponse.setSystemCredentialsForApps(
testCredentials);
NodeHeartbeatResponseProto proto =
((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse)
.getProto();
Assert.assertNotNull(proto);
}
} catch (Throwable t) {
exceptions.add(t);
stop.set(true);
} finally {
allDone.countDown();
}
}
});
}
int testTimeout = 2;
Assert.assertTrue("Timeout waiting for more than " + testTimeout + " " +
"seconds",
allDone.await(testTimeout, TimeUnit.SECONDS));
} catch (InterruptedException ie) {
exceptions.add(ie);
} finally {
threadPool.shutdownNow();
}
Assert.assertTrue("Test failed with exception(s)" + exceptions,
exceptions.isEmpty());
}
// Add new containers info into NM context each time node heart beats.
private class MyNMContext extends NMContext {