HBASE-26049 Remove DfsBuilderUtility (#3444)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
f0324a7516
commit
c74366c498
|
@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.util;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
|
@ -30,7 +28,6 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
|
@ -766,77 +763,6 @@ public final class CommonFSUtils {
|
|||
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
|
||||
}
|
||||
|
||||
private static final class DfsBuilderUtility {
|
||||
private static final Class<?> BUILDER;
|
||||
private static final Method REPLICATE;
|
||||
|
||||
static {
|
||||
String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
|
||||
Class<?> builderClass = null;
|
||||
try {
|
||||
builderClass = Class.forName(builderName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
|
||||
}
|
||||
Method replicateMethod = null;
|
||||
if (builderClass != null) {
|
||||
try {
|
||||
replicateMethod = builderClass.getMethod("replicate");
|
||||
LOG.debug("Using builder API via reflection for DFS file creation.");
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.debug("Could not find replicate method on builder; will not set replicate when" +
|
||||
" creating output stream", e);
|
||||
}
|
||||
}
|
||||
BUILDER = builderClass;
|
||||
REPLICATE = replicateMethod;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to use builder API via reflection to call the replicate method on the given builder.
|
||||
*/
|
||||
static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
|
||||
if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
|
||||
try {
|
||||
REPLICATE.invoke(builder);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
// Should have caught this failure during initialization, so log full trace here
|
||||
LOG.warn("Couldn't use reflection with builder API", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to use builder API via reflection to create a file with the given parameters and
|
||||
* replication enabled.
|
||||
* <p/>
|
||||
* Will not attempt to enable replication when passed an HFileSystem.
|
||||
*/
|
||||
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
|
||||
throws IOException {
|
||||
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
|
||||
DfsBuilderUtility.replicate(builder);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to use builder API via reflection to create a file with the given parameters and
|
||||
* replication enabled.
|
||||
* <p/>
|
||||
* Will not attempt to enable replication when passed an HFileSystem.
|
||||
*/
|
||||
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
|
||||
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
|
||||
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
|
||||
.bufferSize(bufferSize).replication(replication).blockSize(blockSize);
|
||||
if (isRecursive) {
|
||||
builder.recursive();
|
||||
}
|
||||
DfsBuilderUtility.replicate(builder);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper exception for those cases where the place where we need to check a stream capability
|
||||
* is not where we have the needed context to explain the impact and mitigation for a lack.
|
||||
|
|
|
@ -144,6 +144,10 @@
|
|||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<artifactId>hadoop-hdfs-client</artifactId>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.locks.Condition;
|
|||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
||||
import org.apache.hadoop.fs.FSError;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
|||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -1070,7 +1072,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
long startPos = -1;
|
||||
newLogFile = getLogFilePath(logId);
|
||||
try {
|
||||
newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
|
||||
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(newLogFile).overwrite(false);
|
||||
if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
|
||||
newStream = ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder)
|
||||
.replicate().build();
|
||||
} else {
|
||||
newStream = builder.build();
|
||||
}
|
||||
} catch (FileAlreadyExistsException e) {
|
||||
LOG.error("Log file with id={} already exists", logId, e);
|
||||
return false;
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
|
@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
|
|||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -105,8 +107,19 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
@Override
|
||||
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
|
||||
this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
|
||||
blockSize, false);
|
||||
FSDataOutputStreamBuilder<?, ?> builder = fs
|
||||
.createFile(path)
|
||||
.overwrite(overwritable)
|
||||
.bufferSize(bufferSize)
|
||||
.replication(replication)
|
||||
.blockSize(blockSize);
|
||||
if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
|
||||
this.output = ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder)
|
||||
.replicate().build();
|
||||
} else {
|
||||
this.output = builder.build();
|
||||
}
|
||||
|
||||
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
|
||||
if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
|
||||
throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
|
||||
|
|
Loading…
Reference in New Issue