HDFS-13979. Review StateStoreFileSystemImpl Class. Contributed by BELUGA BEHR.
This commit is contained in:
parent
4b5b1ac3d1
commit
a05bd1288c
@ -24,7 +24,8 @@
|
|||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.LinkedList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
@ -33,8 +34,8 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -50,13 +51,13 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
|
|||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(StateStoreFileSystemImpl.class);
|
LoggerFactory.getLogger(StateStoreFileSystemImpl.class);
|
||||||
|
|
||||||
|
|
||||||
/** Configuration keys. */
|
/** Configuration keys. */
|
||||||
public static final String FEDERATION_STORE_FS_PATH =
|
public static final String FEDERATION_STORE_FS_PATH =
|
||||||
RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";
|
RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";
|
||||||
|
|
||||||
/** File system to back the State Store. */
|
/** File system to back the State Store. */
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
|
|
||||||
/** Working path in the filesystem. */
|
/** Working path in the filesystem. */
|
||||||
private String workPath;
|
private String workPath;
|
||||||
|
|
||||||
@ -141,7 +142,7 @@ protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
|
|||||||
new InputStreamReader(fdis, StandardCharsets.UTF_8);
|
new InputStreamReader(fdis, StandardCharsets.UTF_8);
|
||||||
reader = new BufferedReader(isr);
|
reader = new BufferedReader(isr);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Cannot open read stream for {}", path);
|
LOG.error("Cannot open read stream for {}", path, ex);
|
||||||
}
|
}
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
@ -156,25 +157,26 @@ protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
|
|||||||
new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
|
new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
|
||||||
writer = new BufferedWriter(osw);
|
writer = new BufferedWriter(osw);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Cannot open write stream for {}", path);
|
LOG.error("Cannot open write stream for {}", path, ex);
|
||||||
}
|
}
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<String> getChildren(String pathName) {
|
protected List<String> getChildren(String pathName) {
|
||||||
List<String> ret = new LinkedList<>();
|
|
||||||
Path path = new Path(workPath, pathName);
|
Path path = new Path(workPath, pathName);
|
||||||
try {
|
try {
|
||||||
FileStatus[] files = fs.listStatus(path);
|
FileStatus[] files = fs.listStatus(path);
|
||||||
|
List<String> ret = new ArrayList<>(files.length);
|
||||||
for (FileStatus file : files) {
|
for (FileStatus file : files) {
|
||||||
Path filePath = file.getPath();
|
Path filePath = file.getPath();
|
||||||
String fileName = filePath.getName();
|
String fileName = filePath.getName();
|
||||||
ret.add(fileName);
|
ret.add(fileName);
|
||||||
}
|
}
|
||||||
|
return ret;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Cannot get children for {}", pathName, e);
|
LOG.error("Cannot get children for {}", pathName, e);
|
||||||
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user