HDFS-12773. RBF: Improve State Store FS implementation. Contributed by Inigo Goiri.

This commit is contained in:
Yiqun Lin 2018-03-14 11:20:59 +08:00
parent 427fd027a3
commit 76be6cbf6c
10 changed files with 444 additions and 366 deletions

View File

@ -140,5 +140,10 @@ public final class StateStoreMetrics implements StateStoreMBean {
writes.resetMinMax();
removes.resetMinMax();
failures.resetMinMax();
reads.lastStat().reset();
writes.lastStat().reset();
removes.lastStat().reset();
failures.lastStat().reset();
}
}

View File

@ -48,21 +48,6 @@ public interface StateStoreRecordOperations {
@Idempotent
<T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException;
/**
* Get all records of the requested record class from the data store. To use
* the default implementations in this class, getAll must return new instances
* of the records on each call. It is recommended to override the default
* implementations for better performance.
*
* @param clazz Class of record to fetch.
* @param sub Sub path.
* @return List of all records that match the clazz and the sub path.
* @throws IOException
*/
@Idempotent
<T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
throws IOException;
/**
* Get a single record from the store that matches the query.
*

View File

@ -18,28 +18,39 @@
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.util.Time.now;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* {@link StateStoreDriver} implementation based on a local file.
* {@link StateStoreDriver} implementation based on files. In this approach, we
* use temporary files for the writes and renaming "atomically" to the final
* value. Instead of writing to the final location, it will go to a temporary
* one and then rename to the final destination.
*/
public abstract class StateStoreFileBaseImpl
extends StateStoreSerializableImpl {
@ -47,59 +58,35 @@ public abstract class StateStoreFileBaseImpl
private static final Logger LOG =
LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
/** File extension for temporary files. */
private static final String TMP_MARK = ".tmp";
/** We remove temporary files older than 10 seconds. */
private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
/** File pattern for temporary records: file.XYZ.tmp. */
private static final Pattern OLD_TMP_RECORD_PATTERN =
Pattern.compile(".+\\.(\\d+)\\.tmp");
/** If it is initialized. */
private boolean initialized = false;
/** Name of the file containing the data. */
private static final String DATA_FILE_NAME = "records.data";
/**
* Lock reading records.
* Get the reader of a record for the file system.
*
* @param clazz Class of the record.
*/
protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz);
/**
* Unlock reading records.
*
* @param clazz Class of the record.
*/
protected abstract <T extends BaseRecord> void unlockRecordRead(
Class<T> clazz);
/**
* Lock writing records.
*
* @param clazz Class of the record.
*/
protected abstract <T extends BaseRecord> void lockRecordWrite(
Class<T> clazz);
/**
* Unlock writing records.
*
* @param clazz Class of the record.
*/
protected abstract <T extends BaseRecord> void unlockRecordWrite(
Class<T> clazz);
/**
* Get the reader for the file system.
*
* @param clazz Class of the record.
* @param path Path of the record to read.
* @return Reader for the record.
*/
protected abstract <T extends BaseRecord> BufferedReader getReader(
Class<T> clazz, String sub);
String path);
/**
* Get the writer for the file system.
* Get the writer of a record for the file system.
*
* @param clazz Class of the record.
* @param path Path of the record to write.
* @return Writer for the record.
*/
protected abstract <T extends BaseRecord> BufferedWriter getWriter(
Class<T> clazz, String sub);
String path);
/**
* Check if a path exists.
@ -117,6 +104,31 @@ public abstract class StateStoreFileBaseImpl
*/
protected abstract boolean mkdir(String path);
/**
* Rename a file. This should be atomic.
*
* @param src Source name.
* @param dst Destination name.
* @return If the rename was successful.
*/
protected abstract boolean rename(String src, String dst);
/**
* Remove a file.
*
* @param path Path for the file to remove
* @return If the file was removed.
*/
protected abstract boolean remove(String path);
/**
* Get the children for a path.
*
* @param path Path to check.
* @return List of children.
*/
protected abstract List<String> getChildren(String path);
/**
* Get root directory.
*
@ -171,15 +183,6 @@ public abstract class StateStoreFileBaseImpl
LOG.error("Cannot create data directory {}", dataDirPath);
return false;
}
String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
if (!exists(dataFilePath)) {
// Create empty file
List<T> emtpyList = new ArrayList<>();
if(!writeAll(emtpyList, recordClass)) {
LOG.error("Cannot create data file {}", dataFilePath);
return false;
}
}
}
} catch (Exception ex) {
LOG.error("Cannot create data directory {}", dataDirPath, ex);
@ -188,138 +191,110 @@ public abstract class StateStoreFileBaseImpl
return true;
}
/**
* Read all lines from a file and deserialize into the desired record type.
*
* @param reader Open handle for the file.
* @param clazz Record class to create.
* @param includeDates True if dateModified/dateCreated are serialized.
* @return List of records.
* @throws IOException
*/
private <T extends BaseRecord> List<T> getAllFile(
BufferedReader reader, Class<T> clazz, boolean includeDates)
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
throws IOException {
verifyDriverReady();
long start = monotonicNow();
StateStoreMetrics metrics = getMetrics();
List<T> ret = new ArrayList<>();
try {
String path = getPathForClass(clazz);
List<String> children = getChildren(path);
for (String child : children) {
String pathRecord = path + "/" + child;
if (child.endsWith(TMP_MARK)) {
LOG.debug("There is a temporary file {} in {}", child, path);
if (isOldTempRecord(child)) {
LOG.warn("Removing {} as it's an old temporary record", child);
remove(pathRecord);
}
} else {
T record = getRecord(pathRecord, clazz);
ret.add(record);
}
}
} catch (Exception e) {
if (metrics != null) {
metrics.addFailure(monotonicNow() - start);
}
String msg = "Cannot fetch records for " + clazz.getSimpleName();
LOG.error(msg, e);
throw new IOException(msg, e);
}
List<T> ret = new ArrayList<T>();
if (metrics != null) {
metrics.addRead(monotonicNow() - start);
}
return new QueryResult<T>(ret, getTime());
}
/**
* Check if a record is temporary and old.
*
* @param pathRecord Path for the record to check.
* @return If the record is temporary and old.
*/
@VisibleForTesting
public static boolean isOldTempRecord(final String pathRecord) {
if (!pathRecord.endsWith(TMP_MARK)) {
return false;
}
// Extract temporary record creation time
Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
if (m.find()) {
long time = Long.parseLong(m.group(1));
return now() - time > OLD_TMP_RECORD_MS;
}
return false;
}
/**
* Read a record from a file.
*
* @param path Path to the file containing the record.
* @param clazz Class of the record.
* @return Record read from the file.
* @throws IOException If the file cannot be read.
*/
private <T extends BaseRecord> T getRecord(
final String path, final Class<T> clazz) throws IOException {
BufferedReader reader = getReader(path);
try {
String line;
while ((line = reader.readLine()) != null) {
if (!line.startsWith("#") && line.length() > 0) {
try {
T record = newRecord(line, clazz, includeDates);
ret.add(record);
T record = newRecord(line, clazz, false);
return record;
} catch (Exception ex) {
LOG.error("Cannot parse line in data source file: {}", line, ex);
LOG.error("Cannot parse line {} in file {}", line, path, ex);
}
}
}
return ret;
}
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
throws IOException {
return get(clazz, (String)null);
}
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
throws IOException {
verifyDriverReady();
BufferedReader reader = null;
lockRecordRead(clazz);
try {
reader = getReader(clazz, sub);
List<T> data = getAllFile(reader, clazz, true);
return new QueryResult<T>(data, getTime());
} catch (Exception ex) {
LOG.error("Cannot fetch records {}", clazz.getSimpleName());
throw new IOException("Cannot read from data store " + ex.getMessage());
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
LOG.error("Failed closing file", e);
}
}
unlockRecordRead(clazz);
}
throw new IOException("Cannot read " + path + " for record " +
clazz.getSimpleName());
}
/**
* Overwrite the existing data with a new data set.
*
* @param records List of records to write.
* @param writer BufferedWriter stream to write to.
* @return If the records were succesfully written.
* Get the path for a record class.
* @param clazz Class of the record.
* @return Path for this record class.
*/
private <T extends BaseRecord> boolean writeAllFile(
Collection<T> records, BufferedWriter writer) {
try {
for (BaseRecord record : records) {
try {
String data = serializeString(record);
writer.write(data);
writer.newLine();
} catch (IllegalArgumentException ex) {
LOG.error("Cannot write record {} to file", record, ex);
private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
String className = StateStoreUtils.getRecordName(clazz);
StringBuilder sb = new StringBuilder();
sb.append(getRootDir());
if (sb.charAt(sb.length() - 1) != '/') {
sb.append("/");
}
}
writer.flush();
return true;
} catch (IOException e) {
LOG.error("Cannot commit records to file", e);
return false;
}
}
/**
* Overwrite the existing data with a new data set. Replaces all records in
* the data store for this record class. If all records in the data store are
* not successfully committed, this function must return false and leave the
* data store unchanged.
*
* @param records List of records to write. All records must be of type
* recordClass.
* @param recordClass Class of record to replace.
* @return true if all operations were successful, false otherwise.
* @throws StateStoreUnavailableException
*/
public <T extends BaseRecord> boolean writeAll(
Collection<T> records, Class<T> recordClass)
throws StateStoreUnavailableException {
verifyDriverReady();
lockRecordWrite(recordClass);
BufferedWriter writer = null;
try {
writer = getWriter(recordClass, null);
return writeAllFile(records, writer);
} catch (Exception e) {
LOG.error(
"Cannot add records to file for {}", recordClass.getSimpleName(), e);
return false;
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
LOG.error(
"Cannot close writer for {}", recordClass.getSimpleName(), e);
}
}
unlockRecordWrite(recordClass);
}
}
/**
* Get the data file name.
*
* @return Data file name.
*/
protected String getDataFileName() {
return DATA_FILE_NAME;
sb.append(className);
return sb.toString();
}
@Override
@ -332,56 +307,80 @@ public abstract class StateStoreFileBaseImpl
List<T> records, boolean allowUpdate, boolean errorIfExists)
throws StateStoreUnavailableException {
verifyDriverReady();
if (records.isEmpty()) {
return true;
}
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass());
QueryResult<T> result;
try {
result = get(clazz);
} catch (IOException e) {
return false;
}
Map<Object, T> writeList = new HashMap<>();
long start = monotonicNow();
StateStoreMetrics metrics = getMetrics();
// Write all of the existing records
for (T existingRecord : result.getRecords()) {
String key = existingRecord.getPrimaryKey();
writeList.put(key, existingRecord);
}
// Check if any record exists
Map<String, T> toWrite = new HashMap<>();
for (T record : records) {
Class<? extends BaseRecord> recordClass = record.getClass();
String path = getPathForClass(recordClass);
String primaryKey = getPrimaryKey(record);
String recordPath = path + "/" + primaryKey;
// Add inserts and updates, overwrite any existing values
for (T updatedRecord : records) {
try {
updatedRecord.validate();
String key = updatedRecord.getPrimaryKey();
if (writeList.containsKey(key) && allowUpdate) {
// Update
writeList.put(key, updatedRecord);
if (exists(recordPath)) {
if (allowUpdate) {
// Update the mod time stamp. Many backends will use their
// own timestamp for the mod time.
updatedRecord.setDateModified(this.getTime());
} else if (!writeList.containsKey(key)) {
// Insert
// Create/Mod timestamps are already initialized
writeList.put(key, updatedRecord);
record.setDateModified(this.getTime());
toWrite.put(recordPath, record);
} else if (errorIfExists) {
LOG.error("Attempt to insert record {} that already exists",
updatedRecord);
return false;
recordPath);
if (metrics != null) {
metrics.addFailure(monotonicNow() - start);
}
} catch (IllegalArgumentException ex) {
LOG.error("Cannot write invalid record to State Store", ex);
return false;
} else {
LOG.debug("Not updating {}", record);
}
} else {
toWrite.put(recordPath, record);
}
}
// Write all
boolean status = writeAll(writeList.values(), clazz);
return status;
// Write the records
boolean success = true;
for (Entry<String, T> entry : toWrite.entrySet()) {
String recordPath = entry.getKey();
String recordPathTemp = recordPath + "." + now() + TMP_MARK;
BufferedWriter writer = getWriter(recordPathTemp);
try {
T record = entry.getValue();
String line = serializeString(record);
writer.write(line);
} catch (IOException e) {
LOG.error("Cannot write {}", recordPathTemp, e);
success = false;
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
LOG.error("Cannot close the writer for {}", recordPathTemp);
}
}
}
// Commit
if (!rename(recordPathTemp, recordPath)) {
LOG.error("Failed committing record into {}", recordPath);
success = false;
}
}
long end = monotonicNow();
if (metrics != null) {
if (success) {
metrics.addWrite(end - start);
} else {
metrics.addFailure(end - start);
}
}
return success;
}
@Override
@ -393,6 +392,8 @@ public abstract class StateStoreFileBaseImpl
return 0;
}
long start = Time.monotonicNow();
StateStoreMetrics metrics = getMetrics();
int removed = 0;
// Get the current records
try {
@ -400,21 +401,34 @@ public abstract class StateStoreFileBaseImpl
final List<T> existingRecords = result.getRecords();
// Write all of the existing records except those to be removed
final List<T> recordsToRemove = filterMultiple(query, existingRecords);
removed = recordsToRemove.size();
final List<T> newRecords = new LinkedList<>();
for (T record : existingRecords) {
if (!recordsToRemove.contains(record)) {
newRecords.add(record);
boolean success = true;
for (T recordToRemove : recordsToRemove) {
String path = getPathForClass(clazz);
String primaryKey = getPrimaryKey(recordToRemove);
String recordToRemovePath = path + "/" + primaryKey;
if (remove(recordToRemovePath)) {
removed++;
} else {
LOG.error("Cannot remove record {}", recordToRemovePath);
success = false;
}
}
if (!writeAll(newRecords, clazz)) {
throw new IOException(
"Cannot remove record " + clazz + " query " + query);
if (!success) {
LOG.error("Cannot remove records {} query {}", clazz, query);
if (metrics != null) {
metrics.addFailure(monotonicNow() - start);
}
}
} catch (IOException e) {
LOG.error("Cannot remove records {} query {}", clazz, query, e);
if (metrics != null) {
metrics.addFailure(monotonicNow() - start);
}
}
if (removed > 0 && metrics != null) {
metrics.addRemove(monotonicNow() - start);
}
return removed;
}
@ -422,8 +436,27 @@ public abstract class StateStoreFileBaseImpl
public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
throws StateStoreUnavailableException {
verifyDriverReady();
List<T> emptyList = new ArrayList<>();
boolean status = writeAll(emptyList, clazz);
return status;
long start = Time.monotonicNow();
StateStoreMetrics metrics = getMetrics();
boolean success = true;
String path = getPathForClass(clazz);
List<String> children = getChildren(path);
for (String child : children) {
String pathRecord = path + "/" + child;
if (!remove(pathRecord)) {
success = false;
}
}
if (metrics != null) {
long time = Time.monotonicNow() - start;
if (success) {
metrics.addRemove(time);
} else {
metrics.addFailure(time);
}
}
return success;
}
}

View File

@ -26,11 +26,10 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,10 +48,6 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
public static final String FEDERATION_STORE_FILE_DIRECTORY =
DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
/** Synchronization. */
private static final ReadWriteLock READ_WRITE_LOCK =
new ReentrantReadWriteLock();
/** Root directory for the state store. */
private String rootDirectory;
@ -69,6 +64,23 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
return dir.mkdirs();
}
@Override
protected boolean rename(String src, String dst) {
try {
Files.move(new File(src), new File(dst));
return true;
} catch (IOException e) {
LOG.error("Cannot rename {} to {}", src, dst, e);
return false;
}
}
@Override
protected boolean remove(String path) {
File file = new File(path);
return file.delete();
}
@Override
protected String getRootDir() {
if (this.rootDirectory == null) {
@ -76,6 +88,7 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
if (dir == null) {
File tempDir = Files.createTempDir();
dir = tempDir.getAbsolutePath();
LOG.warn("The root directory is not available, using {}", dir);
}
this.rootDirectory = dir;
}
@ -83,79 +96,53 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
}
@Override
protected <T extends BaseRecord> void lockRecordWrite(Class<T> recordClass) {
// TODO - Synchronize via FS
READ_WRITE_LOCK.writeLock().lock();
}
@Override
protected <T extends BaseRecord> void unlockRecordWrite(
Class<T> recordClass) {
// TODO - Synchronize via FS
READ_WRITE_LOCK.writeLock().unlock();
}
@Override
protected <T extends BaseRecord> void lockRecordRead(Class<T> recordClass) {
// TODO - Synchronize via FS
READ_WRITE_LOCK.readLock().lock();
}
@Override
protected <T extends BaseRecord> void unlockRecordRead(Class<T> recordClass) {
// TODO - Synchronize via FS
READ_WRITE_LOCK.readLock().unlock();
}
@Override
protected <T extends BaseRecord> BufferedReader getReader(
Class<T> clazz, String sub) {
String filename = StateStoreUtils.getRecordName(clazz);
if (sub != null && sub.length() > 0) {
filename += "/" + sub;
}
filename += "/" + getDataFileName();
protected <T extends BaseRecord> BufferedReader getReader(String filename) {
BufferedReader reader = null;
try {
LOG.debug("Loading file: {}", filename);
File file = new File(getRootDir(), filename);
File file = new File(filename);
FileInputStream fis = new FileInputStream(file);
InputStreamReader isr =
new InputStreamReader(fis, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(isr);
return reader;
reader = new BufferedReader(isr);
} catch (Exception ex) {
LOG.error(
"Cannot open read stream for record {}", clazz.getSimpleName(), ex);
return null;
LOG.error("Cannot open read stream for record {}", filename, ex);
}
return reader;
}
@Override
protected <T extends BaseRecord> BufferedWriter getWriter(
Class<T> clazz, String sub) {
String filename = StateStoreUtils.getRecordName(clazz);
if (sub != null && sub.length() > 0) {
filename += "/" + sub;
}
filename += "/" + getDataFileName();
protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
BufferedWriter writer = null;
try {
File file = new File(getRootDir(), filename);
LOG.debug("Writing file: {}", filename);
File file = new File(filename);
FileOutputStream fos = new FileOutputStream(file, false);
OutputStreamWriter osw =
new OutputStreamWriter(fos, StandardCharsets.UTF_8);
BufferedWriter writer = new BufferedWriter(osw);
return writer;
} catch (IOException ex) {
LOG.error(
"Cannot open read stream for record {}", clazz.getSimpleName(), ex);
return null;
writer = new BufferedWriter(osw);
} catch (IOException e) {
LOG.error("Cannot open write stream for record {}", filename, e);
}
return writer;
}
@Override
public void close() throws Exception {
setInitialized(false);
}
@Override
protected List<String> getChildren(String path) {
List<String> ret = new LinkedList<>();
File dir = new File(path);
File[] files = dir.listFiles();
if (files != null) {
for (File file : files) {
String filename = file.getName();
ret.add(filename);
}
}
return ret;
}
}

View File

@ -24,13 +24,17 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -72,6 +76,36 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
}
}
@Override
protected boolean rename(String src, String dst) {
try {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
return true;
} else {
// Replace should be atomic but not available
if (fs.exists(new Path(dst))) {
fs.delete(new Path(dst), true);
}
return fs.rename(new Path(src), new Path(dst));
}
} catch (Exception e) {
LOG.error("Cannot rename {} to {}", src, dst, e);
return false;
}
}
@Override
protected boolean remove(String path) {
try {
return fs.delete(new Path(path), true);
} catch (Exception e) {
LOG.error("Cannot remove {}", path, e);
return false;
}
}
@Override
protected String getRootDir() {
if (this.workPath == null) {
@ -95,84 +129,50 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
}
}
/**
* Get the folder path for the record class' data.
*
* @param clazz Data record class.
* @return Path of the folder containing the record class' data files.
*/
private Path getPathForClass(Class<? extends BaseRecord> clazz) {
if (clazz == null) {
return null;
}
// TODO extract table name from class: entry.getTableName()
String className = StateStoreUtils.getRecordName(clazz);
return new Path(workPath, className);
}
@Override
protected <T extends BaseRecord> void lockRecordRead(Class<T> clazz) {
// Not required, synced with HDFS leasing
}
@Override
protected <T extends BaseRecord> void unlockRecordRead(Class<T> clazz) {
// Not required, synced with HDFS leasing
}
@Override
protected <T extends BaseRecord> void lockRecordWrite(Class<T> clazz) {
// TODO -> wait for lease to be available
}
@Override
protected <T extends BaseRecord> void unlockRecordWrite(Class<T> clazz) {
// TODO -> ensure lease is closed for the file
}
@Override
protected <T extends BaseRecord> BufferedReader getReader(
Class<T> clazz, String sub) {
Path path = getPathForClass(clazz);
if (sub != null && sub.length() > 0) {
path = Path.mergePaths(path, new Path("/" + sub));
}
path = Path.mergePaths(path, new Path("/" + getDataFileName()));
protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
BufferedReader reader = null;
Path path = new Path(pathName);
try {
FSDataInputStream fdis = fs.open(path);
InputStreamReader isr =
new InputStreamReader(fdis, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(isr);
return reader;
reader = new BufferedReader(isr);
} catch (IOException ex) {
LOG.error("Cannot open write stream for {} to {}",
clazz.getSimpleName(), path);
return null;
LOG.error("Cannot open read stream for {}", path);
}
return reader;
}
@Override
protected <T extends BaseRecord> BufferedWriter getWriter(
Class<T> clazz, String sub) {
Path path = getPathForClass(clazz);
if (sub != null && sub.length() > 0) {
path = Path.mergePaths(path, new Path("/" + sub));
}
path = Path.mergePaths(path, new Path("/" + getDataFileName()));
protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
BufferedWriter writer = null;
Path path = new Path(pathName);
try {
FSDataOutputStream fdos = fs.create(path, true);
OutputStreamWriter osw =
new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
BufferedWriter writer = new BufferedWriter(osw);
return writer;
writer = new BufferedWriter(osw);
} catch (IOException ex) {
LOG.error("Cannot open write stream for {} to {}",
clazz.getSimpleName(), path);
return null;
}
LOG.error("Cannot open write stream for {}", path);
}
return writer;
}
@Override
protected List<String> getChildren(String pathName) {
List<String> ret = new LinkedList<>();
Path path = new Path(workPath, pathName);
try {
FileStatus[] files = fs.listStatus(path);
for (FileStatus file : files) {
Path filePath = file.getPath();
String fileName = filePath.getName();
ret.add(fileName);
}
} catch (Exception e) {
LOG.error("Cannot get children for {}", pathName, e);
}
return ret;
}
}

View File

@ -117,12 +117,6 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
throws IOException {
return get(clazz, (String)null);
}
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
throws IOException {
verifyDriverReady();
long start = monotonicNow();
List<T> ret = new ArrayList<>();

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import org.junit.After;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,6 +75,14 @@ public class TestStateStoreDriverBase {
return stateStore.getDriver();
}
@After
public void cleanMetrics() {
if (stateStore != null) {
StateStoreMetrics metrics = stateStore.getMetrics();
metrics.reset();
}
}
@AfterClass
public static void tearDownCluster() {
if (stateStore != null) {

View File

@ -61,4 +61,16 @@ public class TestStateStoreFile extends TestStateStoreDriverBase {
throws IllegalArgumentException, IllegalAccessException, IOException {
testRemove(getStateStoreDriver());
}
@Test
public void testFetchErrors()
throws IllegalArgumentException, IllegalAccessException, IOException {
testFetchErrors(getStateStoreDriver());
}
@Test
public void testMetrics()
throws IllegalArgumentException, IllegalAccessException, IOException {
testMetrics(getStateStoreDriver());
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.driver;
import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl.isOldTempRecord;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.Time;
import org.junit.Test;
/**
* Tests for the State Store file based implementation.
*/
public class TestStateStoreFileBase {
@Test
public void testTempOld() {
assertFalse(isOldTempRecord("test.txt"));
assertFalse(isOldTempRecord("testfolder/test.txt"));
long tnow = Time.now();
String tmpFile1 = "test." + tnow + ".tmp";
assertFalse(isOldTempRecord(tmpFile1));
long told = Time.now() - TimeUnit.MINUTES.toMillis(1);
String tmpFile2 = "test." + told + ".tmp";
assertTrue(isOldTempRecord(tmpFile2));
}
}

View File

@ -69,15 +69,15 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
}
@Test
public void testUpdate()
throws IllegalArgumentException, IllegalAccessException, IOException {
testInsert(getStateStoreDriver());
public void testUpdate() throws IllegalArgumentException, IOException,
SecurityException, ReflectiveOperationException {
testPut(getStateStoreDriver());
}
@Test
public void testDelete()
throws IllegalArgumentException, IllegalAccessException, IOException {
testInsert(getStateStoreDriver());
testRemove(getStateStoreDriver());
}
@Test
@ -85,4 +85,10 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
throws IllegalArgumentException, IllegalAccessException, IOException {
testFetchErrors(getStateStoreDriver());
}
@Test
public void testMetrics()
throws IllegalArgumentException, IllegalAccessException, IOException {
testMetrics(getStateStoreDriver());
}
}