YARN-9789. Disable Option for Write Ahead Logs of LogMutation. Contributed by Prabhu Joseph

This commit is contained in:
Szilard Nemeth 2019-12-05 20:43:26 +01:00
parent 4627dd6708
commit 5cc6f945da
4 changed files with 78 additions and 15 deletions

View File

@ -224,12 +224,14 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
@Override @Override
public void logMutation(LogMutation logMutation) throws IOException { public void logMutation(LogMutation logMutation) throws IOException {
LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY))); if (maxLogs > 0) {
logs.add(logMutation); LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
if (logs.size() > maxLogs) { logs.add(logMutation);
logs.removeFirst(); if (logs.size() > maxLogs) {
logs.removeFirst();
}
db.put(bytes(LOG_KEY), serLogMutations(logs));
} }
db.put(bytes(LOG_KEY), serLogMutations(logs));
pendingMutation = logMutation; pendingMutation = logMutation;
} }
@ -337,6 +339,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return deserLogMutations(db.get(bytes(LOG_KEY))); return deserLogMutations(db.get(bytes(LOG_KEY)));
} }
@VisibleForTesting
protected DB getDB() {
return db;
}
@Override @Override
public void storeVersion() throws Exception { public void storeVersion() throws Exception {
String key = VERSION_KEY; String key = VERSION_KEY;

View File

@ -162,17 +162,19 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
@Override @Override
public void logMutation(LogMutation logMutation) throws Exception { public void logMutation(LogMutation logMutation) throws Exception {
byte[] storedLogs = zkManager.getData(logsPath); if (maxLogs > 0) {
LinkedList<LogMutation> logs = new LinkedList<>(); byte[] storedLogs = zkManager.getData(logsPath);
if (storedLogs != null) { LinkedList<LogMutation> logs = new LinkedList<>();
logs = (LinkedList<LogMutation>) deserializeObject(storedLogs); if (storedLogs != null) {
logs = (LinkedList<LogMutation>) deserializeObject(storedLogs);
}
logs.add(logMutation);
if (logs.size() > maxLogs) {
logs.remove(logs.removeFirst());
}
zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
fencingNodePath);
} }
logs.add(logMutation);
if (logs.size() > maxLogs) {
logs.remove(logs.removeFirst());
}
zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
fencingNodePath);
pendingMutation = logMutation; pendingMutation = logMutation;
} }

View File

@ -31,13 +31,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
/** /**
@ -112,6 +116,33 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
confStore.close(); confStore.close();
} }
@Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
Map<String, String> update = new HashMap<>();
update.put("key1", "val1");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
boolean logKeyPresent = false;
DB db = ((LeveldbConfigurationStore) confStore).getDB();
DBIterator itr = db.iterator();
itr.seekToFirst();
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (key.equals("log")) {
logKeyPresent = true;
break;
}
}
assertFalse("Audit Log is not disabled", logKeyPresent);
confStore.close();
}
@Test @Test
public void testMaxLogs() throws Exception { public void testMaxLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -221,6 +222,28 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
assertEquals("val3", logs.get(1).getUpdates().get("key3")); assertEquals("val3", logs.get(1).getUpdates().get("key3"));
} }
@Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
String znodeParentPath = conf.get(YarnConfiguration.
RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
String logsPath = ZKCuratorManager.getNodePath(znodeParentPath, "LOGS");
byte[] data = null;
((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1);
Map<String, String> update = new HashMap<>();
update.put("key1", "val1");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath);
assertNull("Failed to Disable Audit Logs", data);
}
public Configuration createRMHAConf(String rmIds, String rmId, public Configuration createRMHAConf(String rmIds, String rmId,
int adminPort) { int adminPort) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();