Merging r1573814 through r1574170 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1574171 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-03-04 18:22:26 +00:00
commit f9ebdea446
24 changed files with 758 additions and 223 deletions

View File

@ -81,6 +81,10 @@ public final class DomainSocketWatcher implements Closeable {
*/
private static native void anchorNative();
public static String getLoadingFailureReason() {
return loadingFailureReason;
}
public interface Handler {
/**
* Handles an event on a socket. An event may be the socket becoming
@ -244,7 +248,9 @@ public final class DomainSocketWatcher implements Closeable {
lock.lock();
try {
if (closed) return;
LOG.info(this + ": closing");
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": closing");
}
closed = true;
} finally {
lock.unlock();
@ -390,8 +396,10 @@ public final class DomainSocketWatcher implements Closeable {
final Thread watcherThread = new Thread(new Runnable() {
@Override
public void run() {
LOG.info(this + ": starting with interruptCheckPeriodMs = " +
interruptCheckPeriodMs);
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": starting with interruptCheckPeriodMs = " +
interruptCheckPeriodMs);
}
final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
FdSet fdSet = new FdSet();
addNotificationSocket(entries, fdSet);
@ -431,7 +439,9 @@ public final class DomainSocketWatcher implements Closeable {
// toRemove are now empty and processedCond has been notified if it
// needed to be.
if (closed) {
LOG.info(toString() + " thread terminating.");
if (LOG.isDebugEnabled()) {
LOG.debug(toString() + " thread terminating.");
}
return;
}
// Check if someone sent our thread an InterruptedException while we

View File

@ -373,6 +373,8 @@ Release 2.4.0 - UNRELEASED
HDFS-5950. The DFSClient and DataNode should use shared memory segments to
communicate short-circuit information. (cmccabe)
HDFS-6046. add dfs.client.mmap.enabled (cmccabe)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@ -520,6 +522,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5866. '-maxSize' and '-step' option fail in OfflineImageViewer.
(Akira Ajisaka via wheat9)
HDFS-6040. fix DFSClient issue without libhadoop.so and some other
ShortCircuitShm cleanups (cmccabe)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
@ -666,6 +671,9 @@ Release 2.4.0 - UNRELEASED
HDFS-6028. Print clearer error message when user attempts to delete required
mask entry from ACL. (cnauroth)
HDFS-6039. Uploading a File under a Dir with default acls throws "Duplicated
ACLFeature". (cnauroth)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -165,13 +165,13 @@
<Bug pattern="DM_STRING_CTOR" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Class name="org.apache.hadoop.hdfs.client.DfsClientShmManager$EndpointShmManager" />
<Method name="allocSlot" />
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Class name="org.apache.hadoop.hdfs.client.DfsClientShmManager$EndpointShmManager" />
<Method name="allocSlot" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
<!-- Manually verified to be okay, we want to throw away the top bit here -->

View File

@ -284,6 +284,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
final long shortCircuitStreamsCacheExpiryMs;
final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
final boolean shortCircuitMmapEnabled;
final int shortCircuitMmapCacheSize;
final long shortCircuitMmapCacheExpiryMs;
final long shortCircuitMmapCacheRetryTimeout;
@ -403,6 +404,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
shortCircuitStreamsCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
shortCircuitMmapEnabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
shortCircuitMmapCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);

View File

@ -437,6 +437,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
public static final String DFS_CLIENT_MMAP_ENABLED= "dfs.client.mmap.enabled";
public static final boolean DFS_CLIENT_MMAP_ENABLED_DEFAULT = true;
public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 256;
public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";

View File

@ -1571,7 +1571,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"at position " + pos);
}
}
ByteBuffer buffer = tryReadZeroCopy(maxLength, opts);
ByteBuffer buffer = null;
if (dfsClient.getConf().shortCircuitMmapEnabled) {
buffer = tryReadZeroCopy(maxLength, opts);
}
if (buffer != null) {
return buffer;
}

View File

@ -30,7 +30,6 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.Shell;
@ -514,7 +513,9 @@ public class ShortCircuitShm {
* @return The base address of the slot.
*/
private final long calculateSlotAddress(int slotIdx) {
return this.baseAddress + (slotIdx * BYTES_PER_SLOT);
long offset = slotIdx;
offset *= BYTES_PER_SLOT;
return this.baseAddress + offset;
}
/**
@ -536,7 +537,6 @@ public class ShortCircuitShm {
slot.makeValid();
slots[idx] = slot;
if (LOG.isTraceEnabled()) {
//LOG.trace(this + ": allocAndRegisterSlot " + idx);
LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
StringUtils.getStackTrace(Thread.currentThread()));
}
@ -567,6 +567,14 @@ public class ShortCircuitShm {
*/
synchronized public final Slot registerSlot(int slotIdx,
ExtendedBlockId blockId) throws InvalidRequestException {
if (slotIdx < 0) {
throw new InvalidRequestException(this + ": invalid negative slot " +
"index " + slotIdx);
}
if (slotIdx >= slots.length) {
throw new InvalidRequestException(this + ": invalid slot " +
"index " + slotIdx);
}
if (allocatedSlots.get(slotIdx)) {
throw new InvalidRequestException(this + ": slot " + slotIdx +
" is already in use.");
@ -579,7 +587,6 @@ public class ShortCircuitShm {
slots[slotIdx] = slot;
allocatedSlots.set(slotIdx, true);
if (LOG.isTraceEnabled()) {
//LOG.trace(this + ": registerSlot " + slotIdx);
LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
StringUtils.getStackTrace(Thread.currentThread()));
}

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
@ -375,7 +376,8 @@ public class ShortCircuitCache implements Closeable {
this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
this.staleThresholdMs = staleThresholdMs;
DfsClientShmManager shmManager = null;
if (shmInterruptCheckMs > 0) {
if ((shmInterruptCheckMs > 0) &&
(DomainSocketWatcher.getLoadingFailureReason() == null)) {
try {
shmManager = new DfsClientShmManager(shmInterruptCheckMs);
} catch (IOException e) {

View File

@ -2246,6 +2246,7 @@ public class FSDirectory implements Closeable {
final Quota.Counts counts = child.computeQuotaUsage();
updateCount(iip, pos,
counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
boolean isRename = (child.getParent() != null);
final INodeDirectory parent = inodes[pos-1].asDirectory();
boolean added = false;
try {
@ -2260,7 +2261,9 @@ public class FSDirectory implements Closeable {
-counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
} else {
iip.setINode(pos - 1, child.getParent());
AclStorage.copyINodeDefaultAcl(child);
if (!isRename) {
AclStorage.copyINodeDefaultAcl(child);
}
addToInodeMap(child);
}
return added;

View File

@ -1532,26 +1532,34 @@
</description>
</property>
<property>
<name>dfs.client.mmap.enabled</name>
<value>true</value>
<description>
If this is set to false, the client won't attempt to perform memory-mapped reads.
</description>
</property>
<property>
<name>dfs.client.mmap.cache.size</name>
<value>1024</value>
<value>256</value>
<description>
When zero-copy reads are used, the DFSClient keeps a cache of recently used
memory mapped regions. This parameter controls the maximum number of
entries that we will keep in that cache.
If this is set to 0, we will not allow mmap.
The larger this number is, the more file descriptors we will potentially
use for memory-mapped files. mmaped files also use virtual address space.
You may need to increase your ulimit virtual address space limits before
increasing the client mmap cache size.
Note that you can still do zero-copy reads when this size is set to 0.
</description>
</property>
<property>
<name>dfs.client.mmap.cache.timeout.ms</name>
<value>900000</value>
<value>3600000</value>
<description>
The minimum length of time that we will keep an mmap entry in the cache
between uses. If an entry is in the cache longer than this, and nobody
@ -1570,7 +1578,7 @@
<property>
<name>dfs.client.short.circuit.replica.stale.threshold.ms</name>
<value>3000000</value>
<value>1800000</value>
<description>
The maximum amount of time that we will consider a short-circuit replica to
be valid, if there is no communication from the DataNode. After this time

View File

@ -21,6 +21,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
import java.io.File;
import java.io.FileInputStream;
@ -698,4 +700,63 @@ public class TestEnhancedByteBufferAccess {
}
}, 10, 60000);
}
@Test
public void testClientMmapDisable() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, false);
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
final int TEST_FILE_LENGTH = 16385;
final int RANDOM_SEED = 23453;
final String CONTEXT = "testClientMmapDisable";
FSDataInputStream fsIn = null;
DistributedFileSystem fs = null;
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
try {
// With DFS_CLIENT_MMAP_ENABLED set to false, we should not do memory
// mapped reads.
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
fsIn = fs.open(TEST_PATH);
try {
fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Assert.fail("expected zero-copy read to fail when client mmaps " +
"were disabled.");
} catch (UnsupportedOperationException e) {
}
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
fsIn = null;
fs = null;
cluster = null;
try {
// Now try again with DFS_CLIENT_MMAP_CACHE_SIZE == 0. It should work.
conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, true);
conf.setInt(DFS_CLIENT_MMAP_CACHE_SIZE, 0);
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
fsIn = fs.open(TEST_PATH);
ByteBuffer buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
fsIn.releaseBuffer(buf);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.ShortCircuitShm;
import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.Assert;
public class TestShortCircuitShm {
public static final Log LOG = LogFactory.getLog(TestShortCircuitShm.class);
private static final File TEST_BASE =
new File(System.getProperty("test.build.data", "/tmp"));
@Before
public void before() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
}
@Test(timeout=60000)
public void testStartupShutdown() throws Exception {
File path = new File(TEST_BASE, "testStartupShutdown");
path.mkdirs();
SharedFileDescriptorFactory factory =
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
FileInputStream stream =
factory.createDescriptor("testStartupShutdown", 4096);
ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
shm.free();
stream.close();
FileUtil.fullyDelete(path);
}
@Test(timeout=60000)
public void testAllocateSlots() throws Exception {
File path = new File(TEST_BASE, "testAllocateSlots");
path.mkdirs();
SharedFileDescriptorFactory factory =
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
FileInputStream stream =
factory.createDescriptor("testAllocateSlots", 4096);
ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
int numSlots = 0;
ArrayList<Slot> slots = new ArrayList<Slot>();
while (!shm.isFull()) {
Slot slot = shm.allocAndRegisterSlot(new ExtendedBlockId(123L, "test_bp1"));
slots.add(slot);
numSlots++;
}
LOG.info("allocated " + numSlots + " slots before running out.");
int slotIdx = 0;
for (Iterator<Slot> iter = shm.slotIterator();
iter.hasNext(); ) {
Assert.assertTrue(slots.contains(iter.next()));
}
for (Slot slot : slots) {
Assert.assertFalse(slot.addAnchor());
Assert.assertEquals(slotIdx++, slot.getSlotIdx());
}
for (Slot slot : slots) {
slot.makeAnchorable();
}
for (Slot slot : slots) {
Assert.assertTrue(slot.addAnchor());
}
for (Slot slot : slots) {
slot.removeAnchor();
}
for (Slot slot : slots) {
shm.unregisterSlot(slot.getSlotIdx());
slot.makeInvalid();
}
shm.free();
stream.close();
FileUtil.fullyDelete(path);
}
}

View File

@ -1063,6 +1063,45 @@ public abstract class FSAclBaseTest {
assertAclFeature(dirPath, true);
}
@Test
public void testDefaultAclRenamedFile() throws Exception {
Path dirPath = new Path(path, "dir");
FileSystem.mkdirs(fs, dirPath, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(dirPath, aclSpec);
Path filePath = new Path(path, "file1");
fs.create(filePath).close();
fs.setPermission(filePath, FsPermission.createImmutable((short)0640));
Path renamedFilePath = new Path(dirPath, "file1");
fs.rename(filePath, renamedFilePath);
AclEntry[] expected = new AclEntry[] { };
AclStatus s = fs.getAclStatus(renamedFilePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(renamedFilePath, (short)0640);
assertAclFeature(renamedFilePath, false);
}
@Test
public void testDefaultAclRenamedDir() throws Exception {
Path dirPath = new Path(path, "dir");
FileSystem.mkdirs(fs, dirPath, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(dirPath, aclSpec);
Path subdirPath = new Path(path, "subdir");
FileSystem.mkdirs(fs, subdirPath, FsPermission.createImmutable((short)0750));
Path renamedSubdirPath = new Path(dirPath, "subdir");
fs.rename(subdirPath, renamedSubdirPath);
AclEntry[] expected = new AclEntry[] { };
AclStatus s = fs.getAclStatus(renamedSubdirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(renamedSubdirPath, (short)0750);
assertAclFeature(renamedSubdirPath, false);
}
@Test
public void testSkipAclEnforcementPermsDisabled() throws Exception {
Path bruceDir = new Path(path, "bruce");

View File

@ -972,5 +972,55 @@
</comparator>
</comparators>
</test>
<test>
<description>copyFromLocal: copying file into a directory with a default ACL</description>
<test-commands>
<command>-fs NAMENODE -mkdir /dir1</command>
<command>-fs NAMENODE -setfacl -m default:user:charlie:rwx /dir1</command>
<command>-fs NAMENODE -copyFromLocal CLITEST_DATA/data15bytes /dir1/data15bytes</command>
<command>-fs NAMENODE -getfacl /dir1/data15bytes</command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rm -R /dir1</command>
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^# file: /dir1/data15bytes$</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^# owner: USERNAME$</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^# group: supergroup$</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^user::rw-$</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^user:charlie:rwx\t#effective:r--$</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^group::r-x\t#effective:r--$</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^mask::r--$</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^other::r--$</expected-output>
</comparator>
<comparator>
<type>RegexpAcrossOutputComparator</type>
<expected-output>.*(?!default).*</expected-output>
</comparator>
</comparators>
</test>
</tests>
</configuration>

View File

@ -249,6 +249,9 @@ Release 2.4.0 - UNRELEASED
YARN-1765. Added test cases to verify that killApplication API works across
ResourceManager failover. (Xuan Gong via vinodkv)
YARN-1730. Implemented simple write-locking in the LevelDB based timeline-
store. (Billie Rinaldi via vinodkv)
OPTIMIZATIONS
BUG FIXES
@ -381,6 +384,11 @@ Release 2.4.0 - UNRELEASED
YARN-1748. Excluded core-site.xml from hadoop-yarn-server-tests package's jar
and thus avoid breaking downstream tests. (Sravya Tirukkovalur via vinodkv)
YARN-1729. Made TimelineWebServices deserialize the string primary- and
secondary-filters param into the JSON-compatible object. (Billie Rinaldi via
zjshen)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1073,9 +1073,22 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class";
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
/** Timeline service leveldb path */
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.path";
TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
/** Timeline service leveldb start time read cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
/** Timeline service leveldb start time write cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
////////////////////////////////
// Other Configs

View File

@ -17,27 +17,18 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
/**
* A utility class providing methods for serializing and deserializing
* objects. The {@link #write(Object)}, {@link #read(byte[])} and {@link
* #write(java.io.DataOutputStream, Object)}, {@link
* #read(java.io.DataInputStream)} methods are used by the
* {@link LeveldbTimelineStore} to store and retrieve arbitrary
* objects. The {@link #write(Object)} and {@link #read(byte[])} methods are
* used by the {@link LeveldbTimelineStore} to store and retrieve arbitrary
* JSON, while the {@link #writeReverseOrderedLong} and {@link
* #readReverseOrderedLong} methods are used to sort entities in descending
* start time order.
@ -47,79 +38,31 @@ import org.codehaus.jackson.map.ObjectMapper;
public class GenericObjectMapper {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final byte LONG = 0x1;
private static final byte INTEGER = 0x2;
private static final byte DOUBLE = 0x3;
private static final byte STRING = 0x4;
private static final byte BOOLEAN = 0x5;
private static final byte LIST = 0x6;
private static final byte MAP = 0x7;
public static final ObjectReader OBJECT_READER;
public static final ObjectWriter OBJECT_WRITER;
static {
ObjectMapper mapper = new ObjectMapper();
OBJECT_READER = mapper.reader(Object.class);
OBJECT_WRITER = mapper.writer();
}
/**
* Serializes an Object into a byte array. Along with {@link #read(byte[]) },
* Serializes an Object into a byte array. Along with {@link #read(byte[])},
* can be used to serialize an Object and deserialize it into an Object of
* the same type without needing to specify the Object's type,
* as long as it is one of the JSON-compatible objects Long, Integer,
* Double, String, Boolean, List, or Map. The current implementation uses
* ObjectMapper to serialize complex objects (List and Map) while using
* Writable to serialize simpler objects, to produce fewer bytes.
* as long as it is one of the JSON-compatible objects understood by
* ObjectMapper.
*
* @param o An Object
* @return A byte array representation of the Object
* @throws IOException
*/
public static byte[] write(Object o) throws IOException {
if (o == null)
if (o == null) {
return EMPTY_BYTES;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
write(new DataOutputStream(baos), o);
return baos.toByteArray();
}
/**
* Serializes an Object and writes it to a DataOutputStream. Along with
* {@link #read(java.io.DataInputStream)}, can be used to serialize an Object
* and deserialize it into an Object of the same type without needing to
* specify the Object's type, as long as it is one of the JSON-compatible
* objects Long, Integer, Double, String, Boolean, List, or Map. The current
* implementation uses ObjectMapper to serialize complex objects (List and
* Map) while using Writable to serialize simpler objects, to produce fewer
* bytes.
*
* @param dos A DataOutputStream
* @param o An Object
* @throws IOException
*/
public static void write(DataOutputStream dos, Object o)
throws IOException {
if (o == null)
return;
if (o instanceof Long) {
dos.write(LONG);
WritableUtils.writeVLong(dos, (Long) o);
} else if(o instanceof Integer) {
dos.write(INTEGER);
WritableUtils.writeVInt(dos, (Integer) o);
} else if(o instanceof Double) {
dos.write(DOUBLE);
dos.writeDouble((Double) o);
} else if (o instanceof String) {
dos.write(STRING);
WritableUtils.writeString(dos, (String) o);
} else if (o instanceof Boolean) {
dos.write(BOOLEAN);
dos.writeBoolean((Boolean) o);
} else if (o instanceof List) {
dos.write(LIST);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(dos, o);
} else if (o instanceof Map) {
dos.write(MAP);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(dos, o);
} else {
throw new IOException("Couldn't serialize object");
}
return OBJECT_WRITER.writeValueAsBytes(o);
}
/**
@ -147,42 +90,7 @@ public class GenericObjectMapper {
if (b == null || b.length == 0) {
return null;
}
ByteArrayInputStream bais = new ByteArrayInputStream(b, offset,
b.length - offset);
return read(new DataInputStream(bais));
}
/**
* Reads an Object from a DataInputStream whose data has been written with
* {@link #write(java.io.DataOutputStream, Object)}.
*
* @param dis A DataInputStream
* @return An Object, null if an unrecognized type
* @throws IOException
*/
public static Object read(DataInputStream dis) throws IOException {
byte code = (byte)dis.read();
ObjectMapper mapper;
switch (code) {
case LONG:
return WritableUtils.readVLong(dis);
case INTEGER:
return WritableUtils.readVInt(dis);
case DOUBLE:
return dis.readDouble();
case STRING:
return WritableUtils.readString(dis);
case BOOLEAN:
return dis.readBoolean();
case LIST:
mapper = new ObjectMapper();
return mapper.readValue(dis, ArrayList.class);
case MAP:
mapper = new ObjectMapper();
return mapper.readValue(dis, HashMap.class);
default:
return null;
}
return OBJECT_READER.readValue(b, offset, b.length - offset);
}
/**
@ -195,8 +103,9 @@ public class GenericObjectMapper {
public static byte[] writeReverseOrderedLong(long l) {
byte[] b = new byte[8];
b[0] = (byte)(0x7f ^ ((l >> 56) & 0xff));
for (int i = 1; i < 7; i++)
for (int i = 1; i < 7; i++) {
b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff));
}
b[7] = (byte)(0xff ^ (l & 0xff));
return b;
}

View File

@ -33,6 +33,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.map.LRUMap;
@ -84,11 +85,17 @@ public class LeveldbTimelineStore extends AbstractService
private static final byte[] EMPTY_BYTES = new byte[0];
private static final int START_TIME_CACHE_SIZE = 10000;
private static final int DEFAULT_START_TIME_READ_CACHE_SIZE = 10000;
private static final int DEFAULT_START_TIME_WRITE_CACHE_SIZE = 10000;
@SuppressWarnings("unchecked")
private final Map<EntityIdentifier, Long> startTimeCache =
Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
private Map<EntityIdentifier, Long> startTimeWriteCache;
private Map<EntityIdentifier, Long> startTimeReadCache;
/**
* Per-entity locks are obtained when writing.
*/
private final LockMap<EntityIdentifier> writeLocks =
new LockMap<EntityIdentifier>();
private DB db;
@ -97,6 +104,7 @@ public class LeveldbTimelineStore extends AbstractService
}
@Override
@SuppressWarnings("unchecked")
protected void serviceInit(Configuration conf) throws Exception {
Options options = new Options();
options.createIfMissing(true);
@ -109,6 +117,12 @@ public class LeveldbTimelineStore extends AbstractService
"timeline store " + path);
LOG.info("Using leveldb path " + path);
db = factory.open(new File(path, FILENAME), options);
startTimeWriteCache =
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
conf)));
startTimeReadCache =
Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
conf)));
super.serviceInit(conf);
}
@ -118,6 +132,45 @@ public class LeveldbTimelineStore extends AbstractService
super.serviceStop();
}
private static class LockMap<K> {
private static class CountingReentrantLock<K> extends ReentrantLock {
private int count;
private K key;
CountingReentrantLock(K key) {
super();
this.count = 0;
this.key = key;
}
}
private Map<K, CountingReentrantLock<K>> locks =
new HashMap<K, CountingReentrantLock<K>>();
synchronized CountingReentrantLock<K> getLock(K key) {
CountingReentrantLock<K> lock = locks.get(key);
if (lock == null) {
lock = new CountingReentrantLock<K>(key);
locks.put(key, lock);
}
lock.count++;
return lock;
}
synchronized void returnLock(CountingReentrantLock<K> lock) {
if (lock.count == 0) {
throw new IllegalStateException("Returned lock more times than it " +
"was retrieved");
}
lock.count--;
if (lock.count == 0) {
locks.remove(lock.key);
}
}
}
private static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
@ -214,7 +267,7 @@ public class LeveldbTimelineStore extends AbstractService
EnumSet<Field> fields) throws IOException {
DBIterator iterator = null;
try {
byte[] revStartTime = getStartTime(entityId, entityType, null, null, null);
byte[] revStartTime = getStartTime(entityId, entityType);
if (revStartTime == null)
return null;
byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
@ -338,7 +391,7 @@ public class LeveldbTimelineStore extends AbstractService
// look up start times for the specified entities
// skip entities with no start time
for (String entity : entityIds) {
byte[] startTime = getStartTime(entity, entityType, null, null, null);
byte[] startTime = getStartTime(entity, entityType);
if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) {
@ -529,12 +582,16 @@ public class LeveldbTimelineStore extends AbstractService
* response.
*/
private void put(TimelineEntity entity, TimelinePutResponse response) {
LockMap.CountingReentrantLock<EntityIdentifier> lock =
writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
entity.getEntityType()));
lock.lock();
WriteBatch writeBatch = null;
try {
writeBatch = db.createWriteBatch();
List<TimelineEvent> events = entity.getEvents();
// look up the start time for the entity
byte[] revStartTime = getStartTime(entity.getEntityId(),
byte[] revStartTime = getAndSetStartTime(entity.getEntityId(),
entity.getEntityType(), entity.getStartTime(), events,
writeBatch);
if (revStartTime == null) {
@ -571,7 +628,7 @@ public class LeveldbTimelineStore extends AbstractService
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
// look up start time of related entity
byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId,
relatedEntityType, null, null, writeBatch);
if (relatedEntityStartTime == null) {
// if start time is not found, set start time of the related
@ -580,7 +637,7 @@ public class LeveldbTimelineStore extends AbstractService
relatedEntityStartTime = revStartTime;
writeBatch.put(createStartTimeLookupKey(relatedEntityId,
relatedEntityType), relatedEntityStartTime);
startTimeCache.put(new EntityIdentifier(relatedEntityId,
startTimeWriteCache.put(new EntityIdentifier(relatedEntityId,
relatedEntityType), revStartTimeLong);
}
// write reverse entry (related entity -> entity)
@ -629,6 +686,8 @@ public class LeveldbTimelineStore extends AbstractService
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.addError(error);
} finally {
lock.unlock();
writeLocks.returnLock(lock);
IOUtils.cleanup(LOG, writeBatch);
}
}
@ -666,6 +725,39 @@ public class LeveldbTimelineStore extends AbstractService
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @return A byte array
* @throws IOException
*/
private byte[] getStartTime(String entityId, String entityType)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
// start time is not provided, so try to look it up
if (startTimeReadCache.containsKey(entity)) {
// found the start time in the cache
return writeReverseOrderedLong(startTimeReadCache.get(entity));
} else {
// try to look up the start time in the db
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
byte[] v = db.get(b);
if (v == null) {
// did not find the start time in the db
return null;
} else {
// found the start time in the db
startTimeReadCache.put(entity, readReverseOrderedLong(v, 0));
return v;
}
}
}
/**
* Get the unique start time for a given entity as a byte array that sorts
* the timestamps in reverse order (see {@link
* GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
* doesn't exist, set it based on the information provided.
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @param startTime The start time of the entity, or null
* @param events A list of events for the entity, or null
* @param writeBatch A leveldb write batch, if the method is called by a
@ -673,62 +765,76 @@ public class LeveldbTimelineStore extends AbstractService
* @return A byte array
* @throws IOException
*/
private byte[] getStartTime(String entityId, String entityType,
private byte[] getAndSetStartTime(String entityId, String entityType,
Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
if (startTime == null) {
// start time is not provided, so try to look it up
if (startTimeCache.containsKey(entity)) {
if (startTimeWriteCache.containsKey(entity)) {
// found the start time in the cache
startTime = startTimeCache.get(entity);
startTime = startTimeWriteCache.get(entity);
return writeReverseOrderedLong(startTime);
} else {
// try to look up the start time in the db
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
byte[] v = db.get(b);
if (v == null) {
// did not find the start time in the db
// if this is a put, try to set it from the provided events
if (events == null || writeBatch == null) {
// no events, or not a put, so return null
return null;
}
if (events != null) {
// prepare a start time from events in case it is needed
Long min = Long.MAX_VALUE;
for (TimelineEvent e : events)
if (min > e.getTimestamp())
for (TimelineEvent e : events) {
if (min > e.getTimestamp()) {
min = e.getTimestamp();
startTime = min;
// selected start time as minimum timestamp of provided events
// write start time to db and cache
writeBatch.put(b, writeReverseOrderedLong(startTime));
startTimeCache.put(entity, startTime);
} else {
// found the start time in the db
startTime = readReverseOrderedLong(v, 0);
if (writeBatch != null) {
// if this is a put, re-add the start time to the cache
startTimeCache.put(entity, startTime);
}
}
startTime = min;
}
return checkStartTimeInDb(entity, startTime, writeBatch);
}
} else {
// start time is provided
// TODO: verify start time in db as well as cache?
if (startTimeCache.containsKey(entity)) {
// if the start time is already in the cache,
// and it is different from the provided start time,
// use the one from the cache
if (!startTime.equals(startTimeCache.get(entity)))
startTime = startTimeCache.get(entity);
} else if (writeBatch != null) {
// if this is a put, write the provided start time to the db and the
// cache
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
writeBatch.put(b, writeReverseOrderedLong(startTime));
startTimeCache.put(entity, startTime);
if (startTimeWriteCache.containsKey(entity)) {
// check the provided start time matches the cache
if (!startTime.equals(startTimeWriteCache.get(entity))) {
// the start time is already in the cache,
// and it is different from the provided start time,
// so use the one from the cache
startTime = startTimeWriteCache.get(entity);
}
return writeReverseOrderedLong(startTime);
} else {
// check the provided start time matches the db
return checkStartTimeInDb(entity, startTime, writeBatch);
}
}
return writeReverseOrderedLong(startTime);
}
/**
* Checks db for start time and returns it if it exists. If it doesn't
* exist, writes the suggested start time (if it is not null). This is
* only called when the start time is not found in the cache,
* so it adds it back into the cache if it is found.
*/
private byte[] checkStartTimeInDb(EntityIdentifier entity,
Long suggestedStartTime, WriteBatch writeBatch) throws IOException {
// create lookup key for start time
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
// retrieve value for key
byte[] v = db.get(b);
byte[] revStartTime;
if (v == null) {
// start time doesn't exist in db
if (suggestedStartTime == null) {
return null;
}
// write suggested start time
revStartTime = writeReverseOrderedLong(suggestedStartTime);
writeBatch.put(b, revStartTime);
} else {
// found start time in db, so ignore suggested start time
suggestedStartTime = readReverseOrderedLong(v, 0);
revStartTime = v;
}
startTimeWriteCache.put(entity, suggestedStartTime);
startTimeReadCache.put(entity, suggestedStartTime);
return revStartTime;
}
/**
@ -868,6 +974,21 @@ public class LeveldbTimelineStore extends AbstractService
*/
@VisibleForTesting
void clearStartTimeCache() {
startTimeCache.clear();
startTimeWriteCache.clear();
startTimeReadCache.clear();
}
@VisibleForTesting
static int getStartTimeReadCacheSize(Configuration conf) {
return conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
DEFAULT_START_TIME_READ_CACHE_SIZE);
}
@VisibleForTesting
static int getStartTimeWriteCacheSize(Configuration conf) {
return conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
DEFAULT_START_TIME_WRITE_CACHE_SIZE);
}
}

View File

@ -26,6 +26,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
@ -94,12 +95,13 @@ public class MemoryTimelineStore
!matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // OR logic
boolean flag = false;
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null &&
matchFilter(entity.getOtherInfo(), secondaryFilter)) {
flag = true;
if (secondaryFilter != null && !matchPrimaryFilter(
entity.getPrimaryFilters(), secondaryFilter) &&
!matchFilter(entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
}
@ -220,16 +222,22 @@ public class MemoryTimelineStore
}
if (entity.getPrimaryFilters() != null) {
if (existingEntity.getPrimaryFilters() == null) {
existingEntity.setPrimaryFilters(entity.getPrimaryFilters());
} else {
existingEntity.addPrimaryFilters(entity.getPrimaryFilters());
existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
}
for (Entry<String, Set<Object>> pf :
entity.getPrimaryFilters().entrySet()) {
for (Object pfo : pf.getValue()) {
existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
}
}
}
if (entity.getOtherInfo() != null) {
if (existingEntity.getOtherInfo() == null) {
existingEntity.setOtherInfo(entity.getOtherInfo());
} else {
existingEntity.addOtherInfo(entity.getOtherInfo());
existingEntity.setOtherInfo(new HashMap<String, Object>());
}
for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
existingEntity.addOtherInfo(info.getKey(),
maybeConvert(info.getValue()));
}
}
// relate it to other entities
@ -303,4 +311,14 @@ public class MemoryTimelineStore
}
}
private static Object maybeConvert(Object o) {
if (o instanceof Long) {
Long l = (Long)o;
if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
return l.intValue();
}
}
return o;
}
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
@ -273,7 +274,13 @@ public class TimelineWebServices {
return null;
}
String[] strs = str.split(delimiter, 2);
return new NameValuePair(strs[0].trim(), strs[1].trim());
try {
return new NameValuePair(strs[0].trim(),
GenericObjectMapper.OBJECT_READER.readValue(strs[1].trim()));
} catch (Exception e) {
// didn't work as an Object, keep it as a String
return new NameValuePair(strs[0].trim(), strs[1].trim());
}
}
private static Collection<NameValuePair> parsePairsStr(
@ -297,24 +304,29 @@ public class TimelineWebServices {
List<Field> fieldList = new ArrayList<Field>();
for (String s : strs) {
s = s.trim().toUpperCase();
if (s.equals("EVENTS"))
if (s.equals("EVENTS")) {
fieldList.add(Field.EVENTS);
else if (s.equals("LASTEVENTONLY"))
} else if (s.equals("LASTEVENTONLY")) {
fieldList.add(Field.LAST_EVENT_ONLY);
else if (s.equals("RELATEDENTITIES"))
} else if (s.equals("RELATEDENTITIES")) {
fieldList.add(Field.RELATED_ENTITIES);
else if (s.equals("PRIMARYFILTERS"))
} else if (s.equals("PRIMARYFILTERS")) {
fieldList.add(Field.PRIMARY_FILTERS);
else if (s.equals("OTHERINFO"))
} else if (s.equals("OTHERINFO")) {
fieldList.add(Field.OTHER_INFO);
} else {
throw new IllegalArgumentException("Requested nonexistent field " + s);
}
}
if (fieldList.size() == 0)
if (fieldList.size() == 0) {
return null;
}
Field f1 = fieldList.remove(fieldList.size() - 1);
if (fieldList.size() == 0)
if (fieldList.size() == 0) {
return EnumSet.of(f1);
else
} else {
return EnumSet.of(f1, fieldList.toArray(new Field[fieldList.size()]));
}
}
private static Long parseLongStr(String str) {

View File

@ -72,7 +72,19 @@ public class TestGenericObjectMapper {
@Test
public void testValueTypes() throws IOException {
verify(42l);
verify(Integer.MAX_VALUE);
verify(Integer.MIN_VALUE);
assertEquals(Integer.MAX_VALUE, GenericObjectMapper.read(
GenericObjectMapper.write((long) Integer.MAX_VALUE)));
assertEquals(Integer.MIN_VALUE, GenericObjectMapper.read(
GenericObjectMapper.write((long) Integer.MIN_VALUE)));
verify((long)Integer.MAX_VALUE + 1l);
verify((long)Integer.MIN_VALUE - 1l);
verify(Long.MAX_VALUE);
verify(Long.MIN_VALUE);
assertEquals(42, GenericObjectMapper.read(GenericObjectMapper.write(42l)));
verify(42);
verify(1.23);
verify("abc");

View File

@ -30,6 +30,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestLeveldbTimelineStore
@ -64,6 +66,7 @@ public class TestLeveldbTimelineStore
super.testGetSingleEntity();
((LeveldbTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity();
loadTestData();
}
@Test
@ -86,4 +89,20 @@ public class TestLeveldbTimelineStore
super.testGetEvents();
}
@Test
public void testCacheSizes() {
Configuration conf = new Configuration();
assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
conf.setInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
10001);
assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
conf = new Configuration();
conf.setInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
10002);
assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
}
}

View File

@ -66,6 +66,9 @@ public class TimelineStoreTestUtils {
protected Map<String, Object> otherInfo;
protected Map<String, Set<String>> relEntityMap;
protected NameValuePair userFilter;
protected NameValuePair numericFilter1;
protected NameValuePair numericFilter2;
protected NameValuePair numericFilter3;
protected Collection<NameValuePair> goodTestingFilters;
protected Collection<NameValuePair> badTestingFilters;
protected TimelineEvent ev1;
@ -86,9 +89,15 @@ public class TimelineStoreTestUtils {
Set<Object> l1 = new HashSet<Object>();
l1.add("username");
Set<Object> l2 = new HashSet<Object>();
l2.add(12345l);
l2.add((long)Integer.MAX_VALUE);
Set<Object> l3 = new HashSet<Object>();
l3.add("123abc");
Set<Object> l4 = new HashSet<Object>();
l4.add((long)Integer.MAX_VALUE + 1l);
primaryFilters.put("user", l1);
primaryFilters.put("appname", l2);
primaryFilters.put("other", l3);
primaryFilters.put("long", l4);
Map<String, Object> secondaryFilters = new HashMap<String, Object>();
secondaryFilters.put("startTime", 123456l);
secondaryFilters.put("status", "RUNNING");
@ -158,24 +167,32 @@ public class TimelineStoreTestUtils {
* Load verification data
*/
protected void loadVerificationData() throws Exception {
userFilter = new NameValuePair("user",
"username");
userFilter = new NameValuePair("user", "username");
numericFilter1 = new NameValuePair("appname", Integer.MAX_VALUE);
numericFilter2 = new NameValuePair("long", (long)Integer.MAX_VALUE + 1l);
numericFilter3 = new NameValuePair("other", "123abc");
goodTestingFilters = new ArrayList<NameValuePair>();
goodTestingFilters.add(new NameValuePair("appname", 12345l));
goodTestingFilters.add(new NameValuePair("appname", Integer.MAX_VALUE));
goodTestingFilters.add(new NameValuePair("status", "RUNNING"));
badTestingFilters = new ArrayList<NameValuePair>();
badTestingFilters.add(new NameValuePair("appname", 12345l));
badTestingFilters.add(new NameValuePair("appname", Integer.MAX_VALUE));
badTestingFilters.add(new NameValuePair("status", "FINISHED"));
primaryFilters = new HashMap<String, Set<Object>>();
Set<Object> l1 = new HashSet<Object>();
l1.add("username");
Set<Object> l2 = new HashSet<Object>();
l2.add(12345l);
l2.add(Integer.MAX_VALUE);
Set<Object> l3 = new HashSet<Object>();
l3.add("123abc");
Set<Object> l4 = new HashSet<Object>();
l4.add((long)Integer.MAX_VALUE + 1l);
primaryFilters.put("user", l1);
primaryFilters.put("appname", l2);
primaryFilters.put("other", l3);
primaryFilters.put("long", l4);
secondaryFilters = new HashMap<String, Object>();
secondaryFilters.put("startTime", 123456l);
secondaryFilters.put("startTime", 123456);
secondaryFilters.put("status", "RUNNING");
allFilters = new HashMap<String, Object>();
allFilters.putAll(secondaryFilters);
@ -353,6 +370,30 @@ public class TimelineStoreTestUtils {
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
store.getEntities("type_1", null, null, null,
numericFilter1, null, EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
store.getEntities("type_1", null, null, null,
numericFilter2, null, EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
store.getEntities("type_1", null, null, null,
numericFilter3, null, EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
entities = store.getEntities("type_2", null, null, null, userFilter, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(0, entities.size());

View File

@ -109,15 +109,7 @@ public class TestTimelineWebServices extends JerseyTest {
Assert.assertEquals("Timeline API", about.getAbout());
}
@Test
public void testGetEntities() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
TimelineEntities entities = response.getEntity(TimelineEntities.class);
private static void verifyEntities(TimelineEntities entities) {
Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size());
TimelineEntity entity1 = entities.getEntities().get(0);
@ -126,7 +118,7 @@ public class TestTimelineWebServices extends JerseyTest {
Assert.assertEquals("type_1", entity1.getEntityType());
Assert.assertEquals(123l, entity1.getStartTime().longValue());
Assert.assertEquals(2, entity1.getEvents().size());
Assert.assertEquals(2, entity1.getPrimaryFilters().size());
Assert.assertEquals(4, entity1.getPrimaryFilters().size());
Assert.assertEquals(4, entity1.getOtherInfo().size());
TimelineEntity entity2 = entities.getEntities().get(1);
Assert.assertNotNull(entity2);
@ -134,10 +126,94 @@ public class TestTimelineWebServices extends JerseyTest {
Assert.assertEquals("type_1", entity2.getEntityType());
Assert.assertEquals(123l, entity2.getStartTime().longValue());
Assert.assertEquals(2, entity2.getEvents().size());
Assert.assertEquals(2, entity2.getPrimaryFilters().size());
Assert.assertEquals(4, entity2.getPrimaryFilters().size());
Assert.assertEquals(4, entity2.getOtherInfo().size());
}
@Test
public void testGetEntities() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
verifyEntities(response.getEntity(TimelineEntities.class));
}
@Test
public void testPrimaryFilterString() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").queryParam("primaryFilter", "user:username")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
verifyEntities(response.getEntity(TimelineEntities.class));
}
@Test
public void testPrimaryFilterInteger() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").queryParam("primaryFilter",
"appname:" + Integer.toString(Integer.MAX_VALUE))
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
verifyEntities(response.getEntity(TimelineEntities.class));
}
@Test
public void testPrimaryFilterLong() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").queryParam("primaryFilter",
"long:" + Long.toString((long)Integer.MAX_VALUE + 1l))
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
verifyEntities(response.getEntity(TimelineEntities.class));
}
@Test
public void testPrimaryFilterNumericString() {
// without quotes, 123abc is interpreted as the number 123,
// which finds no entities
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").queryParam("primaryFilter", "other:123abc")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
assertEquals(0, response.getEntity(TimelineEntities.class).getEntities()
.size());
}
@Test
public void testPrimaryFilterNumericStringWithQuotes() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").queryParam("primaryFilter", "other:\"123abc\"")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
verifyEntities(response.getEntity(TimelineEntities.class));
}
@Test
public void testSecondaryFilters() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1")
.queryParam("secondaryFilter",
"user:username,appname:" + Integer.toString(Integer.MAX_VALUE))
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
verifyEntities(response.getEntity(TimelineEntities.class));
}
@Test
public void testGetEntity() throws Exception {
WebResource r = resource();
@ -152,7 +228,7 @@ public class TestTimelineWebServices extends JerseyTest {
Assert.assertEquals("type_1", entity.getEntityType());
Assert.assertEquals(123l, entity.getStartTime().longValue());
Assert.assertEquals(2, entity.getEvents().size());
Assert.assertEquals(2, entity.getPrimaryFilters().size());
Assert.assertEquals(4, entity.getPrimaryFilters().size());
Assert.assertEquals(4, entity.getOtherInfo().size());
}
@ -189,7 +265,7 @@ public class TestTimelineWebServices extends JerseyTest {
Assert.assertEquals("type_1", entity.getEntityType());
Assert.assertEquals(123l, entity.getStartTime().longValue());
Assert.assertEquals(1, entity.getEvents().size());
Assert.assertEquals(2, entity.getPrimaryFilters().size());
Assert.assertEquals(4, entity.getPrimaryFilters().size());
Assert.assertEquals(0, entity.getOtherInfo().size());
}