HBASE-16968 Refactor FanOutOneBlockAsyncDFSOutput

This commit is contained in:
zhangduo 2016-10-30 20:03:08 +08:00
parent d0be36deb7
commit 45a2594249
13 changed files with 521 additions and 1217 deletions

View File

@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@ -39,8 +40,8 @@ public interface AsyncFSOutput extends Closeable {
void write(byte[] b);
/**
* Copy the data into the buffer. Note that you need to call
* {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually.
* Copy the data into the buffer. Note that you need to call {@link #flush(boolean)} to flush the
* buffer manually.
*/
void write(byte[] b, int off, int len);
@ -66,11 +67,10 @@ public interface AsyncFSOutput extends Closeable {
/**
* Flush the buffer out.
* @param attachment will be passed to handler when completed.
* @param handler will set the acked length as result when completed.
* @param sync persistent the data to device
* @return A CompletableFuture that hold the acked length after flushing.
*/
<A> void flush(A attachment, final CompletionHandler<Long, ? super A> handler, boolean sync);
CompletableFuture<Long> flush(boolean sync);
/**
* The close method when error occurred.

View File

@ -25,7 +25,7 @@ import io.netty.channel.EventLoop;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -80,11 +80,7 @@ public final class AsyncFSOutputHelper {
if (eventLoop.inEventLoop()) {
out.write(b, off, len);
} else {
eventLoop.submit(new Runnable() {
public void run() {
out.write(b, off, len);
}
}).syncUninterruptibly();
eventLoop.submit(() -> out.write(b, off, len)).syncUninterruptibly();
}
}
@ -103,15 +99,14 @@ public final class AsyncFSOutputHelper {
return new DatanodeInfo[0];
}
private <A> void flush0(A attachment, CompletionHandler<Long, ? super A> handler,
boolean sync) {
private void flush0(CompletableFuture<Long> future, boolean sync) {
try {
synchronized (out) {
fsOut.write(out.getBuffer(), 0, out.size());
out.reset();
}
} catch (IOException e) {
eventLoop.execute(() -> handler.failed(e, attachment));
eventLoop.execute(() -> future.completeExceptionally(e));
return;
}
try {
@ -120,17 +115,18 @@ public final class AsyncFSOutputHelper {
} else {
fsOut.hflush();
}
final long pos = fsOut.getPos();
eventLoop.execute(() -> handler.completed(pos, attachment));
} catch (final IOException e) {
eventLoop.execute(() -> handler.failed(e, attachment));
long pos = fsOut.getPos();
eventLoop.execute(() -> future.complete(pos));
} catch (IOException e) {
eventLoop.execute(() -> future.completeExceptionally(e));
}
}
@Override
public <A> void flush(A attachment, CompletionHandler<Long, ? super A> handler,
boolean sync) {
flushExecutor.execute(() -> flush0(attachment, handler, sync));
public CompletableFuture<Long> flush(boolean sync) {
CompletableFuture<Long> future = new CompletableFuture<>();
flushExecutor.execute(() -> flush0(future, sync));
return future;
}
@Override

View File

@ -26,8 +26,6 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import com.google.common.base.Supplier;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@ -39,14 +37,11 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
@ -54,13 +49,15 @@ import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
@ -87,8 +84,8 @@ import org.apache.hadoop.util.DataChecksum;
* need one thread here. But be careful, we do some blocking operations in {@link #close()} and
* {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside
* {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)},
* {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, if you call them
* outside {@link EventLoop}, there will be an extra context-switch.
* {@link #buffered()} and {@link #flush(boolean)}, if you call them outside {@link EventLoop},
* there will be an extra context-switch.
* <p>
* Advantages compare to DFSOutputStream:
* <ol>
@ -125,7 +122,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private final LocatedBlock locatedBlock;
private final CryptoCodec cryptoCodec;
private final Encryptor encryptor;
private final EventLoop eventLoop;
@ -151,8 +148,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
if (replicas.isEmpty()) {
this.unfinishedReplicas = Collections.emptySet();
} else {
this.unfinishedReplicas = Collections
.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
this.unfinishedReplicas =
Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
this.unfinishedReplicas.addAll(replicas);
}
}
@ -215,13 +212,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
// disable further write, and fail all pending ack.
state = State.BROKEN;
Throwable error = errorSupplier.get();
for (Callback c : waitingAckQueue) {
c.promise.tryFailure(error);
}
waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error));
waitingAckQueue.clear();
for (Channel ch : datanodeList) {
ch.close();
}
datanodeList.forEach(ch -> ch.close());
}
@Sharable
@ -234,29 +227,16 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
@Override
protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack)
throws Exception {
final Status reply = getStatus(ack);
protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
Status reply = getStatus(ack);
if (reply != Status.SUCCESS) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Bad response " + reply + " for block " + locatedBlock.getBlock()
+ " from datanode " + ctx.channel().remoteAddress());
}
});
failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block "
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (PipelineAck.isRestartOOBStatus(reply)) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Restart response " + reply + " for block "
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
}
});
failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (ack.getSeqno() == HEART_BEAT_SEQNO) {
@ -266,25 +246,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
}
});
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
failed(ctx.channel(),
() -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return cause;
}
});
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
failed(ctx.channel(), () -> cause);
}
@Override
@ -292,13 +261,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == READER_IDLE) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
}
});
failed(ctx.channel(),
() -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
} else if (e.state() == WRITER_IDLE) {
PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
int len = heartbeat.getSerializedSize();
@ -326,7 +290,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop,
LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop,
List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
this.conf = conf;
this.fsUtils = fsUtils;
@ -337,7 +301,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.clientName = clientName;
this.src = src;
this.locatedBlock = locatedBlock;
this.cryptoCodec = cryptoCodec;
this.encryptor = encryptor;
this.eventLoop = eventLoop;
this.datanodeList = datanodeList;
this.summer = summer;
@ -350,14 +314,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private void writeInt0(int i) {
buf.ensureWritable(4);
if (cryptoCodec == null) {
buf.writeInt(i);
} else {
ByteBuffer inBuffer = ByteBuffer.allocate(4);
inBuffer.putInt(0, i);
cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4));
buf.writerIndex(buf.writerIndex() + 4);
}
buf.writeInt(i);
}
@Override
@ -370,14 +327,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
private void write0(ByteBuffer bb) {
int len = bb.remaining();
buf.ensureWritable(len);
if (cryptoCodec == null) {
buf.writeBytes(bb);
} else {
cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len));
buf.writerIndex(buf.writerIndex() + len);
}
buf.ensureWritable(bb.remaining());
buf.writeBytes(bb);
}
@Override
@ -394,19 +345,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
write(b, 0, b.length);
}
private void write0(byte[] b, final int off, final int len) {
private void write0(byte[] b, int off, int len) {
buf.ensureWritable(len);
if (cryptoCodec == null) {
buf.writeBytes(b, off, len);
} else {
ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len);
cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len));
buf.writerIndex(buf.writerIndex() + len);
}
buf.writeBytes(b, off, len);
}
@Override
public void write(final byte[] b, final int off, final int len) {
public void write(byte[] b, int off, int len) {
if (eventLoop.inEventLoop()) {
write0(b, off, len);
} else {
@ -464,27 +409,40 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
return promise;
}
private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
boolean syncBlock) {
private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
if (state != State.STREAMING) {
handler.failed(new IOException("stream already broken"), attachment);
future.completeExceptionally(new IOException("stream already broken"));
return;
}
int dataLen = buf.readableBytes();
final long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
if (encryptor != null) {
ByteBuf encryptBuf = alloc.directBuffer(dataLen);
try {
encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen),
encryptBuf.nioBuffer(0, dataLen));
} catch (IOException e) {
encryptBuf.release();
future.completeExceptionally(e);
return;
}
encryptBuf.writerIndex(dataLen);
buf.release();
buf = encryptBuf;
}
long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
// no new data, just return
handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
future.complete(locatedBlock.getBlock().getNumBytes());
return;
}
Callback c = waitingAckQueue.peekLast();
if (c != null && lengthAfterFlush == c.ackedLength) {
// just append it to the tail of waiting ack queue,, do not issue new hflush request.
waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(future -> {
if (future.isSuccess()) {
handler.completed(lengthAfterFlush, attachment);
waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(f -> {
if (f.isSuccess()) {
future.complete(lengthAfterFlush);
} else {
handler.failed(future.cause(), attachment);
future.completeExceptionally(f.cause());
}
}), lengthAfterFlush, Collections.<Channel> emptyList()));
return;
@ -506,11 +464,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
} else {
promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
}
promise.addListener(future -> {
if (future.isSuccess()) {
handler.completed(lengthAfterFlush, attachment);
promise.addListener(f -> {
if (f.isSuccess()) {
future.complete(lengthAfterFlush);
} else {
handler.failed(future.cause(), attachment);
future.completeExceptionally(f.cause());
}
});
int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
@ -525,23 +483,17 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
/**
* Flush the buffer out to datanodes.
* @param attachment will be passed to handler when completed.
* @param handler will set the acked length as result when completed.
* @param syncBlock will call hsync if true, otherwise hflush.
* @return A CompletableFuture that hold the acked length after flushing.
*/
public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
final boolean syncBlock) {
public CompletableFuture<Long> flush(boolean syncBlock) {
CompletableFuture<Long> future = new CompletableFuture<Long>();
if (eventLoop.inEventLoop()) {
flush0(attachment, handler, syncBlock);
flush0(future, syncBlock);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
flush0(attachment, handler, syncBlock);
}
});
eventLoop.execute(() -> flush0(future, syncBlock));
}
return future;
}
private void endBlock(Promise<Void> promise, long size) {
@ -558,13 +510,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
buf.release();
buf = null;
int headerLen = header.getSerializedSize();
ByteBuf headerBuf = alloc.buffer(headerLen);
ByteBuf headerBuf = alloc.directBuffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
waitingAckQueue.add(new Callback(promise, size, datanodeList));
for (Channel ch : datanodeList) {
ch.writeAndFlush(headerBuf.duplicate().retain());
}
datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain()));
headerBuf.release();
}
@ -574,10 +524,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
assert !eventLoop.inEventLoop();
for (Channel ch : datanodeList) {
ch.closeFuture().awaitUninterruptibly();
}
endFileLease(client, src, fileId);
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
fsUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
@ -589,26 +537,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override
public void close() throws IOException {
assert !eventLoop.inEventLoop();
final Promise<Void> promise = eventLoop.newPromise();
eventLoop.execute(new Runnable() {
@Override
public void run() {
endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes());
}
});
promise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
for (Channel ch : datanodeList) {
ch.close();
}
}
}).syncUninterruptibly();
for (Channel ch : datanodeList) {
ch.closeFuture().awaitUninterruptibly();
}
Promise<Void> promise = eventLoop.newPromise();
eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()));
promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly();
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
}
}

View File

@ -21,7 +21,7 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createCryptoCodec;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
@ -74,7 +76,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
@ -96,7 +97,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
@ -143,26 +143,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
// StorageType enum is added in hadoop 2.4, but it is moved to another package in hadoop 2.6 and
// the setter method in OpWriteBlockProto is also added in hadoop 2.6. So we need to skip the
// setStorageType call if it is hadoop 2.5 or before. See createStorageTypeSetter for more
// details.
// StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
// we need to use reflection to set it.See createStorageTypeSetter for more details.
private interface StorageTypeSetter {
OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
}
private static final StorageTypeSetter STORAGE_TYPE_SETTER;
// helper class for calling create method on namenode. There is a supportedVersions parameter for
// hadoop 2.6 or after. See createFileCreater for more details.
private interface FileCreater {
HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException;
}
private static final FileCreater FILE_CREATER;
// helper class for calling add block method on namenode. There is a addBlockFlags parameter for
// hadoop 2.8 or later. See createBlockAdder for more details.
private interface BlockAdder {
@ -174,13 +162,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final BlockAdder BLOCK_ADDER;
// helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and
// hadoop 2.5 or after use inodeId. See createLeaseManager for more details.
private interface LeaseManager {
void begin(DFSClient client, String src, long inodeId);
void begin(DFSClient client, long inodeId);
void end(DFSClient client, String src, long inodeId);
void end(DFSClient client, long inodeId);
}
private static final LeaseManager LEASE_MANAGER;
@ -197,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
// helper class for convert protos.
private interface PBHelper {
ExtendedBlockProto convert(final ExtendedBlock b);
ExtendedBlockProto convert(ExtendedBlock b);
TokenProto convert(Token<?> tok);
}
@ -212,7 +198,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final ChecksumCreater CHECKSUM_CREATER;
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
isClientRunningMethod.setAccessible(true);
return new DFSClientAdaptor() {
@ -227,16 +213,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
private static LeaseManager createLeaseManager25() throws NoSuchMethodException {
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
long.class, DFSOutputStream.class);
private static LeaseManager createLeaseManager() throws NoSuchMethodException {
Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
endFileLeaseMethod.setAccessible(true);
return new LeaseManager() {
@Override
public void begin(DFSClient client, String src, long inodeId) {
public void begin(DFSClient client, long inodeId) {
try {
beginFileLeaseMethod.invoke(client, inodeId, null);
} catch (IllegalAccessException | InvocationTargetException e) {
@ -245,7 +231,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
@Override
public void end(DFSClient client, String src, long inodeId) {
public void end(DFSClient client, long inodeId) {
try {
endFileLeaseMethod.invoke(client, inodeId);
} catch (IllegalAccessException | InvocationTargetException e) {
@ -255,66 +241,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
private static LeaseManager createLeaseManager24() throws NoSuchMethodException {
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
String.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
String.class);
endFileLeaseMethod.setAccessible(true);
return new LeaseManager() {
@Override
public void begin(DFSClient client, String src, long inodeId) {
try {
beginFileLeaseMethod.invoke(client, src, null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public void end(DFSClient client, String src, long inodeId) {
try {
endFileLeaseMethod.invoke(client, src);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
private static LeaseManager createLeaseManager() throws NoSuchMethodException {
try {
return createLeaseManager25();
} catch (NoSuchMethodException e) {
LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e);
}
return createLeaseManager24();
}
private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
throws NoSuchMethodException {
final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
@SuppressWarnings("rawtypes")
Class<? extends Enum> ecnClass;
try {
ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
.asSubclass(Enum.class);
} catch (ClassNotFoundException e) {
final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
}
@SuppressWarnings("unchecked")
final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
Status.class);
final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
int.class);
Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
Method combineHeaderMethod =
PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
Method getStatusFromHeaderMethod =
PipelineAck.class.getMethod("getStatusFromHeader", int.class);
return new PipelineAckStatusGetter() {
@Override
@ -339,7 +287,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
throws NoSuchMethodException {
final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
return new PipelineAckStatusGetter() {
@Override
@ -363,30 +311,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return createPipelineAckStatusGetter26();
}
private static StorageTypeSetter createStorageTypeSetter() {
final Method setStorageTypeMethod;
try {
setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
StorageTypeProto.class);
} catch (NoSuchMethodException e) {
LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e);
return new StorageTypeSetter() {
@Override
public Builder set(Builder builder, Enum<?> storageType) {
return builder;
}
};
}
private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
Method setStorageTypeMethod =
OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
builder.put(storageTypeProto.name(), storageTypeProto);
}
final ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
return new StorageTypeSetter() {
@Override
public Builder set(Builder builder, Enum<?> storageType) {
public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
Object protoEnum = name2ProtoEnum.get(storageType.name());
try {
setStorageTypeMethod.invoke(builder, protoEnum);
@ -398,62 +334,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
private static FileCreater createFileCreater() throws ClassNotFoundException,
NoSuchMethodException, IllegalAccessException, InvocationTargetException {
for (Method method : ClientProtocol.class.getMethods()) {
if (method.getName().equals("create")) {
final Method createMethod = method;
Class<?>[] paramTypes = createMethod.getParameterTypes();
if (paramTypes[paramTypes.length - 1] == long.class) {
return new FileCreater() {
@Override
public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException {
try {
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
createParent, replication, blockSize);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
}
}
};
} else {
Class<?> cryptoProtocolVersionClass = Class
.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
final Object supported = supportedMethod.invoke(null);
return new FileCreater() {
@Override
public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException {
try {
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
createParent, replication, blockSize, supported);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
}
}
};
}
}
}
throw new NoSuchMethodException("Can not find create method in ClientProtocol");
}
private static BlockAdder createBlockAdder() throws NoSuchMethodException {
for (Method method : ClientProtocol.class.getMethods()) {
if (method.getName().equals("addBlock")) {
final Method addBlockMethod = method;
Method addBlockMethod = method;
Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
if (paramTypes[paramTypes.length - 1] == String[].class) {
return new BlockAdder() {
@ -505,8 +389,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
}
final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
return new PBHelper() {
@Override
@ -533,7 +417,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
throws NoSuchMethodException {
for (Method method : confClass.getMethods()) {
if (method.getName().equals("createChecksum")) {
final Method createChecksumMethod = method;
Method createChecksumMethod = method;
return new ChecksumCreater() {
@Override
@ -552,7 +436,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
throws NoSuchMethodException {
final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
createChecksumMethod.setAccessible(true);
return new ChecksumCreater() {
@ -597,14 +481,13 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
try {
PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
STORAGE_TYPE_SETTER = createStorageTypeSetter();
FILE_CREATER = createFileCreater();
BLOCK_ADDER = createBlockAdder();
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
PB_HELPER = createPBHelper();
CHECKSUM_CREATER = createChecksumCreater();
} catch (Exception e) {
final String msg = "Couldn't properly initialize access to HDFS internals. Please "
String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
@ -612,12 +495,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
}
static void beginFileLease(DFSClient client, String src, long inodeId) {
LEASE_MANAGER.begin(client, src, inodeId);
static void beginFileLease(DFSClient client, long inodeId) {
LEASE_MANAGER.begin(client, inodeId);
}
static void endFileLease(DFSClient client, String src, long inodeId) {
LEASE_MANAGER.end(client, src, inodeId);
static void endFileLease(DFSClient client, long inodeId) {
LEASE_MANAGER.end(client, inodeId);
}
static DataChecksum createChecksum(DFSClient client) {
@ -628,8 +511,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return PIPELINE_ACK_STATUS_GETTER.get(ack);
}
private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo,
final Promise<Channel> promise, final int timeoutMs) {
private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
Promise<Channel> promise, int timeoutMs) {
channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
new ProtobufVarint32FrameDecoder(),
new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
@ -693,18 +576,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
int protoLen = proto.getSerializedSize();
ByteBuf buffer = channel.alloc()
.buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
ByteBuf buffer =
channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
channel.writeAndFlush(buffer);
}
private static void initialize(Configuration conf, final Channel channel,
final DatanodeInfo dnInfo, final Enum<?> storageType,
final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client,
Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
throws IOException {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
saslPromise.addListener(new FutureListener<Void>() {
@ -722,14 +605,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd,
long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
boolean connectToDnViaHostname =
conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
blockCopy.setNumBytes(locatedBlock.getBlockSize());
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
@ -737,7 +620,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
.setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
.setClientName(clientName).build();
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
.setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
.setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
.setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
@ -745,11 +628,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
.setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
for (int i = 0; i < datanodeInfos.length; i++) {
final DatanodeInfo dnInfo = datanodeInfos[i];
// Use Enum here because StoregType is moved to another package in hadoop 2.6. Use StorageType
// will cause compilation error for hadoop 2.5 or before.
final Enum<?> storageType = storageTypes[i];
final Promise<Channel> promise = eventLoop.newPromise();
DatanodeInfo dnInfo = datanodeInfos[i];
Enum<?> storageType = storageTypes[i];
Promise<Channel> promise = eventLoop.newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
@ -799,11 +680,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
stat = FILE_CREATER.create(namenode, src,
stat = namenode.create(src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<CreateFlag>(
overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize);
createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
@ -811,7 +692,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
throw new NameNodeException(e);
}
}
beginFileLease(client, src, stat.getFileId());
beginFileLease(client, stat.getFileId());
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
@ -827,10 +708,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
// layer should retry itself if needed.
datanodeList.add(future.syncUninterruptibly().getNow());
}
CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs,
client, namenode, clientName, src, stat.getFileId(), locatedBlock, cryptocodec, eventLoop,
datanodeList, summer, ALLOC);
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, eventLoop, datanodeList, summer, ALLOC);
succ = true;
return output;
} finally {
@ -848,7 +729,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
}
endFileLease(client, src, stat.getFileId());
endFileLease(client, stat.getFileId());
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
}
}
@ -859,9 +740,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
* inside {@link EventLoop}.
* @param eventLoop all connections to datanode will use the same event loop.
*/
public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f,
final boolean overwrite, final boolean createParent, final short replication,
final long blockSize, final EventLoop eventLoop) throws IOException {
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoop eventLoop) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
@Override
@ -890,7 +771,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
for (int retry = 0;; retry++) {
try {
if (namenode.complete(src, clientName, block, fileId)) {
endFileLease(client, src, fileId);
endFileLease(client, fileId);
return;
} else {
LOG.warn("complete file " + src + " not finished, retry = " + retry);

View File

@ -18,12 +18,11 @@
package org.apache.hadoop.hbase.io.asyncfs;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.CodedOutputStream;
@ -47,13 +46,13 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -73,9 +72,17 @@ import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.Decryptor;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hdfs.DFSClient;
@ -83,9 +90,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
@ -110,561 +118,148 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
private static final String NAME_DELIMITER = " ";
@VisibleForTesting
static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
@VisibleForTesting
static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
private interface SaslAdaptor {
SaslPropertiesResolver getSaslPropsResolver(DFSClient client);
TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
TrustedChannelResolver getTrustedChannelResolver(DFSClient client);
SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
AtomicBoolean getFallbackToSimpleAuth(DFSClient client);
DataEncryptionKey createDataEncryptionKey(DFSClient client);
AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
}
private static final SaslAdaptor SASL_ADAPTOR;
private interface CipherOptionHelper {
// helper class for convert protos.
private interface PBHelper {
List<Object> getCipherOptions(Configuration conf) throws IOException;
List<CipherOptionProto> convertCipherOptions(List<CipherOption> options);
void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder,
List<Object> cipherOptions);
Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy,
SaslClient saslClient) throws IOException;
Object getCipherSuite(Object cipherOption);
byte[] getInKey(Object cipherOption);
byte[] getInIv(Object cipherOption);
byte[] getOutKey(Object cipherOption);
byte[] getOutIv(Object cipherOption);
List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options);
}
private static final CipherOptionHelper CIPHER_OPTION_HELPER;
private static final PBHelper PB_HELPER;
private interface TransparentCryptoHelper {
Object getFileEncryptionInfo(HdfsFileStatus stat);
CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
throws IOException;
}
private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
static final class CryptoCodec {
private static final Method CREATE_CODEC;
private static final Method CREATE_ENCRYPTOR;
private static final Method CREATE_DECRYPTOR;
private static final Method INIT_ENCRYPTOR;
private static final Method INIT_DECRYPTOR;
private static final Method ENCRYPT;
private static final Method DECRYPT;
static {
Class<?> cryptoCodecClass = null;
try {
cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
} catch (ClassNotFoundException e) {
LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e);
}
if (cryptoCodecClass != null) {
Method getInstanceMethod = null;
for (Method method : cryptoCodecClass.getMethods()) {
if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) {
getInstanceMethod = method;
break;
}
}
try {
if (getInstanceMethod == null) {
throw new NoSuchMethodException(
"Can not find suitable getInstance method in CryptoCodec");
}
CREATE_CODEC = getInstanceMethod;
CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor");
INIT_ENCRYPTOR = encryptorClass.getMethod("init", byte[].class, byte[].class);
ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class);
Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class);
DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
} catch (Exception e) {
final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
}
} else {
CREATE_CODEC = null;
CREATE_ENCRYPTOR = null;
CREATE_DECRYPTOR = null;
INIT_ENCRYPTOR = null;
INIT_DECRYPTOR = null;
ENCRYPT = null;
DECRYPT = null;
}
}
private final Object encryptor;
private final Object decryptor;
public CryptoCodec(Configuration conf, Object cipherOption) {
try {
Object codec = CREATE_CODEC.invoke(null, conf,
CIPHER_OPTION_HELPER.getCipherSuite(cipherOption));
encryptor = CREATE_ENCRYPTOR.invoke(codec);
byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption);
byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption);
INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length));
decryptor = CREATE_DECRYPTOR.invoke(codec);
byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption);
byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption);
INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, byte[] encIv) {
try {
Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite);
encryptor = CREATE_ENCRYPTOR.invoke(codec);
INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv);
decryptor = null;
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
try {
ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
try {
DECRYPT.invoke(decryptor, inBuffer, outBuffer);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
}
private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass)
private static SaslAdaptor createSaslAdaptor()
throws NoSuchFieldException, NoSuchMethodException {
final Field saslPropsResolverField = saslDataTransferClientClass
.getDeclaredField("saslPropsResolver");
Field saslPropsResolverField =
SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
saslPropsResolverField.setAccessible(true);
final Field trustedChannelResolverField = saslDataTransferClientClass
.getDeclaredField("trustedChannelResolver");
Field trustedChannelResolverField =
SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
trustedChannelResolverField.setAccessible(true);
final Field fallbackToSimpleAuthField = saslDataTransferClientClass
.getDeclaredField("fallbackToSimpleAuth");
Field fallbackToSimpleAuthField =
SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
fallbackToSimpleAuthField.setAccessible(true);
final Method getSaslDataTransferClientMethod = DFSClient.class
.getMethod("getSaslDataTransferClient");
final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey");
return new SaslAdaptor() {
@Override
public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
try {
return (TrustedChannelResolver) trustedChannelResolverField
.get(getSaslDataTransferClientMethod.invoke(client));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
try {
return (SaslPropertiesResolver) saslPropsResolverField
.get(getSaslDataTransferClientMethod.invoke(client));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
try {
return (AtomicBoolean) fallbackToSimpleAuthField
.get(getSaslDataTransferClientMethod.invoke(client));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
try {
return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
private static SaslAdaptor createSaslAdaptor25()
throws NoSuchFieldException, NoSuchMethodException {
final Field trustedChannelResolverField = DFSClient.class
.getDeclaredField("trustedChannelResolver");
trustedChannelResolverField.setAccessible(true);
final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
return new SaslAdaptor() {
@Override
public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
try {
return (TrustedChannelResolver) trustedChannelResolverField.get(client);
return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
return null;
}
@Override
public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
return null;
}
@Override
public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
try {
return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
} catch (IllegalAccessException | InvocationTargetException e) {
return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
try {
return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
};
}
private static SaslAdaptor createSaslAdaptor()
throws NoSuchFieldException, NoSuchMethodException {
private static PBHelper createPBHelper() throws NoSuchMethodException {
Class<?> helperClass;
try {
return createSaslAdaptor27(
Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"));
} catch (ClassNotFoundException e) {
LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e);
}
return createSaslAdaptor25();
}
private static CipherOptionHelper createCipherHelper25() {
return new CipherOptionHelper() {
@Override
public byte[] getOutKey(Object cipherOption) {
throw new UnsupportedOperationException();
}
@Override
public byte[] getOutIv(Object cipherOption) {
throw new UnsupportedOperationException();
}
@Override
public byte[] getInKey(Object cipherOption) {
throw new UnsupportedOperationException();
}
@Override
public byte[] getInIv(Object cipherOption) {
throw new UnsupportedOperationException();
}
@Override
public Object getCipherSuite(Object cipherOption) {
throw new UnsupportedOperationException();
}
@Override
public List<Object> getCipherOptions(Configuration conf) {
return null;
}
@Override
public Object getCipherOption(DataTransferEncryptorMessageProto proto,
boolean isNegotiatedQopPrivacy, SaslClient saslClient) {
return null;
}
@Override
public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
throw new UnsupportedOperationException();
}
};
}
private static CipherOptionHelper createCipherHelper27(Class<?> cipherOptionClass)
throws ClassNotFoundException, NoSuchMethodException {
@SuppressWarnings("rawtypes")
Class<? extends Enum> cipherSuiteClass = Class.forName("org.apache.hadoop.crypto.CipherSuite")
.asSubclass(Enum.class);
@SuppressWarnings("unchecked")
final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING");
final Constructor<?> cipherOptionConstructor = cipherOptionClass
.getConstructor(cipherSuiteClass);
final Constructor<?> cipherOptionWithKeyAndIvConstructor = cipherOptionClass
.getConstructor(cipherSuiteClass, byte[].class, byte[].class, byte[].class, byte[].class);
final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite");
final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
final Method getInIvMethod = cipherOptionClass.getMethod("getInIv");
final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
Class<?> pbHelperClass;
try {
pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
} catch (ClassNotFoundException e) {
LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
}
final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions",
List.class);
final Method convertCipherOptionProtosMethod = pbHelperClass
.getMethod("convertCipherOptionProtos", List.class);
final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
.getMethod("addAllCipherOption", Iterable.class);
final Method getCipherOptionListMethod = DataTransferEncryptorMessageProto.class
.getMethod("getCipherOptionList");
return new CipherOptionHelper() {
Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class);
Method convertCipherOptionProtosMethod =
helperClass.getMethod("convertCipherOptionProtos", List.class);
return new PBHelper() {
@SuppressWarnings("unchecked")
@Override
public byte[] getOutKey(Object cipherOption) {
public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) {
try {
return (byte[]) getOutKeyMethod.invoke(cipherOption);
return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public byte[] getOutIv(Object cipherOption) {
try {
return (byte[]) getOutIvMethod.invoke(cipherOption);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public byte[] getInKey(Object cipherOption) {
try {
return (byte[]) getInKeyMethod.invoke(cipherOption);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public byte[] getInIv(Object cipherOption) {
try {
return (byte[]) getInIvMethod.invoke(cipherOption);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public Object getCipherSuite(Object cipherOption) {
try {
return getCipherSuiteMethod.invoke(cipherOption);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public List<Object> getCipherOptions(Configuration conf) throws IOException {
// Negotiate cipher suites if configured. Currently, the only supported
// cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
// values for future expansion.
String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
if (cipherSuites == null || cipherSuites.isEmpty()) {
return null;
}
if (!cipherSuites.equals(AES_CTR_NOPADDING)) {
throw new IOException(String.format("Invalid cipher suite, %s=%s",
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
}
Object option;
try {
option = cipherOptionConstructor.newInstance(aesCipherSuite);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
List<Object> cipherOptions = Lists.newArrayListWithCapacity(1);
cipherOptions.add(option);
return cipherOptions;
}
private Object unwrap(Object option, SaslClient saslClient) throws IOException {
byte[] inKey = getInKey(option);
if (inKey != null) {
inKey = saslClient.unwrap(inKey, 0, inKey.length);
}
byte[] outKey = getOutKey(option);
if (outKey != null) {
outKey = saslClient.unwrap(outKey, 0, outKey.length);
}
try {
return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey,
getInIv(option), outKey, getOutIv(option));
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public Object getCipherOption(DataTransferEncryptorMessageProto proto,
boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
List<Object> cipherOptions;
public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) {
try {
cipherOptions = (List<Object>) convertCipherOptionProtosMethod.invoke(null,
getCipherOptionListMethod.invoke(proto));
return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
if (cipherOptions == null || cipherOptions.isEmpty()) {
return null;
}
Object cipherOption = cipherOptions.get(0);
return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
}
@Override
public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
try {
addAllCipherOptionMethod.invoke(builder,
convertCipherOptionsMethod.invoke(null, cipherOptions));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
private static CipherOptionHelper createCipherHelper()
throws ClassNotFoundException, NoSuchMethodException {
Class<?> cipherOptionClass;
try {
cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
} catch (ClassNotFoundException e) {
LOG.debug("No CipherOption class found, should be hadoop 2.5-", e);
return createCipherHelper25();
}
return createCipherHelper27(cipherOptionClass);
}
private static TransparentCryptoHelper createTransparentCryptoHelper25() {
return new TransparentCryptoHelper() {
@Override
public Object getFileEncryptionInfo(HdfsFileStatus stat) {
return null;
}
@Override
public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) {
throw new UnsupportedOperationException();
}
};
}
private static TransparentCryptoHelper createTransparentCryptoHelper27(Class<?> feInfoClass)
throws NoSuchMethodException, ClassNotFoundException {
final Method getFileEncryptionInfoMethod = HdfsFileStatus.class
.getMethod("getFileEncryptionInfo");
final Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
.getDeclaredMethod("decryptEncryptedDataEncryptionKey", feInfoClass);
decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
final Method getCipherSuiteMethod = feInfoClass.getMethod("getCipherSuite");
Class<?> keyVersionClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider$KeyVersion");
final Method getMaterialMethod = keyVersionClass.getMethod("getMaterial");
final Method getIVMethod = feInfoClass.getMethod("getIV");
return new TransparentCryptoHelper() {
@Override
public Object getFileEncryptionInfo(HdfsFileStatus stat) {
try {
return getFileEncryptionInfoMethod.invoke(stat);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
throws IOException {
try {
Object decrypted = decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
return new CryptoCodec(conf, getCipherSuiteMethod.invoke(feInfo),
(byte[]) getMaterialMethod.invoke(decrypted), (byte[]) getIVMethod.invoke(feInfo));
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e.getTargetException());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
};
}
private static TransparentCryptoHelper createTransparentCryptoHelper()
throws NoSuchMethodException, ClassNotFoundException {
Class<?> feInfoClass;
try {
feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
} catch (ClassNotFoundException e) {
LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e);
return createTransparentCryptoHelper25();
}
return createTransparentCryptoHelper27(feInfoClass);
throws NoSuchMethodException {
Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
.getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
return new TransparentCryptoHelper() {
@Override
public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
DFSClient client) throws IOException {
try {
KeyVersion decryptedKey =
(KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
Encryptor encryptor = cryptoCodec.createEncryptor();
encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
return encryptor;
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e.getTargetException());
} catch (GeneralSecurityException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
};
}
static {
try {
SASL_ADAPTOR = createSaslAdaptor();
CIPHER_OPTION_HELPER = createCipherHelper();
PB_HELPER = createPBHelper();
TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
} catch (Exception e) {
final String msg = "Couldn't properly initialize access to HDFS internals. Please "
String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
@ -748,16 +343,31 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
sendSaslMessage(ctx, payload, null);
}
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto
.newBuilder();
private List<CipherOption> getCipherOptions() throws IOException {
// Negotiate cipher suites if configured. Currently, the only supported
// cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
// values for future expansion.
String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
if (StringUtils.isBlank(cipherSuites)) {
return null;
}
if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
throw new IOException(String.format("Invalid cipher suite, %s=%s",
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
}
return Arrays.asList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
}
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
List<CipherOption> options) throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
if (payload != null) {
builder.setPayload(ByteStringer.wrap(payload));
}
if (options != null) {
CIPHER_OPTION_HELPER.addCipherOptions(builder, options);
builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options));
}
DataTransferEncryptorMessageProto proto = builder.build();
int size = proto.getSerializedSize();
@ -798,8 +408,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
}
private boolean requestedQopContainsPrivacy() {
Set<String> requestedQop = ImmutableSet
.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
Set<String> requestedQop =
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
return requestedQop.contains("auth-conf");
}
@ -807,8 +417,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
if (!saslClient.isComplete()) {
throw new IOException("Failed to complete SASL handshake");
}
Set<String> requestedQop = ImmutableSet
.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
Set<String> requestedQop =
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
String negotiatedQop = getNegotiatedQop();
LOG.debug(
"Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
@ -825,48 +435,73 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
return qop != null && !"auth".equalsIgnoreCase(qop);
}
private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
byte[] inKey = option.getInKey();
if (inKey != null) {
inKey = saslClient.unwrap(inKey, 0, inKey.length);
}
byte[] outKey = option.getOutKey();
if (outKey != null) {
outKey = saslClient.unwrap(outKey, 0, outKey.length);
}
return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
option.getOutIv());
}
private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
List<CipherOption> cipherOptions =
PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList());
if (cipherOptions == null || cipherOptions.isEmpty()) {
return null;
}
CipherOption cipherOption = cipherOptions.get(0);
return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof DataTransferEncryptorMessageProto) {
DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
check(proto);
byte[] challenge = proto.getPayload().toByteArray();
byte[] response = saslClient.evaluateChallenge(challenge);
switch (step) {
case 1: {
List<Object> cipherOptions = null;
if (requestedQopContainsPrivacy()) {
cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
}
sendSaslMessage(ctx, response, cipherOptions);
ctx.flush();
step++;
break;
}
case 2: {
assert response == null;
checkSaslComplete();
Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto,
isNegotiatedQopPrivacy(), saslClient);
ChannelPipeline p = ctx.pipeline();
while (p.first() != null) {
p.removeFirst();
}
if (cipherOption != null) {
CryptoCodec codec = new CryptoCodec(conf, cipherOption);
p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
} else {
if (useWrap()) {
p.addLast(new SaslWrapHandler(saslClient),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
new SaslUnwrapHandler(saslClient));
case 1: {
List<CipherOption> cipherOptions = null;
if (requestedQopContainsPrivacy()) {
cipherOptions = getCipherOptions();
}
sendSaslMessage(ctx, response, cipherOptions);
ctx.flush();
step++;
break;
}
promise.trySuccess(null);
break;
}
default:
throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
case 2: {
assert response == null;
checkSaslComplete();
CipherOption cipherOption =
getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
ChannelPipeline p = ctx.pipeline();
while (p.first() != null) {
p.removeFirst();
}
if (cipherOption != null) {
CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
} else {
if (useWrap()) {
p.addLast(new SaslWrapHandler(saslClient),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
new SaslUnwrapHandler(saslClient));
}
}
promise.trySuccess(null);
break;
}
default:
throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
}
} else {
ctx.fireChannelRead(msg);
@ -961,10 +596,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final CryptoCodec codec;
private final Decryptor decryptor;
public DecryptHandler(CryptoCodec codec) {
this.codec = codec;
public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
throws GeneralSecurityException, IOException {
this.decryptor = codec.createDecryptor();
this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
}
@Override
@ -981,7 +618,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
ByteBuffer inBuffer = inBuf.nioBuffer();
ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
codec.decrypt(inBuffer, outBuffer);
decryptor.decrypt(inBuffer, outBuffer);
outBuf.writerIndex(inBuf.readableBytes());
if (release) {
inBuf.release();
@ -992,11 +629,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
private final CryptoCodec codec;
private final Encryptor encryptor;
public EncryptHandler(CryptoCodec codec) {
super(false);
this.codec = codec;
public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
throws GeneralSecurityException, IOException {
this.encryptor = codec.createEncryptor();
this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
}
@Override
@ -1022,7 +660,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
}
ByteBuffer inBuffer = inBuf.nioBuffer();
ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
codec.encrypt(inBuffer, outBuffer);
encryptor.encrypt(inBuffer, outBuffer);
out.writerIndex(inBuf.readableBytes());
if (release) {
inBuf.release();
@ -1070,22 +708,18 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
Promise<Void> saslPromise) {
SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client);
TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client);
AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client);
Promise<Void> saslPromise) throws IOException {
SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
TrustedChannelResolver trustedChannelResolver =
SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
saslPromise.trySuccess(null);
return;
}
DataEncryptionKey encryptionKey;
try {
encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client);
} catch (Exception e) {
saslPromise.tryFailure(e);
return;
}
DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
if (encryptionKey != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
@ -1131,12 +765,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
}
}
static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus stat, DFSClient client)
static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
throws IOException {
Object feInfo = TRANSPARENT_CRYPTO_HELPER.getFileEncryptionInfo(stat);
FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
if (feInfo == null) {
return null;
}
return TRANSPARENT_CRYPTO_HELPER.createCryptoCodec(conf, feInfo, client);
return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
}
}

View File

@ -29,7 +29,6 @@ import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
@ -206,9 +205,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final long logRollerExitedCheckIntervalMs;
private final ExecutorService closeExecutor = Executors
.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Close-WAL-Writer-%d").build());
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
private volatile AsyncFSOutput fsOut;
@ -216,8 +214,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final Deque<FSWALEntry> unackedEntries = new ArrayDeque<FSWALEntry>();
private final PriorityQueue<SyncFuture> syncFutures = new PriorityQueue<SyncFuture>(11,
SEQ_COMPARATOR);
private final PriorityQueue<SyncFuture> syncFutures =
new PriorityQueue<SyncFuture>(11, SEQ_COMPARATOR);
private Promise<Void> rollPromise;
@ -285,8 +283,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoop eventLoop) throws FailedLogCloseException,
IOException {
String prefix, String suffix, EventLoop eventLoop)
throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this.eventLoop = eventLoop;
int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200);
@ -294,9 +292,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
createMaxRetries =
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
logRollerExitedCheckIntervalMs =
conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
rollWriter();
}
@ -310,82 +307,85 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private void syncFailed(Throwable error) {
LOG.warn("sync failed", error);
// Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty.
// When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It
// is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener
// directly if it is already in the EventLoop thread. And in the listener method, it will
// call us. So here we know that all failed flush request will call us continuously, and
// before the last one finish, no other task can be executed in EventLoop. So here we are
// safe to use writerBroken as a guard.
// Do not forget to revisit this if we change the implementation of
// FanOutOneBlockAsyncDFSOutput!
synchronized (waitingConsumePayloads) {
if (writerBroken) {
return;
}
// schedule a periodical task to check if log roller is exited. Otherwise the the sync
// request maybe blocked forever since we are still waiting for a new writer to write the
// pending data and sync it...
logRollerExitedChecker = new LogRollerExitedChecker();
// we are currently in the EventLoop thread, so it is safe to set the future after
// schedule it since the task can not be executed before we release the thread.
logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
writerBroken = true;
}
for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
waitingAppendEntries.addFirst(iter.next());
}
highestUnsyncedTxid = highestSyncedTxid.get();
if (rollPromise != null) {
rollPromise.trySuccess(null);
rollPromise = null;
return;
}
// request a roll.
if (!rollWriterLock.tryLock()) {
return;
}
try {
requestLogRoll();
} finally {
rollWriterLock.unlock();
}
}
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
highestSyncedTxid.set(processedTxid);
int syncCount = finishSync(true);
for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) {
if (iter.next().getTxid() <= processedTxid) {
iter.remove();
} else {
break;
}
}
postSync(System.nanoTime() - startTimeNs, syncCount);
tryFinishRoll();
if (!rollWriterLock.tryLock()) {
return;
}
try {
if (writer.getLength() >= logrollsize) {
requestLogRoll();
}
} finally {
rollWriterLock.unlock();
}
}
private void sync(final AsyncWriter writer, final long processedTxid) {
fileLengthAtLastSync = writer.getLength();
final long startTimeNs = System.nanoTime();
writer.sync(new CompletionHandler<Long, Void>() {
@Override
public void completed(Long result, Void attachment) {
highestSyncedTxid.set(processedTxid);
int syncCount = finishSync(true);
for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) {
if (iter.next().getTxid() <= processedTxid) {
iter.remove();
} else {
break;
}
}
postSync(System.nanoTime() - startTimeNs, syncCount);
tryFinishRoll();
if (!rollWriterLock.tryLock()) {
return;
}
try {
if (writer.getLength() >= logrollsize) {
requestLogRoll();
}
} finally {
rollWriterLock.unlock();
}
writer.sync().whenComplete((result, error) -> {
if (error != null) {
syncFailed(error);
} else {
syncCompleted(writer, processedTxid, startTimeNs);
}
@Override
public void failed(Throwable exc, Void attachment) {
LOG.warn("sync failed", exc);
// Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty.
// When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It
// is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener
// directly if it is already in the EventLoop thread. And in the listener method, it will
// call us. So here we know that all failed flush request will call us continuously, and
// before the last one finish, no other task can be executed in EventLoop. So here we are
// safe to use writerBroken as a guard.
// Do not forget to revisit this if we change the implementation of
// FanOutOneBlockAsyncDFSOutput!
synchronized (waitingConsumePayloads) {
if (writerBroken) {
return;
}
// schedule a periodical task to check if log roller is exited. Otherwise the the sync
// request maybe blocked forever since we are still waiting for a new writer to write the
// pending data and sync it...
logRollerExitedChecker = new LogRollerExitedChecker();
// we are currently in the EventLoop thread, so it is safe to set the future after
// schedule it since the task can not be executed before we release the thread.
logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
writerBroken = true;
}
for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
waitingAppendEntries.addFirst(iter.next());
}
highestUnsyncedTxid = highestSyncedTxid.get();
if (rollPromise != null) {
rollPromise.trySuccess(null);
rollPromise = null;
return;
}
// request a roll.
if (!rollWriterLock.tryLock()) {
return;
}
try {
requestLogRoll();
} finally {
rollWriterLock.unlock();
}
}
}, null);
});
}
private void addTimeAnnotation(SyncFuture future, String annotation) {
@ -457,13 +457,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private static final Comparator<SyncFuture> SEQ_COMPARATOR = new Comparator<SyncFuture>() {
@Override
public int compare(SyncFuture o1, SyncFuture o2) {
int c = Long.compare(o1.getTxid(), o2.getTxid());
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
}
private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
int c = Long.compare(o1.getTxid(), o2.getTxid());
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
};
private final Runnable consumer = new Runnable() {
@ -690,15 +686,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
long oldFileLen;
if (oldWriter != null) {
oldFileLen = oldWriter.getLength();
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
oldWriter.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
closeExecutor.execute(() -> {
try {
oldWriter.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
});
} else {

View File

@ -25,7 +25,9 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -50,50 +52,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
private static final class BlockingCompletionHandler implements CompletionHandler<Long, Void> {
private long size;
private Throwable error;
private boolean finished;
@Override
public void completed(Long result, Void attachment) {
synchronized (this) {
size = result.longValue();
finished = true;
notifyAll();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
synchronized (this) {
error = exc;
finished = true;
notifyAll();
}
}
public long get() throws IOException {
synchronized (this) {
while (!finished) {
try {
wait();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new RuntimeException(error);
}
return size;
}
}
}
private final EventLoop eventLoop;
private AsyncFSOutput output;
@ -166,8 +124,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
@Override
public <A> void sync(CompletionHandler<Long, A> handler, A attachment) {
output.flush(attachment, handler, false);
public CompletableFuture<Long> sync() {
return output.flush(false);
}
@Override
@ -197,10 +155,24 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}
private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<Long>();
eventLoop.execute(() -> action.accept(future));
try {
return future.get().longValue();
} catch (InterruptedException e) {
InterruptedIOException ioe = new InterruptedIOException();
ioe.initCause(e);
throw ioe;
} catch (ExecutionException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e.getCause());
}
}
@Override
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
final BlockingCompletionHandler handler = new BlockingCompletionHandler();
eventLoop.execute(() -> {
return write(future -> {
output.write(magic);
try {
header.writeDelimitedTo(asyncOutputWrapper);
@ -208,16 +180,19 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
// should not happen
throw new AssertionError(e);
}
output.flush(null, handler, false);
output.flush(false).whenComplete((len, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
future.complete(len);
}
});
});
return handler.get();
}
@Override
protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic)
throws IOException {
final BlockingCompletionHandler handler = new BlockingCompletionHandler();
eventLoop.execute(() -> {
protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
return write(future -> {
try {
trailer.writeTo(asyncOutputWrapper);
} catch (IOException e) {
@ -226,9 +201,14 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
output.writeInt(trailer.getSerializedSize());
output.write(magic);
output.flush(null, handler, false);
output.flush(false).whenComplete((len, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
future.complete(len);
}
});
});
return handler.get();
}
@Override

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.CompletionHandler;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -85,7 +85,7 @@ public interface WALProvider {
}
interface AsyncWriter extends Closeable {
<A> void sync(CompletionHandler<Long, A> handler, A attachment);
CompletableFuture<Long> sync();
void append(WAL.Entry entry);
long getLength();
}

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
public final class FanOutOneBlockAsyncDFSOutputFlushHandler
implements CompletionHandler<Long, Void> {
private long size;
private Throwable error;
private boolean finished;
@Override
public synchronized void completed(Long result, Void attachment) {
size = result.longValue();
finished = true;
notifyAll();
}
@Override
public synchronized void failed(Throwable exc, Void attachment) {
error = exc;
finished = true;
notifyAll();
}
public synchronized long get() throws InterruptedException, ExecutionException {
while (!finished) {
wait();
}
if (error != null) {
throw new ExecutionException(error);
}
return size;
}
public void reset() {
size = 0L;
error = null;
finished = false;
}
}

View File

@ -107,17 +107,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
throws IOException, InterruptedException, ExecutionException {
final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() {
@Override
public void run() {
out.write(b, 0, b.length);
out.flush(null, handler, false);
}
});
assertEquals(b.length, handler.get());
out.write(b, 0, b.length);
assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, dfs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
@ -144,31 +135,14 @@ public class TestFanOutOneBlockAsyncDFSOutput {
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() {
@Override
public void run() {
out.write(b, 0, b.length);
out.flush(null, handler, false);
}
});
handler.get();
out.write(b, 0, b.length);
out.flush(false).get();
// restart one datanode which causes one connection broken
TEST_UTIL.getDFSCluster().restartDataNode(0);
try {
handler.reset();
eventLoop.execute(new Runnable() {
@Override
public void run() {
out.write(b, 0, b.length);
out.flush(null, handler, false);
}
});
out.write(b, 0, b.length);
try {
handler.get();
out.flush(false).get();
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail
@ -254,17 +228,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
byte[] b = new byte[50 * 1024 * 1024];
ThreadLocalRandom.current().nextBytes(b);
FanOutOneBlockAsyncDFSOutputFlushHandler handler =
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() {
@Override
public void run() {
out.write(b);
out.flush(null, handler, false);
}
});
assertEquals(b.length, handler.get());
out.write(b);
out.flush(false);
assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];

View File

@ -60,10 +60,8 @@ public class TestLocalAsyncOutput {
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
out.write(b);
out.flush(null, handler, true);
assertEquals(b.length, handler.get());
assertEquals(b.length, out.flush(true).get().longValue());
out.close();
assertEquals(b.length, fs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];

View File

@ -17,11 +17,19 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.AES_CTR_NOPADDING;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
@ -38,6 +46,9 @@ import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
@ -45,7 +56,6 @@ import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.minikdc.MiniKdc;
@ -113,7 +123,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
List<Object[]> params = new ArrayList<>();
for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) {
for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
for (boolean useTransparentEncryption : Arrays.asList(false, true)) {
params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite,
useTransparentEncryption });
@ -125,17 +135,15 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
}
private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
// change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop support for hadoop-2.4.1
conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
HTTP_PRINCIPAL + "@" + KDC.getRealm());
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath());
keystoresDir.mkdirs();
@ -146,32 +154,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
}
private static void setUpKeyProvider(Configuration conf) throws Exception {
Class<?> keyProviderFactoryClass;
try {
keyProviderFactoryClass = Class.forName("org.apache.hadoop.crypto.key.KeyProviderFactory");
} catch (ClassNotFoundException e) {
// should be hadoop 2.5-, give up
TEST_TRANSPARENT_ENCRYPTION = false;
return;
}
URI keyProviderUri =
new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
Method getKeyProviderMethod =
keyProviderFactoryClass.getMethod("get", URI.class, Configuration.class);
Object keyProvider = getKeyProviderMethod.invoke(null, keyProviderUri, conf);
Class<?> keyProviderClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider");
Class<?> keyProviderOptionsClass =
Class.forName("org.apache.hadoop.crypto.key.KeyProvider$Options");
Method createKeyMethod =
keyProviderClass.getMethod("createKey", String.class, keyProviderOptionsClass);
Object options = keyProviderOptionsClass.getConstructor(Configuration.class).newInstance(conf);
createKeyMethod.invoke(keyProvider, TEST_KEY_NAME, options);
Method flushMethod = keyProviderClass.getMethod("flush");
flushMethod.invoke(keyProvider);
Method closeMethod = keyProviderClass.getMethod("close");
closeMethod.invoke(keyProvider);
KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
keyProvider.flush();
keyProvider.close();
}
@BeforeClass
@ -231,7 +220,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
}
TEST_UTIL.startMiniDFSCluster(3);
TEST_UTIL.startMiniDFSCluster(1);
FS = TEST_UTIL.getDFSCluster().getFileSystem();
testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
FS.mkdirs(testDirOnTestFs);

View File

@ -17,27 +17,26 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.base.Throwables;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputFlushHandler;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import com.google.common.base.Throwables;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.AsyncWriter> {
@ -68,14 +67,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
@Override
protected void sync(AsyncWriter writer) throws IOException {
FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
writer.sync(handler, null);
try {
handler.get();
writer.sync().get();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} catch (ExecutionException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
Throwables.propagateIfPossible(e.getCause());
throw new IOException(e.getCause());
}
}