HDFS-12773. RBF: Improve State Store FS implementation. Contributed by Inigo Goiri.
(cherry picked from commit 76be6cbf6c
)
This commit is contained in:
parent
95a4665ad5
commit
a105069729
|
@ -140,5 +140,10 @@ public final class StateStoreMetrics implements StateStoreMBean {
|
|||
writes.resetMinMax();
|
||||
removes.resetMinMax();
|
||||
failures.resetMinMax();
|
||||
|
||||
reads.lastStat().reset();
|
||||
writes.lastStat().reset();
|
||||
removes.lastStat().reset();
|
||||
failures.lastStat().reset();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,21 +48,6 @@ public interface StateStoreRecordOperations {
|
|||
@Idempotent
|
||||
<T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException;
|
||||
|
||||
/**
|
||||
* Get all records of the requested record class from the data store. To use
|
||||
* the default implementations in this class, getAll must return new instances
|
||||
* of the records on each call. It is recommended to override the default
|
||||
* implementations for better performance.
|
||||
*
|
||||
* @param clazz Class of record to fetch.
|
||||
* @param sub Sub path.
|
||||
* @return List of all records that match the clazz and the sub path.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
<T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Get a single record from the store that matches the query.
|
||||
*
|
||||
|
|
|
@ -18,28 +18,39 @@
|
|||
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* {@link StateStoreDriver} implementation based on a local file.
|
||||
* {@link StateStoreDriver} implementation based on files. In this approach, we
|
||||
* use temporary files for the writes and renaming "atomically" to the final
|
||||
* value. Instead of writing to the final location, it will go to a temporary
|
||||
* one and then rename to the final destination.
|
||||
*/
|
||||
public abstract class StateStoreFileBaseImpl
|
||||
extends StateStoreSerializableImpl {
|
||||
|
@ -47,59 +58,35 @@ public abstract class StateStoreFileBaseImpl
|
|||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
|
||||
|
||||
/** File extension for temporary files. */
|
||||
private static final String TMP_MARK = ".tmp";
|
||||
/** We remove temporary files older than 10 seconds. */
|
||||
private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
|
||||
/** File pattern for temporary records: file.XYZ.tmp. */
|
||||
private static final Pattern OLD_TMP_RECORD_PATTERN =
|
||||
Pattern.compile(".+\\.(\\d+)\\.tmp");
|
||||
|
||||
/** If it is initialized. */
|
||||
private boolean initialized = false;
|
||||
|
||||
/** Name of the file containing the data. */
|
||||
private static final String DATA_FILE_NAME = "records.data";
|
||||
|
||||
|
||||
/**
|
||||
* Lock reading records.
|
||||
* Get the reader of a record for the file system.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Unlock reading records.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void unlockRecordRead(
|
||||
Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Lock writing records.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void lockRecordWrite(
|
||||
Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Unlock writing records.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void unlockRecordWrite(
|
||||
Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Get the reader for the file system.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
* @param path Path of the record to read.
|
||||
* @return Reader for the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> BufferedReader getReader(
|
||||
Class<T> clazz, String sub);
|
||||
String path);
|
||||
|
||||
/**
|
||||
* Get the writer for the file system.
|
||||
* Get the writer of a record for the file system.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
* @param path Path of the record to write.
|
||||
* @return Writer for the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> BufferedWriter getWriter(
|
||||
Class<T> clazz, String sub);
|
||||
String path);
|
||||
|
||||
/**
|
||||
* Check if a path exists.
|
||||
|
@ -117,6 +104,31 @@ public abstract class StateStoreFileBaseImpl
|
|||
*/
|
||||
protected abstract boolean mkdir(String path);
|
||||
|
||||
/**
|
||||
* Rename a file. This should be atomic.
|
||||
*
|
||||
* @param src Source name.
|
||||
* @param dst Destination name.
|
||||
* @return If the rename was successful.
|
||||
*/
|
||||
protected abstract boolean rename(String src, String dst);
|
||||
|
||||
/**
|
||||
* Remove a file.
|
||||
*
|
||||
* @param path Path for the file to remove
|
||||
* @return If the file was removed.
|
||||
*/
|
||||
protected abstract boolean remove(String path);
|
||||
|
||||
/**
|
||||
* Get the children for a path.
|
||||
*
|
||||
* @param path Path to check.
|
||||
* @return List of children.
|
||||
*/
|
||||
protected abstract List<String> getChildren(String path);
|
||||
|
||||
/**
|
||||
* Get root directory.
|
||||
*
|
||||
|
@ -171,15 +183,6 @@ public abstract class StateStoreFileBaseImpl
|
|||
LOG.error("Cannot create data directory {}", dataDirPath);
|
||||
return false;
|
||||
}
|
||||
String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
|
||||
if (!exists(dataFilePath)) {
|
||||
// Create empty file
|
||||
List<T> emtpyList = new ArrayList<>();
|
||||
if(!writeAll(emtpyList, recordClass)) {
|
||||
LOG.error("Cannot create data file {}", dataFilePath);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Cannot create data directory {}", dataDirPath, ex);
|
||||
|
@ -188,138 +191,110 @@ public abstract class StateStoreFileBaseImpl
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all lines from a file and deserialize into the desired record type.
|
||||
*
|
||||
* @param reader Open handle for the file.
|
||||
* @param clazz Record class to create.
|
||||
* @param includeDates True if dateModified/dateCreated are serialized.
|
||||
* @return List of records.
|
||||
* @throws IOException
|
||||
*/
|
||||
private <T extends BaseRecord> List<T> getAllFile(
|
||||
BufferedReader reader, Class<T> clazz, boolean includeDates)
|
||||
throws IOException {
|
||||
|
||||
List<T> ret = new ArrayList<T>();
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if (!line.startsWith("#") && line.length() > 0) {
|
||||
try {
|
||||
T record = newRecord(line, clazz, includeDates);
|
||||
ret.add(record);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Cannot parse line in data source file: {}", line, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
|
||||
throws IOException {
|
||||
return get(clazz, (String)null);
|
||||
verifyDriverReady();
|
||||
long start = monotonicNow();
|
||||
StateStoreMetrics metrics = getMetrics();
|
||||
List<T> ret = new ArrayList<>();
|
||||
try {
|
||||
String path = getPathForClass(clazz);
|
||||
List<String> children = getChildren(path);
|
||||
for (String child : children) {
|
||||
String pathRecord = path + "/" + child;
|
||||
if (child.endsWith(TMP_MARK)) {
|
||||
LOG.debug("There is a temporary file {} in {}", child, path);
|
||||
if (isOldTempRecord(child)) {
|
||||
LOG.warn("Removing {} as it's an old temporary record", child);
|
||||
remove(pathRecord);
|
||||
}
|
||||
} else {
|
||||
T record = getRecord(pathRecord, clazz);
|
||||
ret.add(record);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (metrics != null) {
|
||||
metrics.addFailure(monotonicNow() - start);
|
||||
}
|
||||
String msg = "Cannot fetch records for " + clazz.getSimpleName();
|
||||
LOG.error(msg, e);
|
||||
throw new IOException(msg, e);
|
||||
}
|
||||
|
||||
if (metrics != null) {
|
||||
metrics.addRead(monotonicNow() - start);
|
||||
}
|
||||
return new QueryResult<T>(ret, getTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
|
||||
throws IOException {
|
||||
verifyDriverReady();
|
||||
BufferedReader reader = null;
|
||||
lockRecordRead(clazz);
|
||||
/**
|
||||
* Check if a record is temporary and old.
|
||||
*
|
||||
* @param pathRecord Path for the record to check.
|
||||
* @return If the record is temporary and old.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static boolean isOldTempRecord(final String pathRecord) {
|
||||
if (!pathRecord.endsWith(TMP_MARK)) {
|
||||
return false;
|
||||
}
|
||||
// Extract temporary record creation time
|
||||
Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
|
||||
if (m.find()) {
|
||||
long time = Long.parseLong(m.group(1));
|
||||
return now() - time > OLD_TMP_RECORD_MS;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a record from a file.
|
||||
*
|
||||
* @param path Path to the file containing the record.
|
||||
* @param clazz Class of the record.
|
||||
* @return Record read from the file.
|
||||
* @throws IOException If the file cannot be read.
|
||||
*/
|
||||
private <T extends BaseRecord> T getRecord(
|
||||
final String path, final Class<T> clazz) throws IOException {
|
||||
BufferedReader reader = getReader(path);
|
||||
try {
|
||||
reader = getReader(clazz, sub);
|
||||
List<T> data = getAllFile(reader, clazz, true);
|
||||
return new QueryResult<T>(data, getTime());
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Cannot fetch records {}", clazz.getSimpleName());
|
||||
throw new IOException("Cannot read from data store " + ex.getMessage());
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if (!line.startsWith("#") && line.length() > 0) {
|
||||
try {
|
||||
T record = newRecord(line, clazz, false);
|
||||
return record;
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Cannot parse line {} in file {}", line, path, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
try {
|
||||
reader.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed closing file", e);
|
||||
}
|
||||
reader.close();
|
||||
}
|
||||
unlockRecordRead(clazz);
|
||||
}
|
||||
throw new IOException("Cannot read " + path + " for record " +
|
||||
clazz.getSimpleName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Overwrite the existing data with a new data set.
|
||||
*
|
||||
* @param records List of records to write.
|
||||
* @param writer BufferedWriter stream to write to.
|
||||
* @return If the records were succesfully written.
|
||||
* Get the path for a record class.
|
||||
* @param clazz Class of the record.
|
||||
* @return Path for this record class.
|
||||
*/
|
||||
private <T extends BaseRecord> boolean writeAllFile(
|
||||
Collection<T> records, BufferedWriter writer) {
|
||||
|
||||
try {
|
||||
for (BaseRecord record : records) {
|
||||
try {
|
||||
String data = serializeString(record);
|
||||
writer.write(data);
|
||||
writer.newLine();
|
||||
} catch (IllegalArgumentException ex) {
|
||||
LOG.error("Cannot write record {} to file", record, ex);
|
||||
}
|
||||
}
|
||||
writer.flush();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot commit records to file", e);
|
||||
return false;
|
||||
private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
|
||||
String className = StateStoreUtils.getRecordName(clazz);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(getRootDir());
|
||||
if (sb.charAt(sb.length() - 1) != '/') {
|
||||
sb.append("/");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Overwrite the existing data with a new data set. Replaces all records in
|
||||
* the data store for this record class. If all records in the data store are
|
||||
* not successfully committed, this function must return false and leave the
|
||||
* data store unchanged.
|
||||
*
|
||||
* @param records List of records to write. All records must be of type
|
||||
* recordClass.
|
||||
* @param recordClass Class of record to replace.
|
||||
* @return true if all operations were successful, false otherwise.
|
||||
* @throws StateStoreUnavailableException
|
||||
*/
|
||||
public <T extends BaseRecord> boolean writeAll(
|
||||
Collection<T> records, Class<T> recordClass)
|
||||
throws StateStoreUnavailableException {
|
||||
verifyDriverReady();
|
||||
lockRecordWrite(recordClass);
|
||||
BufferedWriter writer = null;
|
||||
try {
|
||||
writer = getWriter(recordClass, null);
|
||||
return writeAllFile(records, writer);
|
||||
} catch (Exception e) {
|
||||
LOG.error(
|
||||
"Cannot add records to file for {}", recordClass.getSimpleName(), e);
|
||||
return false;
|
||||
} finally {
|
||||
if (writer != null) {
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Cannot close writer for {}", recordClass.getSimpleName(), e);
|
||||
}
|
||||
}
|
||||
unlockRecordWrite(recordClass);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the data file name.
|
||||
*
|
||||
* @return Data file name.
|
||||
*/
|
||||
protected String getDataFileName() {
|
||||
return DATA_FILE_NAME;
|
||||
sb.append(className);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -332,56 +307,80 @@ public abstract class StateStoreFileBaseImpl
|
|||
List<T> records, boolean allowUpdate, boolean errorIfExists)
|
||||
throws StateStoreUnavailableException {
|
||||
verifyDriverReady();
|
||||
|
||||
if (records.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass());
|
||||
QueryResult<T> result;
|
||||
try {
|
||||
result = get(clazz);
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
Map<Object, T> writeList = new HashMap<>();
|
||||
long start = monotonicNow();
|
||||
StateStoreMetrics metrics = getMetrics();
|
||||
|
||||
// Write all of the existing records
|
||||
for (T existingRecord : result.getRecords()) {
|
||||
String key = existingRecord.getPrimaryKey();
|
||||
writeList.put(key, existingRecord);
|
||||
}
|
||||
// Check if any record exists
|
||||
Map<String, T> toWrite = new HashMap<>();
|
||||
for (T record : records) {
|
||||
Class<? extends BaseRecord> recordClass = record.getClass();
|
||||
String path = getPathForClass(recordClass);
|
||||
String primaryKey = getPrimaryKey(record);
|
||||
String recordPath = path + "/" + primaryKey;
|
||||
|
||||
// Add inserts and updates, overwrite any existing values
|
||||
for (T updatedRecord : records) {
|
||||
try {
|
||||
updatedRecord.validate();
|
||||
String key = updatedRecord.getPrimaryKey();
|
||||
if (writeList.containsKey(key) && allowUpdate) {
|
||||
// Update
|
||||
writeList.put(key, updatedRecord);
|
||||
if (exists(recordPath)) {
|
||||
if (allowUpdate) {
|
||||
// Update the mod time stamp. Many backends will use their
|
||||
// own timestamp for the mod time.
|
||||
updatedRecord.setDateModified(this.getTime());
|
||||
} else if (!writeList.containsKey(key)) {
|
||||
// Insert
|
||||
// Create/Mod timestamps are already initialized
|
||||
writeList.put(key, updatedRecord);
|
||||
record.setDateModified(this.getTime());
|
||||
toWrite.put(recordPath, record);
|
||||
} else if (errorIfExists) {
|
||||
LOG.error("Attempt to insert record {} that already exists",
|
||||
updatedRecord);
|
||||
recordPath);
|
||||
if (metrics != null) {
|
||||
metrics.addFailure(monotonicNow() - start);
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
LOG.debug("Not updating {}", record);
|
||||
}
|
||||
} catch (IllegalArgumentException ex) {
|
||||
LOG.error("Cannot write invalid record to State Store", ex);
|
||||
return false;
|
||||
} else {
|
||||
toWrite.put(recordPath, record);
|
||||
}
|
||||
}
|
||||
|
||||
// Write all
|
||||
boolean status = writeAll(writeList.values(), clazz);
|
||||
return status;
|
||||
// Write the records
|
||||
boolean success = true;
|
||||
for (Entry<String, T> entry : toWrite.entrySet()) {
|
||||
String recordPath = entry.getKey();
|
||||
String recordPathTemp = recordPath + "." + now() + TMP_MARK;
|
||||
BufferedWriter writer = getWriter(recordPathTemp);
|
||||
try {
|
||||
T record = entry.getValue();
|
||||
String line = serializeString(record);
|
||||
writer.write(line);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot write {}", recordPathTemp, e);
|
||||
success = false;
|
||||
} finally {
|
||||
if (writer != null) {
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot close the writer for {}", recordPathTemp);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Commit
|
||||
if (!rename(recordPathTemp, recordPath)) {
|
||||
LOG.error("Failed committing record into {}", recordPath);
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
|
||||
long end = monotonicNow();
|
||||
if (metrics != null) {
|
||||
if (success) {
|
||||
metrics.addWrite(end - start);
|
||||
} else {
|
||||
metrics.addFailure(end - start);
|
||||
}
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -393,6 +392,8 @@ public abstract class StateStoreFileBaseImpl
|
|||
return 0;
|
||||
}
|
||||
|
||||
long start = Time.monotonicNow();
|
||||
StateStoreMetrics metrics = getMetrics();
|
||||
int removed = 0;
|
||||
// Get the current records
|
||||
try {
|
||||
|
@ -400,21 +401,34 @@ public abstract class StateStoreFileBaseImpl
|
|||
final List<T> existingRecords = result.getRecords();
|
||||
// Write all of the existing records except those to be removed
|
||||
final List<T> recordsToRemove = filterMultiple(query, existingRecords);
|
||||
removed = recordsToRemove.size();
|
||||
final List<T> newRecords = new LinkedList<>();
|
||||
for (T record : existingRecords) {
|
||||
if (!recordsToRemove.contains(record)) {
|
||||
newRecords.add(record);
|
||||
boolean success = true;
|
||||
for (T recordToRemove : recordsToRemove) {
|
||||
String path = getPathForClass(clazz);
|
||||
String primaryKey = getPrimaryKey(recordToRemove);
|
||||
String recordToRemovePath = path + "/" + primaryKey;
|
||||
if (remove(recordToRemovePath)) {
|
||||
removed++;
|
||||
} else {
|
||||
LOG.error("Cannot remove record {}", recordToRemovePath);
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
if (!writeAll(newRecords, clazz)) {
|
||||
throw new IOException(
|
||||
"Cannot remove record " + clazz + " query " + query);
|
||||
if (!success) {
|
||||
LOG.error("Cannot remove records {} query {}", clazz, query);
|
||||
if (metrics != null) {
|
||||
metrics.addFailure(monotonicNow() - start);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot remove records {} query {}", clazz, query, e);
|
||||
if (metrics != null) {
|
||||
metrics.addFailure(monotonicNow() - start);
|
||||
}
|
||||
}
|
||||
|
||||
if (removed > 0 && metrics != null) {
|
||||
metrics.addRemove(monotonicNow() - start);
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
|
@ -422,8 +436,27 @@ public abstract class StateStoreFileBaseImpl
|
|||
public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
|
||||
throws StateStoreUnavailableException {
|
||||
verifyDriverReady();
|
||||
List<T> emptyList = new ArrayList<>();
|
||||
boolean status = writeAll(emptyList, clazz);
|
||||
return status;
|
||||
long start = Time.monotonicNow();
|
||||
StateStoreMetrics metrics = getMetrics();
|
||||
|
||||
boolean success = true;
|
||||
String path = getPathForClass(clazz);
|
||||
List<String> children = getChildren(path);
|
||||
for (String child : children) {
|
||||
String pathRecord = path + "/" + child;
|
||||
if (!remove(pathRecord)) {
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (metrics != null) {
|
||||
long time = Time.monotonicNow() - start;
|
||||
if (success) {
|
||||
metrics.addRemove(time);
|
||||
} else {
|
||||
metrics.addFailure(time);
|
||||
}
|
||||
}
|
||||
return success;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,11 +26,10 @@ import java.io.IOException;
|
|||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -49,10 +48,6 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
|
|||
public static final String FEDERATION_STORE_FILE_DIRECTORY =
|
||||
DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
|
||||
|
||||
/** Synchronization. */
|
||||
private static final ReadWriteLock READ_WRITE_LOCK =
|
||||
new ReentrantReadWriteLock();
|
||||
|
||||
/** Root directory for the state store. */
|
||||
private String rootDirectory;
|
||||
|
||||
|
@ -69,6 +64,23 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
|
|||
return dir.mkdirs();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean rename(String src, String dst) {
|
||||
try {
|
||||
Files.move(new File(src), new File(dst));
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot rename {} to {}", src, dst, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean remove(String path) {
|
||||
File file = new File(path);
|
||||
return file.delete();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRootDir() {
|
||||
if (this.rootDirectory == null) {
|
||||
|
@ -76,6 +88,7 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
|
|||
if (dir == null) {
|
||||
File tempDir = Files.createTempDir();
|
||||
dir = tempDir.getAbsolutePath();
|
||||
LOG.warn("The root directory is not available, using {}", dir);
|
||||
}
|
||||
this.rootDirectory = dir;
|
||||
}
|
||||
|
@ -83,79 +96,53 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordWrite(Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.writeLock().lock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordWrite(
|
||||
Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.writeLock().unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordRead(Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.readLock().lock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordRead(Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.readLock().unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedReader getReader(
|
||||
Class<T> clazz, String sub) {
|
||||
String filename = StateStoreUtils.getRecordName(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
filename += "/" + sub;
|
||||
}
|
||||
filename += "/" + getDataFileName();
|
||||
|
||||
protected <T extends BaseRecord> BufferedReader getReader(String filename) {
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
LOG.debug("Loading file: {}", filename);
|
||||
File file = new File(getRootDir(), filename);
|
||||
File file = new File(filename);
|
||||
FileInputStream fis = new FileInputStream(file);
|
||||
InputStreamReader isr =
|
||||
new InputStreamReader(fis, StandardCharsets.UTF_8);
|
||||
BufferedReader reader = new BufferedReader(isr);
|
||||
return reader;
|
||||
reader = new BufferedReader(isr);
|
||||
} catch (Exception ex) {
|
||||
LOG.error(
|
||||
"Cannot open read stream for record {}", clazz.getSimpleName(), ex);
|
||||
return null;
|
||||
LOG.error("Cannot open read stream for record {}", filename, ex);
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedWriter getWriter(
|
||||
Class<T> clazz, String sub) {
|
||||
String filename = StateStoreUtils.getRecordName(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
filename += "/" + sub;
|
||||
}
|
||||
filename += "/" + getDataFileName();
|
||||
|
||||
protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
|
||||
BufferedWriter writer = null;
|
||||
try {
|
||||
File file = new File(getRootDir(), filename);
|
||||
LOG.debug("Writing file: {}", filename);
|
||||
File file = new File(filename);
|
||||
FileOutputStream fos = new FileOutputStream(file, false);
|
||||
OutputStreamWriter osw =
|
||||
new OutputStreamWriter(fos, StandardCharsets.UTF_8);
|
||||
BufferedWriter writer = new BufferedWriter(osw);
|
||||
return writer;
|
||||
} catch (IOException ex) {
|
||||
LOG.error(
|
||||
"Cannot open read stream for record {}", clazz.getSimpleName(), ex);
|
||||
return null;
|
||||
writer = new BufferedWriter(osw);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot open write stream for record {}", filename, e);
|
||||
}
|
||||
return writer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
setInitialized(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> getChildren(String path) {
|
||||
List<String> ret = new LinkedList<>();
|
||||
File dir = new File(path);
|
||||
File[] files = dir.listFiles();
|
||||
if (files != null) {
|
||||
for (File file : files) {
|
||||
String filename = file.getName();
|
||||
ret.add(filename);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
|
@ -24,13 +24,17 @@ import java.io.InputStreamReader;
|
|||
import java.io.OutputStreamWriter;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -72,6 +76,36 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean rename(String src, String dst) {
|
||||
try {
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
DistributedFileSystem dfs = (DistributedFileSystem)fs;
|
||||
dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
|
||||
return true;
|
||||
} else {
|
||||
// Replace should be atomic but not available
|
||||
if (fs.exists(new Path(dst))) {
|
||||
fs.delete(new Path(dst), true);
|
||||
}
|
||||
return fs.rename(new Path(src), new Path(dst));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot rename {} to {}", src, dst, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean remove(String path) {
|
||||
try {
|
||||
return fs.delete(new Path(path), true);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot remove {}", path, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRootDir() {
|
||||
if (this.workPath == null) {
|
||||
|
@ -95,84 +129,50 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the folder path for the record class' data.
|
||||
*
|
||||
* @param clazz Data record class.
|
||||
* @return Path of the folder containing the record class' data files.
|
||||
*/
|
||||
private Path getPathForClass(Class<? extends BaseRecord> clazz) {
|
||||
if (clazz == null) {
|
||||
return null;
|
||||
}
|
||||
// TODO extract table name from class: entry.getTableName()
|
||||
String className = StateStoreUtils.getRecordName(clazz);
|
||||
return new Path(workPath, className);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordRead(Class<T> clazz) {
|
||||
// Not required, synced with HDFS leasing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordRead(Class<T> clazz) {
|
||||
// Not required, synced with HDFS leasing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordWrite(Class<T> clazz) {
|
||||
// TODO -> wait for lease to be available
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordWrite(Class<T> clazz) {
|
||||
// TODO -> ensure lease is closed for the file
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedReader getReader(
|
||||
Class<T> clazz, String sub) {
|
||||
|
||||
Path path = getPathForClass(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
path = Path.mergePaths(path, new Path("/" + sub));
|
||||
}
|
||||
path = Path.mergePaths(path, new Path("/" + getDataFileName()));
|
||||
|
||||
protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
|
||||
BufferedReader reader = null;
|
||||
Path path = new Path(pathName);
|
||||
try {
|
||||
FSDataInputStream fdis = fs.open(path);
|
||||
InputStreamReader isr =
|
||||
new InputStreamReader(fdis, StandardCharsets.UTF_8);
|
||||
BufferedReader reader = new BufferedReader(isr);
|
||||
return reader;
|
||||
reader = new BufferedReader(isr);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Cannot open write stream for {} to {}",
|
||||
clazz.getSimpleName(), path);
|
||||
return null;
|
||||
LOG.error("Cannot open read stream for {}", path);
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedWriter getWriter(
|
||||
Class<T> clazz, String sub) {
|
||||
|
||||
Path path = getPathForClass(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
path = Path.mergePaths(path, new Path("/" + sub));
|
||||
}
|
||||
path = Path.mergePaths(path, new Path("/" + getDataFileName()));
|
||||
|
||||
protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
|
||||
BufferedWriter writer = null;
|
||||
Path path = new Path(pathName);
|
||||
try {
|
||||
FSDataOutputStream fdos = fs.create(path, true);
|
||||
OutputStreamWriter osw =
|
||||
new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
|
||||
BufferedWriter writer = new BufferedWriter(osw);
|
||||
return writer;
|
||||
writer = new BufferedWriter(osw);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Cannot open write stream for {} to {}",
|
||||
clazz.getSimpleName(), path);
|
||||
return null;
|
||||
LOG.error("Cannot open write stream for {}", path);
|
||||
}
|
||||
return writer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> getChildren(String pathName) {
|
||||
List<String> ret = new LinkedList<>();
|
||||
Path path = new Path(workPath, pathName);
|
||||
try {
|
||||
FileStatus[] files = fs.listStatus(path);
|
||||
for (FileStatus file : files) {
|
||||
Path filePath = file.getPath();
|
||||
String fileName = filePath.getName();
|
||||
ret.add(fileName);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot get children for {}", pathName, e);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,12 +117,6 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|||
@Override
|
||||
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
|
||||
throws IOException {
|
||||
return get(clazz, (String)null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
|
||||
throws IOException {
|
||||
verifyDriverReady();
|
||||
long start = monotonicNow();
|
||||
List<T> ret = new ArrayList<>();
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
|||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -74,6 +75,14 @@ public class TestStateStoreDriverBase {
|
|||
return stateStore.getDriver();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanMetrics() {
|
||||
if (stateStore != null) {
|
||||
StateStoreMetrics metrics = stateStore.getMetrics();
|
||||
metrics.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownCluster() {
|
||||
if (stateStore != null) {
|
||||
|
|
|
@ -61,4 +61,16 @@ public class TestStateStoreFile extends TestStateStoreDriverBase {
|
|||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testRemove(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchErrors()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testFetchErrors(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetrics()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testMetrics(getStateStoreDriver());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl.isOldTempRecord;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests for the State Store file based implementation.
|
||||
*/
|
||||
public class TestStateStoreFileBase {
|
||||
|
||||
@Test
|
||||
public void testTempOld() {
|
||||
assertFalse(isOldTempRecord("test.txt"));
|
||||
assertFalse(isOldTempRecord("testfolder/test.txt"));
|
||||
|
||||
long tnow = Time.now();
|
||||
String tmpFile1 = "test." + tnow + ".tmp";
|
||||
assertFalse(isOldTempRecord(tmpFile1));
|
||||
|
||||
long told = Time.now() - TimeUnit.MINUTES.toMillis(1);
|
||||
String tmpFile2 = "test." + told + ".tmp";
|
||||
assertTrue(isOldTempRecord(tmpFile2));
|
||||
}
|
||||
}
|
|
@ -69,15 +69,15 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testUpdate()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testInsert(getStateStoreDriver());
|
||||
public void testUpdate() throws IllegalArgumentException, IOException,
|
||||
SecurityException, ReflectiveOperationException {
|
||||
testPut(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testInsert(getStateStoreDriver());
|
||||
testRemove(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -85,4 +85,10 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
|
|||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testFetchErrors(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetrics()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testMetrics(getStateStoreDriver());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue