YARN-10828. Backport YARN-9789 to branch-3.2. Contributed by Tarun Parimi

This commit is contained in:
Szilard Nemeth 2021-06-24 20:20:23 +02:00
parent 7d2186ffcf
commit c8b090d9a4
3 changed files with 52 additions and 14 deletions

View File

@ -193,12 +193,14 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
@Override @Override
public void logMutation(LogMutation logMutation) throws IOException { public void logMutation(LogMutation logMutation) throws IOException {
if(maxLogs > 0) {
LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY))); LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
logs.add(logMutation); logs.add(logMutation);
if (logs.size() > maxLogs) { if (logs.size() > maxLogs) {
logs.removeFirst(); logs.removeFirst();
} }
db.put(bytes(LOG_KEY), serLogMutations(logs)); db.put(bytes(LOG_KEY), serLogMutations(logs));
}
pendingMutation = logMutation; pendingMutation = logMutation;
} }
@ -293,6 +295,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return deserLogMutations(db.get(bytes(LOG_KEY))); return deserLogMutations(db.get(bytes(LOG_KEY)));
} }
@VisibleForTesting
protected DB getDB() {
return db;
}
@Override @Override
public void storeVersion() throws Exception { public void storeVersion() throws Exception {
try { try {

View File

@ -156,6 +156,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
@Override @Override
public void logMutation(LogMutation logMutation) throws Exception { public void logMutation(LogMutation logMutation) throws Exception {
if(maxLogs > 0) {
byte[] storedLogs = getZkData(logsPath); byte[] storedLogs = getZkData(logsPath);
LinkedList<LogMutation> logs = new LinkedList<>(); LinkedList<LogMutation> logs = new LinkedList<>();
if (storedLogs != null) { if (storedLogs != null) {
@ -166,6 +167,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
logs.remove(logs.removeFirst()); logs.remove(logs.removeFirst());
} }
safeSetZkData(logsPath, logs); safeSetZkData(logsPath, logs);
}
pendingMutation = logMutation; pendingMutation = logMutation;
} }

View File

@ -18,7 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -36,6 +41,7 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
/** /**
@ -119,6 +125,29 @@ public class TestLeveldbConfigurationStore extends
rm2.close(); rm2.close();
} }
@Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
prepareLogMutation("key1", "val1");
boolean logKeyPresent = false;
DB db = ((LeveldbConfigurationStore) confStore).getDB();
DBIterator itr = db.iterator();
itr.seekToFirst();
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (key.equals("log")) {
logKeyPresent = true;
break;
}
}
assertFalse("Audit Log is not disabled", logKeyPresent);
confStore.close();
}
@Override @Override
public YarnConfigurationStore createConfStore() { public YarnConfigurationStore createConfStore() {
return new LeveldbConfigurationStore(); return new LeveldbConfigurationStore();