From b3d56cb83558c797403cb4538d0c21bf097263ce Mon Sep 17 00:00:00 2001 From: Yiqun Lin Date: Wed, 14 Mar 2018 11:20:59 +0800 Subject: [PATCH] HDFS-12773. RBF: Improve State Store FS implementation. Contributed by Inigo Goiri. (cherry picked from commit 76be6cbf6c33f866794f27ca2560ca7c7b2fa0e7) --- .../federation/metrics/StateStoreMetrics.java | 5 + .../driver/StateStoreRecordOperations.java | 15 - .../driver/impl/StateStoreFileBaseImpl.java | 465 ++++++++++-------- .../store/driver/impl/StateStoreFileImpl.java | 109 ++-- .../driver/impl/StateStoreFileSystemImpl.java | 128 ++--- .../driver/impl/StateStoreZooKeeperImpl.java | 6 - .../driver/TestStateStoreDriverBase.java | 9 + .../store/driver/TestStateStoreFile.java | 12 + .../store/driver/TestStateStoreFileBase.java | 47 ++ .../driver/TestStateStoreFileSystem.java | 14 +- 10 files changed, 444 insertions(+), 366 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java index 40dcd40a829..09253a26e93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java @@ -140,5 +140,10 @@ public final class StateStoreMetrics implements StateStoreMBean { writes.resetMinMax(); removes.resetMinMax(); failures.resetMinMax(); + + reads.lastStat().reset(); + writes.lastStat().reset(); + removes.lastStat().reset(); + failures.lastStat().reset(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java index e76a733cb4c..443d46edc65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java @@ -48,21 +48,6 @@ public interface StateStoreRecordOperations { @Idempotent QueryResult get(Class clazz) throws IOException; - /** - * Get all records of the requested record class from the data store. To use - * the default implementations in this class, getAll must return new instances - * of the records on each call. It is recommended to override the default - * implementations for better performance. - * - * @param clazz Class of record to fetch. - * @param sub Sub path. - * @return List of all records that match the clazz and the sub path. - * @throws IOException - */ - @Idempotent - QueryResult get(Class clazz, String sub) - throws IOException; - /** * Get a single record from the store that matches the query. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index a0cd878feed..6638d1c7c2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -18,28 +18,39 @@ package org.apache.hadoop.hdfs.server.federation.store.driver.impl; import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; -import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass; +import static org.apache.hadoop.util.Time.monotonicNow; +import static org.apache.hadoop.util.Time.now; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** - * {@link StateStoreDriver} implementation based on a local file. + * {@link StateStoreDriver} implementation based on files. In this approach, we + * use temporary files for the writes and renaming "atomically" to the final + * value. Instead of writing to the final location, it will go to a temporary + * one and then rename to the final destination. */ public abstract class StateStoreFileBaseImpl extends StateStoreSerializableImpl { @@ -47,59 +58,35 @@ public abstract class StateStoreFileBaseImpl private static final Logger LOG = LoggerFactory.getLogger(StateStoreFileBaseImpl.class); + /** File extension for temporary files. */ + private static final String TMP_MARK = ".tmp"; + /** We remove temporary files older than 10 seconds. */ + private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10); + /** File pattern for temporary records: file.XYZ.tmp. */ + private static final Pattern OLD_TMP_RECORD_PATTERN = + Pattern.compile(".+\\.(\\d+)\\.tmp"); + /** If it is initialized. */ private boolean initialized = false; - /** Name of the file containing the data. */ - private static final String DATA_FILE_NAME = "records.data"; - /** - * Lock reading records. + * Get the reader of a record for the file system. * - * @param clazz Class of the record. - */ - protected abstract void lockRecordRead(Class clazz); - - /** - * Unlock reading records. - * - * @param clazz Class of the record. - */ - protected abstract void unlockRecordRead( - Class clazz); - - /** - * Lock writing records. - * - * @param clazz Class of the record. - */ - protected abstract void lockRecordWrite( - Class clazz); - - /** - * Unlock writing records. - * - * @param clazz Class of the record. - */ - protected abstract void unlockRecordWrite( - Class clazz); - - /** - * Get the reader for the file system. - * - * @param clazz Class of the record. + * @param path Path of the record to read. + * @return Reader for the record. */ protected abstract BufferedReader getReader( - Class clazz, String sub); + String path); /** - * Get the writer for the file system. + * Get the writer of a record for the file system. * - * @param clazz Class of the record. + * @param path Path of the record to write. + * @return Writer for the record. */ protected abstract BufferedWriter getWriter( - Class clazz, String sub); + String path); /** * Check if a path exists. @@ -117,6 +104,31 @@ public abstract class StateStoreFileBaseImpl */ protected abstract boolean mkdir(String path); + /** + * Rename a file. This should be atomic. + * + * @param src Source name. + * @param dst Destination name. + * @return If the rename was successful. + */ + protected abstract boolean rename(String src, String dst); + + /** + * Remove a file. + * + * @param path Path for the file to remove + * @return If the file was removed. + */ + protected abstract boolean remove(String path); + + /** + * Get the children for a path. + * + * @param path Path to check. + * @return List of children. + */ + protected abstract List getChildren(String path); + /** * Get root directory. * @@ -171,15 +183,6 @@ public abstract class StateStoreFileBaseImpl LOG.error("Cannot create data directory {}", dataDirPath); return false; } - String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME; - if (!exists(dataFilePath)) { - // Create empty file - List emtpyList = new ArrayList<>(); - if(!writeAll(emtpyList, recordClass)) { - LOG.error("Cannot create data file {}", dataFilePath); - return false; - } - } } } catch (Exception ex) { LOG.error("Cannot create data directory {}", dataDirPath, ex); @@ -188,138 +191,110 @@ public abstract class StateStoreFileBaseImpl return true; } - /** - * Read all lines from a file and deserialize into the desired record type. - * - * @param reader Open handle for the file. - * @param clazz Record class to create. - * @param includeDates True if dateModified/dateCreated are serialized. - * @return List of records. - * @throws IOException - */ - private List getAllFile( - BufferedReader reader, Class clazz, boolean includeDates) - throws IOException { - - List ret = new ArrayList(); - String line; - while ((line = reader.readLine()) != null) { - if (!line.startsWith("#") && line.length() > 0) { - try { - T record = newRecord(line, clazz, includeDates); - ret.add(record); - } catch (Exception ex) { - LOG.error("Cannot parse line in data source file: {}", line, ex); - } - } - } - return ret; - } - @Override public QueryResult get(Class clazz) throws IOException { - return get(clazz, (String)null); + verifyDriverReady(); + long start = monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + List ret = new ArrayList<>(); + try { + String path = getPathForClass(clazz); + List children = getChildren(path); + for (String child : children) { + String pathRecord = path + "/" + child; + if (child.endsWith(TMP_MARK)) { + LOG.debug("There is a temporary file {} in {}", child, path); + if (isOldTempRecord(child)) { + LOG.warn("Removing {} as it's an old temporary record", child); + remove(pathRecord); + } + } else { + T record = getRecord(pathRecord, clazz); + ret.add(record); + } + } + } catch (Exception e) { + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } + String msg = "Cannot fetch records for " + clazz.getSimpleName(); + LOG.error(msg, e); + throw new IOException(msg, e); + } + + if (metrics != null) { + metrics.addRead(monotonicNow() - start); + } + return new QueryResult(ret, getTime()); } - @Override - public QueryResult get(Class clazz, String sub) - throws IOException { - verifyDriverReady(); - BufferedReader reader = null; - lockRecordRead(clazz); + /** + * Check if a record is temporary and old. + * + * @param pathRecord Path for the record to check. + * @return If the record is temporary and old. + */ + @VisibleForTesting + public static boolean isOldTempRecord(final String pathRecord) { + if (!pathRecord.endsWith(TMP_MARK)) { + return false; + } + // Extract temporary record creation time + Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord); + if (m.find()) { + long time = Long.parseLong(m.group(1)); + return now() - time > OLD_TMP_RECORD_MS; + } + return false; + } + + /** + * Read a record from a file. + * + * @param path Path to the file containing the record. + * @param clazz Class of the record. + * @return Record read from the file. + * @throws IOException If the file cannot be read. + */ + private T getRecord( + final String path, final Class clazz) throws IOException { + BufferedReader reader = getReader(path); try { - reader = getReader(clazz, sub); - List data = getAllFile(reader, clazz, true); - return new QueryResult(data, getTime()); - } catch (Exception ex) { - LOG.error("Cannot fetch records {}", clazz.getSimpleName()); - throw new IOException("Cannot read from data store " + ex.getMessage()); + String line; + while ((line = reader.readLine()) != null) { + if (!line.startsWith("#") && line.length() > 0) { + try { + T record = newRecord(line, clazz, false); + return record; + } catch (Exception ex) { + LOG.error("Cannot parse line {} in file {}", line, path, ex); + } + } + } } finally { if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - LOG.error("Failed closing file", e); - } + reader.close(); } - unlockRecordRead(clazz); } + throw new IOException("Cannot read " + path + " for record " + + clazz.getSimpleName()); } /** - * Overwrite the existing data with a new data set. - * - * @param records List of records to write. - * @param writer BufferedWriter stream to write to. - * @return If the records were succesfully written. + * Get the path for a record class. + * @param clazz Class of the record. + * @return Path for this record class. */ - private boolean writeAllFile( - Collection records, BufferedWriter writer) { - - try { - for (BaseRecord record : records) { - try { - String data = serializeString(record); - writer.write(data); - writer.newLine(); - } catch (IllegalArgumentException ex) { - LOG.error("Cannot write record {} to file", record, ex); - } - } - writer.flush(); - return true; - } catch (IOException e) { - LOG.error("Cannot commit records to file", e); - return false; + private String getPathForClass(final Class clazz) { + String className = StateStoreUtils.getRecordName(clazz); + StringBuilder sb = new StringBuilder(); + sb.append(getRootDir()); + if (sb.charAt(sb.length() - 1) != '/') { + sb.append("/"); } - } - - /** - * Overwrite the existing data with a new data set. Replaces all records in - * the data store for this record class. If all records in the data store are - * not successfully committed, this function must return false and leave the - * data store unchanged. - * - * @param records List of records to write. All records must be of type - * recordClass. - * @param recordClass Class of record to replace. - * @return true if all operations were successful, false otherwise. - * @throws StateStoreUnavailableException - */ - public boolean writeAll( - Collection records, Class recordClass) - throws StateStoreUnavailableException { - verifyDriverReady(); - lockRecordWrite(recordClass); - BufferedWriter writer = null; - try { - writer = getWriter(recordClass, null); - return writeAllFile(records, writer); - } catch (Exception e) { - LOG.error( - "Cannot add records to file for {}", recordClass.getSimpleName(), e); - return false; - } finally { - if (writer != null) { - try { - writer.close(); - } catch (IOException e) { - LOG.error( - "Cannot close writer for {}", recordClass.getSimpleName(), e); - } - } - unlockRecordWrite(recordClass); - } - } - - /** - * Get the data file name. - * - * @return Data file name. - */ - protected String getDataFileName() { - return DATA_FILE_NAME; + sb.append(className); + return sb.toString(); } @Override @@ -332,56 +307,80 @@ public abstract class StateStoreFileBaseImpl List records, boolean allowUpdate, boolean errorIfExists) throws StateStoreUnavailableException { verifyDriverReady(); - if (records.isEmpty()) { return true; } - @SuppressWarnings("unchecked") - Class clazz = (Class) getRecordClass(records.get(0).getClass()); - QueryResult result; - try { - result = get(clazz); - } catch (IOException e) { - return false; - } - Map writeList = new HashMap<>(); + long start = monotonicNow(); + StateStoreMetrics metrics = getMetrics(); - // Write all of the existing records - for (T existingRecord : result.getRecords()) { - String key = existingRecord.getPrimaryKey(); - writeList.put(key, existingRecord); - } + // Check if any record exists + Map toWrite = new HashMap<>(); + for (T record : records) { + Class recordClass = record.getClass(); + String path = getPathForClass(recordClass); + String primaryKey = getPrimaryKey(record); + String recordPath = path + "/" + primaryKey; - // Add inserts and updates, overwrite any existing values - for (T updatedRecord : records) { - try { - updatedRecord.validate(); - String key = updatedRecord.getPrimaryKey(); - if (writeList.containsKey(key) && allowUpdate) { - // Update - writeList.put(key, updatedRecord); + if (exists(recordPath)) { + if (allowUpdate) { // Update the mod time stamp. Many backends will use their // own timestamp for the mod time. - updatedRecord.setDateModified(this.getTime()); - } else if (!writeList.containsKey(key)) { - // Insert - // Create/Mod timestamps are already initialized - writeList.put(key, updatedRecord); + record.setDateModified(this.getTime()); + toWrite.put(recordPath, record); } else if (errorIfExists) { LOG.error("Attempt to insert record {} that already exists", - updatedRecord); + recordPath); + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } return false; + } else { + LOG.debug("Not updating {}", record); } - } catch (IllegalArgumentException ex) { - LOG.error("Cannot write invalid record to State Store", ex); - return false; + } else { + toWrite.put(recordPath, record); } } - // Write all - boolean status = writeAll(writeList.values(), clazz); - return status; + // Write the records + boolean success = true; + for (Entry entry : toWrite.entrySet()) { + String recordPath = entry.getKey(); + String recordPathTemp = recordPath + "." + now() + TMP_MARK; + BufferedWriter writer = getWriter(recordPathTemp); + try { + T record = entry.getValue(); + String line = serializeString(record); + writer.write(line); + } catch (IOException e) { + LOG.error("Cannot write {}", recordPathTemp, e); + success = false; + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error("Cannot close the writer for {}", recordPathTemp); + } + } + } + // Commit + if (!rename(recordPathTemp, recordPath)) { + LOG.error("Failed committing record into {}", recordPath); + success = false; + } + } + + long end = monotonicNow(); + if (metrics != null) { + if (success) { + metrics.addWrite(end - start); + } else { + metrics.addFailure(end - start); + } + } + return success; } @Override @@ -393,6 +392,8 @@ public abstract class StateStoreFileBaseImpl return 0; } + long start = Time.monotonicNow(); + StateStoreMetrics metrics = getMetrics(); int removed = 0; // Get the current records try { @@ -400,21 +401,34 @@ public abstract class StateStoreFileBaseImpl final List existingRecords = result.getRecords(); // Write all of the existing records except those to be removed final List recordsToRemove = filterMultiple(query, existingRecords); - removed = recordsToRemove.size(); - final List newRecords = new LinkedList<>(); - for (T record : existingRecords) { - if (!recordsToRemove.contains(record)) { - newRecords.add(record); + boolean success = true; + for (T recordToRemove : recordsToRemove) { + String path = getPathForClass(clazz); + String primaryKey = getPrimaryKey(recordToRemove); + String recordToRemovePath = path + "/" + primaryKey; + if (remove(recordToRemovePath)) { + removed++; + } else { + LOG.error("Cannot remove record {}", recordToRemovePath); + success = false; } } - if (!writeAll(newRecords, clazz)) { - throw new IOException( - "Cannot remove record " + clazz + " query " + query); + if (!success) { + LOG.error("Cannot remove records {} query {}", clazz, query); + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } } } catch (IOException e) { LOG.error("Cannot remove records {} query {}", clazz, query, e); + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } } + if (removed > 0 && metrics != null) { + metrics.addRemove(monotonicNow() - start); + } return removed; } @@ -422,8 +436,27 @@ public abstract class StateStoreFileBaseImpl public boolean removeAll(Class clazz) throws StateStoreUnavailableException { verifyDriverReady(); - List emptyList = new ArrayList<>(); - boolean status = writeAll(emptyList, clazz); - return status; + long start = Time.monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + + boolean success = true; + String path = getPathForClass(clazz); + List children = getChildren(path); + for (String child : children) { + String pathRecord = path + "/" + child; + if (!remove(pathRecord)) { + success = false; + } + } + + if (metrics != null) { + long time = Time.monotonicNow() - start; + if (success) { + metrics.addRemove(time); + } else { + metrics.addFailure(time); + } + } + return success; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java index 24e966052fd..c585a232603 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ -26,11 +26,10 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.LinkedList; +import java.util.List; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,10 +48,6 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl { public static final String FEDERATION_STORE_FILE_DIRECTORY = DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory"; - /** Synchronization. */ - private static final ReadWriteLock READ_WRITE_LOCK = - new ReentrantReadWriteLock(); - /** Root directory for the state store. */ private String rootDirectory; @@ -69,6 +64,23 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl { return dir.mkdirs(); } + @Override + protected boolean rename(String src, String dst) { + try { + Files.move(new File(src), new File(dst)); + return true; + } catch (IOException e) { + LOG.error("Cannot rename {} to {}", src, dst, e); + return false; + } + } + + @Override + protected boolean remove(String path) { + File file = new File(path); + return file.delete(); + } + @Override protected String getRootDir() { if (this.rootDirectory == null) { @@ -76,6 +88,7 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl { if (dir == null) { File tempDir = Files.createTempDir(); dir = tempDir.getAbsolutePath(); + LOG.warn("The root directory is not available, using {}", dir); } this.rootDirectory = dir; } @@ -83,79 +96,53 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl { } @Override - protected void lockRecordWrite(Class recordClass) { - // TODO - Synchronize via FS - READ_WRITE_LOCK.writeLock().lock(); - } - - @Override - protected void unlockRecordWrite( - Class recordClass) { - // TODO - Synchronize via FS - READ_WRITE_LOCK.writeLock().unlock(); - } - - @Override - protected void lockRecordRead(Class recordClass) { - // TODO - Synchronize via FS - READ_WRITE_LOCK.readLock().lock(); - } - - @Override - protected void unlockRecordRead(Class recordClass) { - // TODO - Synchronize via FS - READ_WRITE_LOCK.readLock().unlock(); - } - - @Override - protected BufferedReader getReader( - Class clazz, String sub) { - String filename = StateStoreUtils.getRecordName(clazz); - if (sub != null && sub.length() > 0) { - filename += "/" + sub; - } - filename += "/" + getDataFileName(); - + protected BufferedReader getReader(String filename) { + BufferedReader reader = null; try { LOG.debug("Loading file: {}", filename); - File file = new File(getRootDir(), filename); + File file = new File(filename); FileInputStream fis = new FileInputStream(file); InputStreamReader isr = new InputStreamReader(fis, StandardCharsets.UTF_8); - BufferedReader reader = new BufferedReader(isr); - return reader; + reader = new BufferedReader(isr); } catch (Exception ex) { - LOG.error( - "Cannot open read stream for record {}", clazz.getSimpleName(), ex); - return null; + LOG.error("Cannot open read stream for record {}", filename, ex); } + return reader; } @Override - protected BufferedWriter getWriter( - Class clazz, String sub) { - String filename = StateStoreUtils.getRecordName(clazz); - if (sub != null && sub.length() > 0) { - filename += "/" + sub; - } - filename += "/" + getDataFileName(); - + protected BufferedWriter getWriter(String filename) { + BufferedWriter writer = null; try { - File file = new File(getRootDir(), filename); + LOG.debug("Writing file: {}", filename); + File file = new File(filename); FileOutputStream fos = new FileOutputStream(file, false); OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8); - BufferedWriter writer = new BufferedWriter(osw); - return writer; - } catch (IOException ex) { - LOG.error( - "Cannot open read stream for record {}", clazz.getSimpleName(), ex); - return null; + writer = new BufferedWriter(osw); + } catch (IOException e) { + LOG.error("Cannot open write stream for record {}", filename, e); } + return writer; } @Override public void close() throws Exception { setInitialized(false); } + + @Override + protected List getChildren(String path) { + List ret = new LinkedList<>(); + File dir = new File(path); + File[] files = dir.listFiles(); + if (files != null) { + for (File file : files) { + String filename = file.getName(); + ret.add(filename); + } + } + return ret; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java index d9ef2805655..8d6c626142c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -24,13 +24,17 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +76,36 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl { } } + @Override + protected boolean rename(String src, String dst) { + try { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem)fs; + dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE); + return true; + } else { + // Replace should be atomic but not available + if (fs.exists(new Path(dst))) { + fs.delete(new Path(dst), true); + } + return fs.rename(new Path(src), new Path(dst)); + } + } catch (Exception e) { + LOG.error("Cannot rename {} to {}", src, dst, e); + return false; + } + } + + @Override + protected boolean remove(String path) { + try { + return fs.delete(new Path(path), true); + } catch (Exception e) { + LOG.error("Cannot remove {}", path, e); + return false; + } + } + @Override protected String getRootDir() { if (this.workPath == null) { @@ -95,84 +129,50 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl { } } - /** - * Get the folder path for the record class' data. - * - * @param clazz Data record class. - * @return Path of the folder containing the record class' data files. - */ - private Path getPathForClass(Class clazz) { - if (clazz == null) { - return null; - } - // TODO extract table name from class: entry.getTableName() - String className = StateStoreUtils.getRecordName(clazz); - return new Path(workPath, className); - } - @Override - protected void lockRecordRead(Class clazz) { - // Not required, synced with HDFS leasing - } - - @Override - protected void unlockRecordRead(Class clazz) { - // Not required, synced with HDFS leasing - } - - @Override - protected void lockRecordWrite(Class clazz) { - // TODO -> wait for lease to be available - } - - @Override - protected void unlockRecordWrite(Class clazz) { - // TODO -> ensure lease is closed for the file - } - - @Override - protected BufferedReader getReader( - Class clazz, String sub) { - - Path path = getPathForClass(clazz); - if (sub != null && sub.length() > 0) { - path = Path.mergePaths(path, new Path("/" + sub)); - } - path = Path.mergePaths(path, new Path("/" + getDataFileName())); - + protected BufferedReader getReader(String pathName) { + BufferedReader reader = null; + Path path = new Path(pathName); try { FSDataInputStream fdis = fs.open(path); InputStreamReader isr = new InputStreamReader(fdis, StandardCharsets.UTF_8); - BufferedReader reader = new BufferedReader(isr); - return reader; + reader = new BufferedReader(isr); } catch (IOException ex) { - LOG.error("Cannot open write stream for {} to {}", - clazz.getSimpleName(), path); - return null; + LOG.error("Cannot open read stream for {}", path); } + return reader; } @Override - protected BufferedWriter getWriter( - Class clazz, String sub) { - - Path path = getPathForClass(clazz); - if (sub != null && sub.length() > 0) { - path = Path.mergePaths(path, new Path("/" + sub)); - } - path = Path.mergePaths(path, new Path("/" + getDataFileName())); - + protected BufferedWriter getWriter(String pathName) { + BufferedWriter writer = null; + Path path = new Path(pathName); try { FSDataOutputStream fdos = fs.create(path, true); OutputStreamWriter osw = new OutputStreamWriter(fdos, StandardCharsets.UTF_8); - BufferedWriter writer = new BufferedWriter(osw); - return writer; + writer = new BufferedWriter(osw); } catch (IOException ex) { - LOG.error("Cannot open write stream for {} to {}", - clazz.getSimpleName(), path); - return null; + LOG.error("Cannot open write stream for {}", path); } + return writer; + } + + @Override + protected List getChildren(String pathName) { + List ret = new LinkedList<>(); + Path path = new Path(workPath, pathName); + try { + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + Path filePath = file.getPath(); + String fileName = filePath.getName(); + ret.add(fileName); + } + } catch (Exception e) { + LOG.error("Cannot get children for {}", pathName, e); + } + return ret; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java index 1c3f756d41d..69b9b98e785 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java @@ -117,12 +117,6 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { @Override public QueryResult get(Class clazz) throws IOException { - return get(clazz, (String)null); - } - - @Override - public QueryResult get(Class clazz, String sub) - throws IOException { verifyDriverReady(); long start = monotonicNow(); List ret = new ArrayList<>(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index 1091c21e587..fd29e37e81a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; +import org.junit.After; import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,14 @@ public class TestStateStoreDriverBase { return stateStore.getDriver(); } + @After + public void cleanMetrics() { + if (stateStore != null) { + StateStoreMetrics metrics = stateStore.getMetrics(); + metrics.reset(); + } + } + @AfterClass public static void tearDownCluster() { if (stateStore != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java index 920e280d8d4..a8a9020744c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java @@ -61,4 +61,16 @@ public class TestStateStoreFile extends TestStateStoreDriverBase { throws IllegalArgumentException, IllegalAccessException, IOException { testRemove(getStateStoreDriver()); } + + @Test + public void testFetchErrors() + throws IllegalArgumentException, IllegalAccessException, IOException { + testFetchErrors(getStateStoreDriver()); + } + + @Test + public void testMetrics() + throws IllegalArgumentException, IllegalAccessException, IOException { + testMetrics(getStateStoreDriver()); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java new file mode 100644 index 00000000000..9adfe336b63 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl.isOldTempRecord; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.util.Time; +import org.junit.Test; + +/** + * Tests for the State Store file based implementation. + */ +public class TestStateStoreFileBase { + + @Test + public void testTempOld() { + assertFalse(isOldTempRecord("test.txt")); + assertFalse(isOldTempRecord("testfolder/test.txt")); + + long tnow = Time.now(); + String tmpFile1 = "test." + tnow + ".tmp"; + assertFalse(isOldTempRecord(tmpFile1)); + + long told = Time.now() - TimeUnit.MINUTES.toMillis(1); + String tmpFile2 = "test." + told + ".tmp"; + assertTrue(isOldTempRecord(tmpFile2)); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java index da2e51ddd72..8c4b188cc47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java @@ -69,15 +69,15 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase { } @Test - public void testUpdate() - throws IllegalArgumentException, IllegalAccessException, IOException { - testInsert(getStateStoreDriver()); + public void testUpdate() throws IllegalArgumentException, IOException, + SecurityException, ReflectiveOperationException { + testPut(getStateStoreDriver()); } @Test public void testDelete() throws IllegalArgumentException, IllegalAccessException, IOException { - testInsert(getStateStoreDriver()); + testRemove(getStateStoreDriver()); } @Test @@ -85,4 +85,10 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase { throws IllegalArgumentException, IllegalAccessException, IOException { testFetchErrors(getStateStoreDriver()); } + + @Test + public void testMetrics() + throws IllegalArgumentException, IllegalAccessException, IOException { + testMetrics(getStateStoreDriver()); + } } \ No newline at end of file