HADOOP-13649 s3guard: implement time-based (TTL) expiry for LocalMetadataStore (Gabor Bota)
commit 69aac696d9
parent 1ef0a1db1d
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java

@@ -21,7 +21,10 @@ package org.apache.hadoop.fs.s3a.s3guard;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +40,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This is a local, in-memory, implementation of MetadataStore.
@@ -51,24 +55,35 @@ import java.util.Map;
  * This MetadataStore does not enforce filesystem rules such as disallowing
  * non-recursive removal of non-empty directories. It is assumed the caller
  * already has to perform these sorts of checks.
+ *
+ * Contains a cache internally, with time-based eviction.
  */
 public class LocalMetadataStore implements MetadataStore {
 
   public static final Logger LOG = LoggerFactory.getLogger(MetadataStore.class);
-  // TODO HADOOP-13649: use time instead of capacity for eviction.
   public static final int DEFAULT_MAX_RECORDS = 128;
+  public static final int DEFAULT_CACHE_ENTRY_TTL_MSEC = 10 * 1000;
 
   /**
    * Maximum number of records.
    */
+  @InterfaceStability.Evolving
   public static final String CONF_MAX_RECORDS =
       "fs.metadatastore.local.max_records";
 
+  /**
+   * Time to live in milliseconds. If zero, time-based expiration is
+   * disabled.
+   */
+  @InterfaceStability.Evolving
+  public static final String CONF_CACHE_ENTRY_TTL =
+      "fs.metadatastore.local.ttl";
+
   /** Contains directories and files. */
-  private LruHashMap<Path, PathMetadata> fileHash;
+  private Cache<Path, PathMetadata> fileCache;
 
   /** Contains directory listings. */
-  private LruHashMap<Path, DirListingMetadata> dirHash;
+  private Cache<Path, DirListingMetadata> dirCache;
 
   private FileSystem fs;
   /* Null iff this FS does not have an associated URI host. */
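Note: the TTL key joins the existing max-records key, and both are read as plain Configuration ints. A minimal sketch of overriding them; the 256-record and 30-second values are illustrative, not defaults shipped by this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;

    public class TtlConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap on cached records, via CONF_MAX_RECORDS above.
        conf.setInt(LocalMetadataStore.CONF_MAX_RECORDS, 256);
        // Entry TTL in milliseconds, via CONF_CACHE_ENTRY_TTL above;
        // initialize() reads it with conf.getInt().
        conf.setInt(LocalMetadataStore.CONF_CACHE_ENTRY_TTL, 30 * 1000);
      }
    }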
@@ -94,9 +109,15 @@ public class LocalMetadataStore implements MetadataStore {
     if (maxRecords < 4) {
       maxRecords = 4;
     }
-    // Start w/ less than max capacity. Space / time trade off.
-    fileHash = new LruHashMap<>(maxRecords/2, maxRecords);
-    dirHash = new LruHashMap<>(maxRecords/4, maxRecords);
+    int ttl = conf.getInt(CONF_CACHE_ENTRY_TTL, DEFAULT_CACHE_ENTRY_TTL_MSEC);
+
+    CacheBuilder builder = CacheBuilder.newBuilder().maximumSize(maxRecords);
+    if (ttl >= 0) {
+      builder.expireAfterAccess(ttl, TimeUnit.MILLISECONDS);
+    }
+
+    fileCache = builder.build();
+    dirCache = builder.build();
   }
 
   @Override
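The constructor change above swaps the two hand-tuned LruHashMaps for Guava caches produced by one CacheBuilder; build() may be called more than once and each call returns an independent cache, so fileCache and dirCache share configuration but not entries. A self-contained sketch of the same eviction setup, with placeholder String keys and values instead of the store's Path-keyed types:

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class TtlCacheSketch {
      public static void main(String[] args) {
        Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(128)           // size-based eviction, roughly LRU
            .expireAfterAccess(10 * 1000, TimeUnit.MILLISECONDS) // TTL eviction
            .build();
        cache.put("key", "value");
        // getIfPresent() returns null for absent or expired entries.
        System.out.println(cache.getIfPresent("key"));
      }
    }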
@@ -130,12 +151,12 @@ public class LocalMetadataStore implements MetadataStore {
 
     // Delete entry from file cache, then from cached parent directory, if any
 
-    deleteHashEntries(path, tombstone);
+    deleteCacheEntries(path, tombstone);
 
     if (recursive) {
       // Remove all entries that have this dir as path prefix.
-      deleteHashByAncestor(path, dirHash, tombstone);
-      deleteHashByAncestor(path, fileHash, tombstone);
+      deleteEntryByAncestor(path, dirCache, tombstone);
+      deleteEntryByAncestor(path, fileCache, tombstone);
     }
   }
 
@@ -149,7 +170,7 @@ public class LocalMetadataStore implements MetadataStore {
       throws IOException {
     Path path = standardize(p);
     synchronized (this) {
-      PathMetadata m = fileHash.mruGet(path);
+      PathMetadata m = fileCache.getIfPresent(path);
 
       if (wantEmptyDirectoryFlag && m != null &&
           m.getFileStatus().isDirectory()) {
@@ -170,7 +191,7 @@ public class LocalMetadataStore implements MetadataStore {
    * @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
    */
   private Tristate isEmptyDirectory(Path p) {
-    DirListingMetadata dirMeta = dirHash.get(p);
+    DirListingMetadata dirMeta = dirCache.getIfPresent(p);
     return dirMeta.withoutTombstones().isEmpty();
   }
 
@@ -178,7 +199,7 @@ public class LocalMetadataStore implements MetadataStore {
   public synchronized DirListingMetadata listChildren(Path p) throws
       IOException {
     Path path = standardize(p);
-    DirListingMetadata listing = dirHash.mruGet(path);
+    DirListingMetadata listing = dirCache.getIfPresent(path);
     if (LOG.isDebugEnabled()) {
       LOG.debug("listChildren({}) -> {}", path,
           listing == null ? "null" : listing.prettyPrint());
@@ -237,10 +258,10 @@ public class LocalMetadataStore implements MetadataStore {
     if (LOG.isDebugEnabled()) {
       LOG.debug("put {} -> {}", path, meta.prettyPrint());
     }
-    fileHash.put(path, meta);
+    fileCache.put(path, meta);
 
     /* Directory case:
-     * We also make sure we have an entry in the dirHash, so subsequent
+     * We also make sure we have an entry in the dirCache, so subsequent
      * listStatus(path) at least see the directory.
      *
      * If we had a boolean flag argument "isNew", we would know whether this
@@ -251,9 +272,9 @@ public class LocalMetadataStore implements MetadataStore {
      */
 
     if (status.isDirectory()) {
-      DirListingMetadata dir = dirHash.mruGet(path);
+      DirListingMetadata dir = dirCache.getIfPresent(path);
       if (dir == null) {
-        dirHash.put(path, new DirListingMetadata(path, DirListingMetadata
+        dirCache.put(path, new DirListingMetadata(path, DirListingMetadata
             .EMPTY_DIR, false));
       }
     }
@@ -261,14 +282,14 @@ public class LocalMetadataStore implements MetadataStore {
     /* Update cached parent dir. */
     Path parentPath = path.getParent();
     if (parentPath != null) {
-      DirListingMetadata parent = dirHash.mruGet(parentPath);
+      DirListingMetadata parent = dirCache.getIfPresent(parentPath);
       if (parent == null) {
         /* Track this new file's listing in parent. Parent is not
          * authoritative, since there may be other items in it we don't know
          * about. */
         parent = new DirListingMetadata(parentPath,
             DirListingMetadata.EMPTY_DIR, false);
-        dirHash.put(parentPath, parent);
+        dirCache.put(parentPath, parent);
       }
       parent.put(status);
     }
@@ -280,7 +301,7 @@ public class LocalMetadataStore implements MetadataStore {
     if (LOG.isDebugEnabled()) {
       LOG.debug("put dirMeta {}", meta.prettyPrint());
     }
-    dirHash.put(standardize(meta.getPath()), meta);
+    dirCache.put(standardize(meta.getPath()), meta);
     put(meta.getListing());
   }
 
@@ -298,8 +319,8 @@ public class LocalMetadataStore implements MetadataStore {
 
   @Override
   public void destroy() throws IOException {
-    if (dirHash != null) {
-      dirHash.clear();
+    if (dirCache != null) {
+      dirCache.invalidateAll();
     }
   }
 
@@ -312,7 +333,7 @@ public class LocalMetadataStore implements MetadataStore {
   public synchronized void prune(long modTime, String keyPrefix)
       throws IOException {
     Iterator<Map.Entry<Path, PathMetadata>> files =
-        fileHash.entrySet().iterator();
+        fileCache.asMap().entrySet().iterator();
     while (files.hasNext()) {
       Map.Entry<Path, PathMetadata> entry = files.next();
       if (expired(entry.getValue().getFileStatus(), modTime, keyPrefix)) {
@@ -320,7 +341,7 @@ public class LocalMetadataStore implements MetadataStore {
       }
     }
     Iterator<Map.Entry<Path, DirListingMetadata>> dirs =
-        dirHash.entrySet().iterator();
+        dirCache.asMap().entrySet().iterator();
     while (dirs.hasNext()) {
       Map.Entry<Path, DirListingMetadata> entry = dirs.next();
       Path path = entry.getKey();
@@ -335,9 +356,10 @@ public class LocalMetadataStore implements MetadataStore {
         }
       }
       if (newChildren.size() != oldChildren.size()) {
-        dirHash.put(path, new DirListingMetadata(path, newChildren, false));
+        dirCache.put(path, new DirListingMetadata(path, newChildren, false));
         if (!path.isRoot()) {
-          DirListingMetadata parent = dirHash.get(path.getParent());
+          DirListingMetadata parent = null;
+          parent = dirCache.getIfPresent(path.getParent());
           if (parent != null) {
             parent.setAuthoritative(false);
           }
@@ -354,9 +376,9 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @VisibleForTesting
-  static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
+  static <T> void deleteEntryByAncestor(Path ancestor, Cache<Path, T> cache,
       boolean tombstone) {
-    for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
+    for (Iterator<Map.Entry<Path, T>> it = cache.asMap().entrySet().iterator();
         it.hasNext();) {
       Map.Entry<Path, T> entry = it.next();
       Path f = entry.getKey();
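A note on the asMap() idiom above: Guava's Cache exposes no entry iterator of its own, so the code walks cache.asMap(), a live ConcurrentMap view whose modifications write through to the cache, which is why it.remove() genuinely evicts entries. A small sketch under assumed names (none come from this patch):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class AsMapViewDemo {
      public static void main(String[] args) {
        Cache<String, String> cache = CacheBuilder.newBuilder().build();
        cache.put("dirA/file1", "meta1");
        cache.put("other", "meta2");
        // Removing through the live view mutates the cache itself.
        cache.asMap().entrySet().removeIf(e -> e.getKey().startsWith("dirA/"));
        System.out.println(cache.getIfPresent("dirA/file1")); // null
        System.out.println(cache.getIfPresent("other"));      // meta2
      }
    }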
@@ -364,11 +386,11 @@ public class LocalMetadataStore implements MetadataStore {
       if (isAncestorOf(ancestor, f)) {
         if (tombstone) {
           if (meta instanceof PathMetadata) {
-            entry.setValue((T) PathMetadata.tombstone(f));
+            cache.put(f, (T) PathMetadata.tombstone(f));
           } else if (meta instanceof DirListingMetadata) {
             it.remove();
           } else {
-            throw new IllegalStateException("Unknown type in hash");
+            throw new IllegalStateException("Unknown type in cache");
           }
         } else {
           it.remove();
@@ -391,17 +413,17 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   /**
-   * Update fileHash and dirHash to reflect deletion of file 'f'. Call with
+   * Update fileCache and dirCache to reflect deletion of file 'f'. Call with
    * lock held.
    */
-  private void deleteHashEntries(Path path, boolean tombstone) {
+  private void deleteCacheEntries(Path path, boolean tombstone) {
 
     // Remove target file/dir
     LOG.debug("delete file entry for {}", path);
     if (tombstone) {
-      fileHash.put(path, PathMetadata.tombstone(path));
+      fileCache.put(path, PathMetadata.tombstone(path));
     } else {
-      fileHash.remove(path);
+      fileCache.invalidate(path);
     }
 
     // Update this and parent dir listing, if any
@@ -409,12 +431,13 @@ public class LocalMetadataStore implements MetadataStore {
       /* If this path is a dir, remove its listing */
       LOG.debug("removing listing of {}", path);
 
-      dirHash.remove(path);
+      dirCache.invalidate(path);
 
       /* Remove this path from parent's dir listing */
       Path parent = path.getParent();
       if (parent != null) {
-        DirListingMetadata dir = dirHash.get(parent);
+        DirListingMetadata dir = null;
+        dir = dirCache.getIfPresent(parent);
         if (dir != null) {
           LOG.debug("removing parent's entry for {} ", path);
           if (tombstone) {
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java (deleted)

@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.s3guard;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * LinkedHashMap that implements a maximum size and LRU eviction policy.
- */
-public class LruHashMap<K, V> extends LinkedHashMap<K, V> {
-  private final int maxSize;
-  public LruHashMap(int initialCapacity, int maxSize) {
-    super(initialCapacity);
-    this.maxSize = maxSize;
-  }
-
-  @Override
-  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-    return size() > maxSize;
-  }
-
-  /**
-   * get() plus side-effect of making the element Most Recently Used.
-   * @param key lookup key
-   * @return value
-   */
-
-  public V mruGet(K key) {
-    V val = remove(key);
-    if (val != null) {
-      put(key, val);
-    }
-    return val;
-  }
-}
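With eviction delegated to Guava, the hand-rolled LRU map above can be deleted outright. Incidentally, its mruGet() emulated access ordering by removing and re-inserting the entry; java.util.LinkedHashMap supports this natively through its accessOrder constructor, so a plain get() already refreshes recency. A hypothetical equivalent, shown for comparison only:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AccessOrderLru<K, V> extends LinkedHashMap<K, V> {
      private final int maxSize;

      public AccessOrderLru(int maxSize) {
        super(16, 0.75f, true); // accessOrder = true: get() moves entries to MRU
        this.maxSize = maxSize;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize; // evict the eldest entry once over capacity
      }
    }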
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java

@@ -19,9 +19,11 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -75,38 +77,105 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
 
   @Test
   public void testClearByAncestor() {
-    Map<Path, PathMetadata> map = new HashMap<>();
+    Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder().build();
 
     // 1. Test paths without scheme/host
-    assertClearResult(map, "", "/", 0);
-    assertClearResult(map, "", "/dirA/dirB", 2);
-    assertClearResult(map, "", "/invalid", 5);
+    assertClearResult(cache, "", "/", 0);
+    assertClearResult(cache, "", "/dirA/dirB", 2);
+    assertClearResult(cache, "", "/invalid", 5);
 
 
     // 2. Test paths w/ scheme/host
     String p = "s3a://fake-bucket-name";
-    assertClearResult(map, p, "/", 0);
-    assertClearResult(map, p, "/dirA/dirB", 2);
-    assertClearResult(map, p, "/invalid", 5);
+    assertClearResult(cache, p, "/", 0);
+    assertClearResult(cache, p, "/dirA/dirB", 2);
+    assertClearResult(cache, p, "/invalid", 5);
   }
 
-  private static void populateMap(Map<Path, PathMetadata> map,
+  static class TestTicker extends Ticker {
+    private long myTicker = 0;
+    @Override public long read() {
+      return myTicker;
+    }
+    public void set(long val) {
+      this.myTicker = val;
+    }
+
+  }
+
+  /**
+   * Test that the time-based eviction in the cache used by the
+   * {@link LocalMetadataStore} implementation works properly.
+   *
+   * The test creates a Ticker instance, which is used to control the
+   * internal clock of the cache, so eviction can be triggered without
+   * waiting for the system clock.
+   * The test creates 3 entries: the 2nd and 3rd survive the eviction
+   * because they are created later than the 1st, using the ticker.
+   */
+  @Test
+  public void testCacheTimedEvictionAfterWrite() {
+    TestTicker testTicker = new TestTicker();
+    final long t0 = testTicker.read();
+    final long t1 = t0 + 100;
+    final long t2 = t1 + 100;
+
+    final long ttl = t1 + 50; // between t1 and t2
+
+    Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder()
+        .expireAfterWrite(ttl,
+            TimeUnit.NANOSECONDS /* nanos to avoid conversions */)
+        .ticker(testTicker)
+        .build();
+
+    String p = "s3a://fake-bucket-name";
+    Path path1 = new Path(p + "/dirA/dirB/file1");
+    Path path2 = new Path(p + "/dirA/dirB/file2");
+    Path path3 = new Path(p + "/dirA/dirB/file3");
+
+    // Test time is t0
+    populateEntry(cache, path1);
+
+    // set new value on the ticker, so the next two entries will be added later
+    testTicker.set(t1); // Test time is now t1
+    populateEntry(cache, path2);
+    populateEntry(cache, path3);
+
+    assertEquals("Cache should contain 3 records before eviction",
+        3, cache.size());
+    PathMetadata pm1 = cache.getIfPresent(path1);
+    assertNotNull("PathMetadata should not be null before eviction", pm1);
+
+    // set the ticker to a time when timed eviction should occur
+    // for the first entry
+    testTicker.set(t2);
+
+    // call cleanup explicitly, as timed expiration is performed with
+    // periodic maintenance during writes and occasionally during reads only
+    cache.cleanUp();
+
+    assertEquals("Cache size should be 2 after eviction", 2, cache.size());
+    pm1 = cache.getIfPresent(path1);
+    assertNull("PathMetadata should be null after eviction", pm1);
+  }
+
+  private static void populateMap(Cache<Path, PathMetadata> cache,
       String prefix) {
-    populateEntry(map, new Path(prefix + "/dirA/dirB/"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file1"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file2"));
-    populateEntry(map, new Path(prefix + "/dirA/file1"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC/file1"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC/file2"));
+    populateEntry(cache, new Path(prefix + "/dirA/file1"));
   }
 
-  private static void populateEntry(Map<Path, PathMetadata> map,
+  private static void populateEntry(Cache<Path, PathMetadata> cache,
       Path path) {
-    map.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
+    cache.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
   }
 
-  private static int sizeOfMap(Map<Path, PathMetadata> map) {
+  private static int sizeOfMap(Cache<Path, PathMetadata> cache) {
     int count = 0;
-    for (PathMetadata meta : map.values()) {
+    for (PathMetadata meta : cache.asMap().values()) {
       if (!meta.isDeleted()) {
         count++;
       }
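The new test above pins down two Guava behaviors worth noting: expiry is driven by a pluggable Ticker rather than the wall clock, and expired entries are reclaimed lazily during maintenance, so size() can still count them until cleanUp() runs. A distilled, standalone version of the fake-clock technique; all names here are illustrative:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    import com.google.common.base.Ticker;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class FakeClockDemo {
      public static void main(String[] args) {
        AtomicLong nanos = new AtomicLong();
        Ticker fakeClock = new Ticker() {
          @Override
          public long read() {
            return nanos.get();
          }
        };
        Cache<String, String> cache = CacheBuilder.newBuilder()
            .expireAfterWrite(100, TimeUnit.NANOSECONDS)
            .ticker(fakeClock)
            .build();

        cache.put("a", "1");
        nanos.set(150);   // advance the fake clock past the 100ns TTL
        cache.cleanUp();  // expiration happens during maintenance, not eagerly
        System.out.println(cache.getIfPresent("a")); // prints null
      }
    }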
@@ -114,14 +183,14 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
     return count;
   }
 
-  private static void assertClearResult(Map <Path, PathMetadata> map,
+  private static void assertClearResult(Cache<Path, PathMetadata> cache,
       String prefixStr, String pathStr, int leftoverSize) {
-    populateMap(map, prefixStr);
-    LocalMetadataStore.deleteHashByAncestor(new Path(prefixStr + pathStr), map,
-        true);
-    assertEquals(String.format("Map should have %d entries", leftoverSize),
-        leftoverSize, sizeOfMap(map));
-    map.clear();
+    populateMap(cache, prefixStr);
+    LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
+        cache, true);
+    assertEquals(String.format("Cache should have %d entries", leftoverSize),
+        leftoverSize, sizeOfMap(cache));
+    cache.invalidateAll();
   }
 
   @Override