HBASE-15264 Implement a fan out HDFS OutputStream
Parent: a3b4575f70
Commit: 6e9d355b12
FanOutOneBlockAsyncDFSOutput.java (new file)
@@ -0,0 +1,533 @@
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
|
||||
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
|
||||
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
|
||||
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Deque;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.protobuf.ProtobufDecoder;
|
||||
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
/**
* An asynchronous HDFS output stream implementation which fans out data to the datanodes and
* supports writing a file with exactly one block.
* <p>
* Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create an instance.
* The main use of this class is implementing a WAL, so we only expose a few HDFS configurations in
* that method. We place it under the util package because we want to keep it independent of the
* WAL implementation and thus easier to move into the HDFS project eventually.
* <p>
* Note that all connections to the datanodes run in the same {@link EventLoop}, which means we only
* need one thread here. But be careful: we do some blocking operations in {@link #close()} and
* {@link #recoverAndClose(CancelableProgressable)}, so do not call them inside the
* {@link EventLoop}. And for {@link #write(byte[])}, {@link #write(byte[], int, int)},
* {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, calling them outside
* the {@link EventLoop} costs an extra context switch.
* <p>
* Advantages compared to DFSOutputStream:
* <ol>
* <li>The fan out mechanism, which reduces latency.</li>
* <li>An asynchronous WAL can run in the same EventLoop and call write and flush inside the
* EventLoop thread, so generally a single thread does all the work.</li>
* <li>Fail-fast on datanode connection errors, so the WAL implementation can open a new writer
* ASAP.</li>
* <li>We benefit from netty's ByteBuf management mechanism.</li>
* </ol>
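* <p>
* A minimal usage sketch (the surrounding setup, dfs, eventLoopGroup, data and handler, is
* illustrative and not part of this class):
* <pre>
* EventLoop eventLoop = eventLoopGroup.next();
* FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
*     new Path("/wal/log"), true, false, (short) 3, dfs.getDefaultBlockSize(), eventLoop);
* out.write(data); // copy data into the internal buffer
* out.flush(null, handler, false); // handler is a CompletionHandler that receives the acked length
* // ... more write/flush rounds ...
* out.close(); // end the block and complete the file at the namenode
* </pre>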
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FanOutOneBlockAsyncDFSOutput implements Closeable {
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
private final FSUtils fsUtils;
|
||||
|
||||
private final DistributedFileSystem dfs;
|
||||
|
||||
private final DFSClient client;
|
||||
|
||||
private final ClientProtocol namenode;
|
||||
|
||||
private final String clientName;
|
||||
|
||||
private final String src;
|
||||
|
||||
private final long fileId;
|
||||
|
||||
private final LocatedBlock locatedBlock;
|
||||
|
||||
private final EventLoop eventLoop;
|
||||
|
||||
private final List<Channel> datanodeList;
|
||||
|
||||
private final DataChecksum summer;
|
||||
|
||||
private final ByteBufAllocator alloc;
|
||||
|
||||
private static final class Callback {
|
||||
|
||||
public final Promise<Void> promise;
|
||||
|
||||
public final long ackedLength;
|
||||
|
||||
public final Set<Channel> unfinishedReplicas;
|
||||
|
||||
public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) {
|
||||
this.promise = promise;
|
||||
this.ackedLength = ackedLength;
|
||||
if (replicas.isEmpty()) {
|
||||
this.unfinishedReplicas = Collections.emptySet();
|
||||
} else {
|
||||
this.unfinishedReplicas = Collections
|
||||
.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
|
||||
this.unfinishedReplicas.addAll(replicas);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
|
||||
|
||||
// This could differ from the acked block length because a packet cannot start in the middle of a
// chunk.
|
||||
private long nextPacketOffsetInBlock = 0L;
|
||||
|
||||
private long nextPacketSeqno = 0L;
|
||||
|
||||
private ByteBuf buf;
|
||||
|
||||
private enum State {
|
||||
STREAMING, CLOSING, BROKEN, CLOSED
|
||||
}
|
||||
|
||||
private State state;
|
||||
|
||||
private void completed(Channel channel) {
|
||||
if (waitingAckQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (Callback c : waitingAckQueue) {
|
||||
if (c.unfinishedReplicas.remove(channel)) {
|
||||
if (c.unfinishedReplicas.isEmpty()) {
|
||||
c.promise.trySuccess(null);
|
||||
// We remove a Callback entry from waitingAckQueue as soon as its unfinishedReplicas becomes
// empty, so an empty set can only be observed at the head of waitingAckQueue and it is safe to
// just call removeFirst here.
|
||||
waitingAckQueue.removeFirst();
|
||||
// also wake up flush requests which have the same length.
|
||||
for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) {
|
||||
if (cb.ackedLength == c.ackedLength) {
|
||||
cb.promise.trySuccess(null);
|
||||
waitingAckQueue.removeFirst();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void failed(Channel channel, Supplier<Throwable> errorSupplier) {
|
||||
if (state == State.BROKEN || state == State.CLOSED) {
|
||||
return;
|
||||
}
|
||||
if (state == State.CLOSING) {
|
||||
Callback c = waitingAckQueue.peekFirst();
|
||||
if (c == null || !c.unfinishedReplicas.contains(channel)) {
|
||||
// nothing, the endBlock request has already finished.
|
||||
return;
|
||||
}
|
||||
}
|
||||
// disable further write, and fail all pending ack.
|
||||
state = State.BROKEN;
|
||||
Throwable error = errorSupplier.get();
|
||||
for (Callback c : waitingAckQueue) {
|
||||
c.promise.tryFailure(error);
|
||||
}
|
||||
waitingAckQueue.clear();
|
||||
for (Channel ch : datanodeList) {
|
||||
ch.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void setupReceiver(final int timeoutMs) {
|
||||
SimpleChannelInboundHandler<PipelineAckProto> ackHandler = new SimpleChannelInboundHandler<PipelineAckProto>() {
|
||||
|
||||
@Override
|
||||
public boolean isSharable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack)
|
||||
throws Exception {
|
||||
final Status reply = getStatus(ack);
|
||||
if (reply != Status.SUCCESS) {
|
||||
failed(ctx.channel(), new Supplier<Throwable>() {
|
||||
|
||||
@Override
|
||||
public Throwable get() {
|
||||
return new IOException("Bad response " + reply + " for block "
|
||||
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (PipelineAck.isRestartOOBStatus(reply)) {
|
||||
failed(ctx.channel(), new Supplier<Throwable>() {
|
||||
|
||||
@Override
|
||||
public Throwable get() {
|
||||
return new IOException("Restart response " + reply + " for block "
|
||||
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (ack.getSeqno() == HEART_BEAT_SEQNO) {
|
||||
return;
|
||||
}
|
||||
completed(ctx.channel());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
|
||||
failed(ctx.channel(), new Supplier<Throwable>() {
|
||||
|
||||
@Override
|
||||
public Throwable get() {
|
||||
return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause)
|
||||
throws Exception {
|
||||
failed(ctx.channel(), new Supplier<Throwable>() {
|
||||
|
||||
@Override
|
||||
public Throwable get() {
|
||||
return cause;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
IdleStateEvent e = (IdleStateEvent) evt;
|
||||
if (e.state() == IdleState.READER_IDLE) {
|
||||
failed(ctx.channel(), new Supplier<Throwable>() {
|
||||
|
||||
@Override
|
||||
public Throwable get() {
|
||||
return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
|
||||
}
|
||||
});
|
||||
} else if (e.state() == IdleState.WRITER_IDLE) {
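// The writer side has been idle, so send a heartbeat packet to keep the connection to the
// datanode alive.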
|
||||
PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
|
||||
int len = heartbeat.getSerializedSize();
|
||||
ByteBuf buf = alloc.buffer(len);
|
||||
heartbeat.putInBuffer(buf.nioBuffer(0, len));
|
||||
buf.writerIndex(len);
|
||||
ctx.channel().writeAndFlush(buf);
|
||||
}
|
||||
return;
|
||||
}
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
|
||||
};
|
||||
for (Channel ch : datanodeList) {
|
||||
ch.pipeline().addLast(
|
||||
new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
|
||||
new ProtobufVarint32FrameDecoder(),
|
||||
new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
|
||||
ch.config().setAutoRead(true);
|
||||
}
|
||||
}
|
||||
|
||||
FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
|
||||
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
|
||||
LocatedBlock locatedBlock, EventLoop eventLoop, List<Channel> datanodeList,
|
||||
DataChecksum summer, ByteBufAllocator alloc) {
|
||||
this.conf = conf;
|
||||
this.fsUtils = fsUtils;
|
||||
this.dfs = dfs;
|
||||
this.client = client;
|
||||
this.namenode = namenode;
|
||||
this.fileId = fileId;
|
||||
this.clientName = clientName;
|
||||
this.src = src;
|
||||
this.locatedBlock = locatedBlock;
|
||||
this.eventLoop = eventLoop;
|
||||
this.datanodeList = datanodeList;
|
||||
this.summer = summer;
|
||||
this.alloc = alloc;
|
||||
this.buf = alloc.directBuffer();
|
||||
this.state = State.STREAMING;
|
||||
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT));
|
||||
}
|
||||
|
||||
/**
|
||||
* Just call write(b, 0, b.length).
|
||||
* @see #write(byte[], int, int)
|
||||
*/
|
||||
public void write(byte[] b) {
|
||||
write(b, 0, b.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy the data into the buffer. Note that you need to call
|
||||
* {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually.
|
||||
*/
|
||||
public void write(final byte[] b, final int off, final int len) {
|
||||
if (eventLoop.inEventLoop()) {
|
||||
buf.ensureWritable(len).writeBytes(b, off, len);
|
||||
} else {
|
||||
eventLoop.submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
buf.ensureWritable(len).writeBytes(b, off, len);
|
||||
}
|
||||
}).syncUninterruptibly();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the current size of buffered data.
|
||||
*/
|
||||
public int buffered() {
|
||||
if (eventLoop.inEventLoop()) {
|
||||
return buf.readableBytes();
|
||||
} else {
|
||||
return eventLoop.submit(new Callable<Integer>() {
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
return buf.readableBytes();
|
||||
}
|
||||
}).syncUninterruptibly().getNow().intValue();
|
||||
}
|
||||
}
|
||||
|
||||
public DatanodeInfo[] getPipeline() {
|
||||
return locatedBlock.getLocations();
|
||||
}
|
||||
|
||||
private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
|
||||
boolean syncBlock) {
|
||||
if (state != State.STREAMING) {
|
||||
handler.failed(new IOException("stream already broken"), attachment);
|
||||
return;
|
||||
}
|
||||
int dataLen = buf.readableBytes();
|
||||
final long ackedLength = nextPacketOffsetInBlock + dataLen;
|
||||
if (ackedLength == locatedBlock.getBlock().getNumBytes()) {
|
||||
// no new data, just return
|
||||
handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
|
||||
return;
|
||||
}
|
||||
Promise<Void> promise = eventLoop.newPromise();
|
||||
promise.addListener(new FutureListener<Void>() {
|
||||
|
||||
@Override
|
||||
public void operationComplete(Future<Void> future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
locatedBlock.getBlock().setNumBytes(ackedLength);
|
||||
handler.completed(ackedLength, attachment);
|
||||
} else {
|
||||
handler.failed(future.cause(), attachment);
|
||||
}
|
||||
}
|
||||
});
|
||||
Callback c = waitingAckQueue.peekLast();
|
||||
if (c != null && ackedLength == c.ackedLength) {
|
||||
// just append it to the tail of the waiting ack queue, do not issue a new hflush request.
|
||||
waitingAckQueue
|
||||
.addLast(new Callback(promise, ackedLength, Collections.<Channel> emptyList()));
|
||||
return;
|
||||
}
|
||||
int chunkLen = summer.getBytesPerChecksum();
|
||||
int trailingPartialChunkLen = dataLen % chunkLen;
|
||||
int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
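// For example, with bytesPerChecksum = 512 and dataLen = 1300, numChecks is 3: two full 512-byte
// chunks plus a trailing partial chunk of 276 bytes.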
|
||||
int checksumLen = numChecks * summer.getChecksumSize();
|
||||
ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
|
||||
summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
|
||||
checksumBuf.writerIndex(checksumLen);
|
||||
PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
|
||||
nextPacketSeqno, false, dataLen, syncBlock);
|
||||
int headerLen = header.getSerializedSize();
|
||||
ByteBuf headerBuf = alloc.buffer(headerLen);
|
||||
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
|
||||
headerBuf.writerIndex(headerLen);
|
||||
|
||||
waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
|
||||
for (Channel ch : datanodeList) {
|
||||
ch.write(headerBuf.duplicate().retain());
|
||||
ch.write(checksumBuf.duplicate().retain());
|
||||
ch.writeAndFlush(buf.duplicate().retain());
|
||||
}
|
||||
checksumBuf.release();
|
||||
headerBuf.release();
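// Carry any trailing partial checksum chunk over into a fresh buffer so that the next packet
// starts on a chunk boundary.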
|
||||
ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen);
|
||||
if (trailingPartialChunkLen != 0) {
|
||||
buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
|
||||
}
|
||||
buf.release();
|
||||
this.buf = newBuf;
|
||||
nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
|
||||
nextPacketSeqno++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush the buffer out to datanodes.
|
||||
* @param attachment will be passed to the handler when the flush completes.
* @param handler receives the acked length as its result when the flush completes.
* @param syncBlock if true ask the datanodes for hsync semantics, otherwise hflush.
|
||||
*/
|
||||
public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
|
||||
final boolean syncBlock) {
|
||||
if (eventLoop.inEventLoop()) {
|
||||
flush0(attachment, handler, syncBlock);
|
||||
} else {
|
||||
eventLoop.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
flush0(attachment, handler, syncBlock);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void endBlock(Promise<Void> promise, long size) {
|
||||
if (state != State.STREAMING) {
|
||||
promise.tryFailure(new IOException("stream already broken"));
|
||||
return;
|
||||
}
|
||||
if (!waitingAckQueue.isEmpty()) {
|
||||
promise.tryFailure(new IllegalStateException("should call flush first before calling close"));
|
||||
return;
|
||||
}
|
||||
state = State.CLOSING;
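// Send an empty packet with lastPacketInBlock set so that the datanodes finalize the block.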
|
||||
PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false);
|
||||
buf.release();
|
||||
buf = null;
|
||||
int headerLen = header.getSerializedSize();
|
||||
ByteBuf headerBuf = alloc.buffer(headerLen);
|
||||
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
|
||||
headerBuf.writerIndex(headerLen);
|
||||
waitingAckQueue.add(new Callback(promise, size, datanodeList));
|
||||
for (Channel ch : datanodeList) {
|
||||
ch.writeAndFlush(headerBuf.duplicate().retain());
|
||||
}
|
||||
headerBuf.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* The close method to call after an error has occurred. Currently we just call recoverFileLease.
|
||||
*/
|
||||
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
|
||||
assert !eventLoop.inEventLoop();
|
||||
for (Channel ch : datanodeList) {
|
||||
ch.closeFuture().awaitUninterruptibly();
|
||||
}
|
||||
endFileLease(client, src, fileId);
|
||||
fsUtils.recoverFileLease(dfs, new Path(src), conf,
|
||||
reporter == null ? new CancelOnClose(client) : reporter);
|
||||
}
|
||||
|
||||
/**
|
||||
* End the current block and complete the file at the namenode. You should call
|
||||
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
|
||||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
assert !eventLoop.inEventLoop();
|
||||
final Promise<Void> promise = eventLoop.newPromise();
|
||||
eventLoop.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes());
|
||||
}
|
||||
});
|
||||
promise.addListener(new FutureListener<Void>() {
|
||||
|
||||
@Override
|
||||
public void operationComplete(Future<Void> future) throws Exception {
|
||||
for (Channel ch : datanodeList) {
|
||||
ch.close();
|
||||
}
|
||||
}
|
||||
}).syncUninterruptibly();
|
||||
for (Channel ch : datanodeList) {
|
||||
ch.closeFuture().awaitUninterruptibly();
|
||||
}
|
||||
completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
|
||||
}
|
||||
}
|
FanOutOneBlockAsyncDFSOutputHelper.java (new file)
@@ -0,0 +1,672 @@
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
|
||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemLinkResolver;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.codec.protobuf.ProtobufDecoder;
|
||||
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
/**
|
||||
* Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FanOutOneBlockAsyncDFSOutputHelper {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class);
|
||||
|
||||
private FanOutOneBlockAsyncDFSOutputHelper() {
|
||||
}
|
||||
|
||||
// use pooled allocator for performance.
|
||||
private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
|
||||
|
||||
// copied from DFSPacket since it is package private.
|
||||
public static final long HEART_BEAT_SEQNO = -1L;
|
||||
|
||||
// helper class for creating DataChecksum object.
|
||||
private static final Method CREATE_CHECKSUM;
|
||||
|
||||
// Helper interface for getting the Status from a PipelineAckProto. In hadoop 2.6 or earlier there
// is a getStatus method; in hadoop 2.7 or later the status is retrieved from the flag field. The
// flag may be read from the proto directly, or combined from the proto's reply field and an ECN
// value. See createPipelineAckStatusGetter for more details.
|
||||
private interface PipelineAckStatusGetter {
|
||||
Status get(PipelineAckProto ack);
|
||||
}
|
||||
|
||||
private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
|
||||
|
||||
// The StorageType enum was added in hadoop 2.4 but moved to another package in hadoop 2.6, and the
// setter method on OpWriteBlockProto was also added in hadoop 2.6. So we need to skip the
// setStorageType call on hadoop 2.5 or earlier. See createStorageTypeSetter for more details.
|
||||
private interface StorageTypeSetter {
|
||||
OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
|
||||
}
|
||||
|
||||
private static final StorageTypeSetter STORAGE_TYPE_SETTER;
|
||||
|
||||
// Helper interface for calling the create method on the namenode. There is an extra
// supportedVersions parameter in hadoop 2.6 or later. See createFileCreater for more details.
|
||||
private interface FileCreater {
|
||||
HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize) throws IOException;
|
||||
}
|
||||
|
||||
private static final FileCreater FILE_CREATER;
|
||||
|
||||
// Helper interface for adding or removing a lease from DFSClient. Hadoop 2.4 uses src as the map
// key, while hadoop 2.5 or later uses the inodeId. See createLeaseManager for more details.
|
||||
private interface LeaseManager {
|
||||
|
||||
void begin(DFSClient client, String src, long inodeId);
|
||||
|
||||
void end(DFSClient client, String src, long inodeId);
|
||||
}
|
||||
|
||||
private static final LeaseManager LEASE_MANAGER;
|
||||
|
||||
// This is used to terminate a recoverFileLease call when FileSystem is already closed.
|
||||
// isClientRunning is not public so we need to use reflection.
|
||||
private interface DFSClientAdaptor {
|
||||
boolean isClientRunning(DFSClient client);
|
||||
}
|
||||
|
||||
private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
|
||||
|
||||
private static DFSClientAdaptor createDFSClientAdaptor() {
|
||||
try {
|
||||
final Method method = DFSClient.class.getDeclaredMethod("isClientRunning");
|
||||
method.setAccessible(true);
|
||||
return new DFSClientAdaptor() {
|
||||
|
||||
@Override
|
||||
public boolean isClientRunning(DFSClient client) {
|
||||
try {
|
||||
return (Boolean) method.invoke(client);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static LeaseManager createLeaseManager() {
|
||||
try {
|
||||
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
|
||||
long.class, DFSOutputStream.class);
|
||||
beginFileLeaseMethod.setAccessible(true);
|
||||
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
|
||||
long.class);
|
||||
endFileLeaseMethod.setAccessible(true);
|
||||
return new LeaseManager() {
|
||||
|
||||
@Override
|
||||
public void begin(DFSClient client, String src, long inodeId) {
|
||||
try {
|
||||
beginFileLeaseMethod.invoke(client, inodeId, null);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void end(DFSClient client, String src, long inodeId) {
|
||||
try {
|
||||
endFileLeaseMethod.invoke(client, inodeId);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e);
|
||||
}
|
||||
try {
|
||||
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
|
||||
String.class, DFSOutputStream.class);
|
||||
beginFileLeaseMethod.setAccessible(true);
|
||||
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
|
||||
String.class);
|
||||
endFileLeaseMethod.setAccessible(true);
|
||||
return new LeaseManager() {
|
||||
|
||||
@Override
|
||||
public void begin(DFSClient client, String src, long inodeId) {
|
||||
try {
|
||||
beginFileLeaseMethod.invoke(client, src, null);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void end(DFSClient client, String src, long inodeId) {
|
||||
try {
|
||||
endFileLeaseMethod.invoke(client, src);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static PipelineAckStatusGetter createPipelineAckStatusGetter() {
|
||||
try {
|
||||
final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
|
||||
@SuppressWarnings("rawtypes")
|
||||
Class<? extends Enum> ecnClass;
|
||||
try {
|
||||
ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
|
||||
.asSubclass(Enum.class);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
|
||||
final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
|
||||
final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
|
||||
Status.class);
|
||||
final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
|
||||
int.class);
|
||||
return new PipelineAckStatusGetter() {
|
||||
|
||||
@Override
|
||||
public Status get(PipelineAckProto ack) {
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
|
||||
Integer headerFlag;
|
||||
if (flagList.isEmpty()) {
|
||||
Status reply = (Status) getReplyMethod.invoke(ack, 0);
|
||||
headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
|
||||
} else {
|
||||
headerFlag = flagList.get(0);
|
||||
}
|
||||
return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.warn("Can not get expected methods, should be hadoop 2.6-", e);
|
||||
}
|
||||
try {
|
||||
final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
|
||||
return new PipelineAckStatusGetter() {
|
||||
|
||||
@Override
|
||||
public Status get(PipelineAckProto ack) {
|
||||
try {
|
||||
return (Status) getStatusMethod.invoke(ack, 0);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static StorageTypeSetter createStorageTypeSetter() {
|
||||
final Method setStorageTypeMethod;
|
||||
try {
|
||||
setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
|
||||
StorageTypeProto.class);
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.warn("No setStorageType method found, should be hadoop 2.5-", e);
|
||||
return new StorageTypeSetter() {
|
||||
|
||||
@Override
|
||||
public Builder set(Builder builder, Enum<?> storageType) {
|
||||
return builder;
|
||||
}
|
||||
};
|
||||
}
|
||||
ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
|
||||
for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
|
||||
builder.put(storageTypeProto.name(), storageTypeProto);
|
||||
}
|
||||
final ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
|
||||
return new StorageTypeSetter() {
|
||||
|
||||
@Override
|
||||
public Builder set(Builder builder, Enum<?> storageType) {
|
||||
Object protoEnum = name2ProtoEnum.get(storageType.name());
|
||||
try {
|
||||
setStorageTypeMethod.invoke(builder, protoEnum);
|
||||
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static FileCreater createFileCreater() {
|
||||
for (Method method : ClientProtocol.class.getMethods()) {
|
||||
if (method.getName().equals("create")) {
|
||||
final Method createMethod = method;
|
||||
Class<?>[] paramTypes = createMethod.getParameterTypes();
|
||||
if (paramTypes[paramTypes.length - 1] == long.class) {
|
||||
return new FileCreater() {
|
||||
|
||||
@Override
|
||||
public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize) throws IOException {
|
||||
try {
|
||||
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
|
||||
createParent, replication, blockSize);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InvocationTargetException e) {
|
||||
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
} else {
|
||||
try {
|
||||
Class<?> cryptoProtocolVersionClass = Class
|
||||
.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
|
||||
Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
|
||||
final Object supported = supportedMethod.invoke(null);
|
||||
return new FileCreater() {
|
||||
|
||||
@Override
|
||||
public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize) throws IOException {
|
||||
try {
|
||||
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
|
||||
flag, createParent, replication, blockSize, supported);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InvocationTargetException e) {
|
||||
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
|
||||
| InvocationTargetException e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new Error("No create method found for " + ClientProtocol.class.getName());
|
||||
}
|
||||
|
||||
// cancel the processing if DFSClient is already closed.
|
||||
static final class CancelOnClose implements CancelableProgressable {
|
||||
|
||||
private final DFSClient client;
|
||||
|
||||
public CancelOnClose(DFSClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean progress() {
|
||||
return DFS_CLIENT_ADAPTOR.isClientRunning(client);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static {
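// Resolve all the version-dependent reflection hooks once, when this class is loaded.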
|
||||
try {
|
||||
CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum");
|
||||
CREATE_CHECKSUM.setAccessible(true);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
|
||||
PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
|
||||
STORAGE_TYPE_SETTER = createStorageTypeSetter();
|
||||
FILE_CREATER = createFileCreater();
|
||||
LEASE_MANAGER = createLeaseManager();
|
||||
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
|
||||
}
|
||||
|
||||
static void beginFileLease(DFSClient client, String src, long inodeId) {
|
||||
LEASE_MANAGER.begin(client, src, inodeId);
|
||||
}
|
||||
|
||||
static void endFileLease(DFSClient client, String src, long inodeId) {
|
||||
LEASE_MANAGER.end(client, src, inodeId);
|
||||
}
|
||||
|
||||
static DataChecksum createChecksum(DFSClient client) {
|
||||
try {
|
||||
return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf());
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
static Status getStatus(PipelineAckProto ack) {
|
||||
return PIPELINE_ACK_STATUS_GETTER.get(ack);
|
||||
}
|
||||
|
||||
private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo,
|
||||
final Promise<Channel> promise, final int timeoutMs) {
|
||||
channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
|
||||
new ProtobufVarint32FrameDecoder(),
|
||||
new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
|
||||
new SimpleChannelInboundHandler<BlockOpResponseProto>() {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
|
||||
throws Exception {
|
||||
Status pipelineStatus = resp.getStatus();
|
||||
if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
|
||||
throw new IOException("datanode " + dnInfo + " is restarting");
|
||||
}
|
||||
String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
|
||||
if (resp.getStatus() != Status.SUCCESS) {
|
||||
if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
|
||||
throw new InvalidBlockTokenException("Got access token error" + ", status message "
|
||||
+ resp.getMessage() + ", " + logInfo);
|
||||
} else {
|
||||
throw new IOException("Got error" + ", status=" + resp.getStatus().name()
|
||||
+ ", status message " + resp.getMessage() + ", " + logInfo);
|
||||
}
|
||||
}
|
||||
// success
|
||||
ChannelPipeline p = ctx.pipeline();
|
||||
while (p.first() != null) {
|
||||
p.removeFirst();
|
||||
}
|
||||
// Disable auto read here. Enable it after we setup the streaming pipeline in
|
||||
// FanOutOneBlockAsyncDFSOutput.
|
||||
ctx.channel().config().setAutoRead(false);
|
||||
promise.trySuccess(ctx.channel());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
if (evt instanceof IdleStateEvent
|
||||
&& ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
|
||||
promise
|
||||
.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
|
||||
} else {
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
promise.tryFailure(cause);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void requestWriteBlock(Channel channel, Enum<?> storageType,
|
||||
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
|
||||
// TODO: SASL negotiation. Should be done using a netty Handler.
|
||||
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
|
||||
int protoLen = proto.getSerializedSize();
|
||||
ByteBuf buffer = channel.alloc()
|
||||
.buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
|
||||
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
|
||||
buffer.writeByte(Op.WRITE_BLOCK.code);
|
||||
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
|
||||
channel.writeAndFlush(buffer);
|
||||
}
|
||||
|
||||
private static List<Future<Channel>> connectToDataNodes(Configuration conf, String clientName,
|
||||
LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage,
|
||||
DataChecksum summer, EventLoop eventLoop) {
|
||||
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
|
||||
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
|
||||
boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
||||
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
||||
final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||
HdfsServerConstants.READ_TIMEOUT);
|
||||
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
|
||||
blockCopy.setNumBytes(locatedBlock.getBlockSize());
|
||||
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
|
||||
.setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
|
||||
.setToken(PBHelper.convert(locatedBlock.getBlockToken())))
|
||||
.setClientName(clientName).build();
|
||||
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
|
||||
final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
|
||||
.setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
|
||||
.setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
|
||||
.setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
|
||||
.setRequestedChecksum(checksumProto)
|
||||
.setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
|
||||
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
|
||||
for (int i = 0; i < datanodeInfos.length; i++) {
|
||||
final DatanodeInfo dnInfo = datanodeInfos[i];
|
||||
// Use Enum here because StorageType was moved to another package in hadoop 2.6. Referring to
// StorageType directly would cause a compilation error on hadoop 2.5 or earlier.
|
||||
final Enum<?> storageType = storageTypes[i];
|
||||
final Promise<Channel> promise = eventLoop.newPromise();
|
||||
futureList.add(promise);
|
||||
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
|
||||
new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
|
||||
.option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
|
||||
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
processWriteBlockResponse(ch, dnInfo, promise, timeoutMs);
|
||||
}
|
||||
}).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
requestWriteBlock(future.channel(), storageType, writeBlockProtoBuilder);
|
||||
} else {
|
||||
promise.tryFailure(future.cause());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return futureList;
|
||||
}
|
||||
|
||||
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
|
||||
boolean overwrite, boolean createParent, short replication, long blockSize,
|
||||
EventLoop eventLoop) throws IOException {
|
||||
Configuration conf = dfs.getConf();
|
||||
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
|
||||
DFSClient client = dfs.getClient();
|
||||
String clientName = client.getClientName();
|
||||
ClientProtocol namenode = client.getNamenode();
|
||||
HdfsFileStatus stat = FILE_CREATER.create(namenode, src,
|
||||
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
|
||||
new EnumSetWritable<CreateFlag>(
|
||||
overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
|
||||
createParent, replication, blockSize);
|
||||
beginFileLease(client, src, stat.getFileId());
|
||||
boolean succ = false;
|
||||
LocatedBlock locatedBlock = null;
|
||||
List<Channel> datanodeList = new ArrayList<>();
|
||||
try {
|
||||
DataChecksum summer = createChecksum(client);
|
||||
locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(),
|
||||
null);
|
||||
for (Future<Channel> future : connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L,
|
||||
PIPELINE_SETUP_CREATE, summer, eventLoop)) {
|
||||
// fail the creation if there are connection failures since we are fail-fast. The upper
|
||||
// layer should retry itself if needed.
|
||||
datanodeList.add(future.syncUninterruptibly().getNow());
|
||||
}
|
||||
succ = true;
|
||||
return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
|
||||
stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
|
||||
} finally {
|
||||
if (!succ) {
|
||||
for (Channel c : datanodeList) {
|
||||
c.close();
|
||||
}
|
||||
endFileLease(client, src, stat.getFileId());
|
||||
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
* Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside an
* {@link EventLoop}.
* @param eventLoop all connections to the datanodes will use the same event loop.
*/
|
||||
public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f,
|
||||
final boolean overwrite, final boolean createParent, final short replication,
|
||||
final long blockSize, final EventLoop eventLoop) throws IOException {
|
||||
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
|
||||
|
||||
@Override
|
||||
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
|
||||
blockSize, eventLoop);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}.resolve(dfs, f);
|
||||
}
|
||||
|
||||
static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
|
||||
ExtendedBlock block, long fileId) {
|
||||
for (int retry = 0;; retry++) {
|
||||
try {
|
||||
if (namenode.complete(src, clientName, block, fileId)) {
|
||||
endFileLease(client, src, fileId);
|
||||
return;
|
||||
} else {
|
||||
LOG.warn("complete file " + src + " not finished, retry = " + retry);
|
||||
}
|
||||
} catch (LeaseExpiredException e) {
|
||||
LOG.warn("lease for file " + src + " is expired, give up", e);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("complete file " + src + " failed, retry = " + retry, e);
|
||||
}
|
||||
sleepIgnoreInterrupt(retry);
|
||||
}
|
||||
}
|
||||
|
||||
static void sleepIgnoreInterrupt(int retry) {
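// Back off between retries using HBase's standard retry pause schedule (base pause of 100 ms).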
|
||||
try {
|
||||
Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
FanOutOneBlockAsyncDFSOutputFlushHandler.java (new file)
@@ -0,0 +1,61 @@
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
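/**
* A blocking {@link CompletionHandler} adapter: {@link #get()} waits until the pending flush
* completes and then returns the acked length, or throws an {@link ExecutionException} if the
* flush failed. Call {@link #reset()} before reusing the handler for another flush.
*/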
public final class FanOutOneBlockAsyncDFSOutputFlushHandler
|
||||
implements CompletionHandler<Long, Void> {
|
||||
|
||||
private long size;
|
||||
|
||||
private Throwable error;
|
||||
|
||||
private boolean finished;
|
||||
|
||||
@Override
|
||||
public synchronized void completed(Long result, Void attachment) {
|
||||
size = result.longValue();
|
||||
finished = true;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void failed(Throwable exc, Void attachment) {
|
||||
error = exc;
|
||||
finished = true;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
public synchronized long get() throws InterruptedException, ExecutionException {
|
||||
while (!finished) {
|
||||
wait();
|
||||
}
|
||||
if (error != null) {
|
||||
throw new ExecutionException(error);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
size = 0L;
|
||||
error = null;
|
||||
finished = false;
|
||||
}
|
||||
}
TestFanOutOneBlockAsyncDFSOutput.java (new file)
@@ -0,0 +1,190 @@
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category({ MiscTests.class, MediumTests.class })
|
||||
public class TestFanOutOneBlockAsyncDFSOutput {
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static DistributedFileSystem FS;
|
||||
|
||||
private static EventLoopGroup EVENT_LOOP_GROUP;
|
||||
|
||||
private static int READ_TIMEOUT_MS = 2000;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG);
|
||||
Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG);
|
||||
TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
FS = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
EVENT_LOOP_GROUP = new NioEventLoopGroup();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException, InterruptedException {
|
||||
if (EVENT_LOOP_GROUP != null) {
|
||||
EVENT_LOOP_GROUP.shutdownGracefully().sync();
|
||||
}
|
||||
TEST_UTIL.shutdownMiniDFSCluster();
|
||||
}
|
||||
|
||||
private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out)
|
||||
throws IOException, InterruptedException, ExecutionException {
|
||||
final byte[] b = new byte[10];
|
||||
ThreadLocalRandom.current().nextBytes(b);
|
||||
final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
|
||||
new FanOutOneBlockAsyncDFSOutputFlushHandler();
|
||||
eventLoop.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
out.write(b, 0, b.length);
|
||||
out.flush(null, handler, false);
|
||||
}
|
||||
});
|
||||
assertEquals(b.length, handler.get());
|
||||
out.close();
|
||||
assertEquals(b.length, FS.getFileStatus(f).getLen());
|
||||
byte[] actual = new byte[b.length];
|
||||
try (FSDataInputStream in = FS.open(f)) {
|
||||
in.readFully(actual);
|
||||
}
|
||||
assertArrayEquals(b, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException, InterruptedException, ExecutionException {
|
||||
Path f = new Path("/" + name.getMethodName());
|
||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||
final FanOutOneBlockAsyncDFSOutput out =
|
||||
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
|
||||
FS.getDefaultBlockSize(), eventLoop);
|
||||
writeAndVerify(eventLoop, f, out);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecover() throws IOException, InterruptedException, ExecutionException {
|
||||
Path f = new Path("/" + name.getMethodName());
|
||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||
final FanOutOneBlockAsyncDFSOutput out =
|
||||
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
|
||||
FS.getDefaultBlockSize(), eventLoop);
|
||||
final byte[] b = new byte[10];
|
||||
ThreadLocalRandom.current().nextBytes(b);
|
||||
final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
|
||||
new FanOutOneBlockAsyncDFSOutputFlushHandler();
|
||||
eventLoop.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
out.write(b, 0, b.length);
|
||||
out.flush(null, handler, false);
|
||||
}
|
||||
});
|
||||
handler.get();
|
||||
// restart one datanode, which breaks one of the connections
|
||||
TEST_UTIL.getDFSCluster().restartDataNode(0);
|
||||
handler.reset();
|
||||
eventLoop.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
out.write(b, 0, b.length);
|
||||
out.flush(null, handler, false);
|
||||
}
|
||||
});
|
||||
try {
|
||||
handler.get();
|
||||
fail("flush should fail");
|
||||
} catch (ExecutionException e) {
|
||||
// we restarted one datanode so the flush should fail
|
||||
e.printStackTrace();
|
||||
}
|
||||
out.recoverAndClose(null);
|
||||
assertEquals(b.length, FS.getFileStatus(f).getLen());
|
||||
byte[] actual = new byte[b.length];
|
||||
try (FSDataInputStream in = FS.open(f)) {
|
||||
in.readFully(actual);
|
||||
}
|
||||
assertArrayEquals(b, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
|
||||
Path f = new Path("/" + name.getMethodName());
|
||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||
final FanOutOneBlockAsyncDFSOutput out =
|
||||
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
|
||||
FS.getDefaultBlockSize(), eventLoop);
|
||||
Thread.sleep(READ_TIMEOUT_MS * 2);
|
||||
// the connections to the datanodes should still be alive.
|
||||
writeAndVerify(eventLoop, f, out);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is important for fencing when recovering from an RS crash.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateParentFailed() throws IOException {
|
||||
Path f = new Path("/" + name.getMethodName() + "/test");
|
||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||
try {
|
||||
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
|
||||
FS.getDefaultBlockSize(), eventLoop);
|
||||
fail("should fail with parent does not exist");
|
||||
} catch (RemoteException e) {
|
||||
assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
|
||||
}
|
||||
}
|
||||
}
|