HDFS-2512. Add textual error message to data transfer protocol responses. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1195692 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
611bb588f8
commit
e16dd6212e
|
@ -758,6 +758,9 @@ Release 0.23.0 - Unreleased
|
||||||
HDFS-2436. Change FSNamesystem.setTimes(..) for allowing setting times on
|
HDFS-2436. Change FSNamesystem.setTimes(..) for allowing setting times on
|
||||||
directories. (Uma Maheswara Rao G via szetszwo)
|
directories. (Uma Maheswara Rao G via szetszwo)
|
||||||
|
|
||||||
|
HDFS-2512. Add textual error message to data transfer protocol responses
|
||||||
|
(todd)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||||
// source: datatransfer.proto
|
// source: datatransfer.proto
|
||||||
|
|
||||||
|
@ -6936,6 +6935,10 @@ public final class DataTransferProtos {
|
||||||
boolean hasChecksumResponse();
|
boolean hasChecksumResponse();
|
||||||
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto getChecksumResponse();
|
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto getChecksumResponse();
|
||||||
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProtoOrBuilder getChecksumResponseOrBuilder();
|
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProtoOrBuilder getChecksumResponseOrBuilder();
|
||||||
|
|
||||||
|
// optional string message = 4;
|
||||||
|
boolean hasMessage();
|
||||||
|
String getMessage();
|
||||||
}
|
}
|
||||||
public static final class BlockOpResponseProto extends
|
public static final class BlockOpResponseProto extends
|
||||||
com.google.protobuf.GeneratedMessage
|
com.google.protobuf.GeneratedMessage
|
||||||
|
@ -7021,10 +7024,43 @@ public final class DataTransferProtos {
|
||||||
return checksumResponse_;
|
return checksumResponse_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional string message = 4;
|
||||||
|
public static final int MESSAGE_FIELD_NUMBER = 4;
|
||||||
|
private java.lang.Object message_;
|
||||||
|
public boolean hasMessage() {
|
||||||
|
return ((bitField0_ & 0x00000008) == 0x00000008);
|
||||||
|
}
|
||||||
|
public String getMessage() {
|
||||||
|
java.lang.Object ref = message_;
|
||||||
|
if (ref instanceof String) {
|
||||||
|
return (String) ref;
|
||||||
|
} else {
|
||||||
|
com.google.protobuf.ByteString bs =
|
||||||
|
(com.google.protobuf.ByteString) ref;
|
||||||
|
String s = bs.toStringUtf8();
|
||||||
|
if (com.google.protobuf.Internal.isValidUtf8(bs)) {
|
||||||
|
message_ = s;
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private com.google.protobuf.ByteString getMessageBytes() {
|
||||||
|
java.lang.Object ref = message_;
|
||||||
|
if (ref instanceof String) {
|
||||||
|
com.google.protobuf.ByteString b =
|
||||||
|
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
|
||||||
|
message_ = b;
|
||||||
|
return b;
|
||||||
|
} else {
|
||||||
|
return (com.google.protobuf.ByteString) ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void initFields() {
|
private void initFields() {
|
||||||
status_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
|
status_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
|
||||||
firstBadLink_ = "";
|
firstBadLink_ = "";
|
||||||
checksumResponse_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto.getDefaultInstance();
|
checksumResponse_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto.getDefaultInstance();
|
||||||
|
message_ = "";
|
||||||
}
|
}
|
||||||
private byte memoizedIsInitialized = -1;
|
private byte memoizedIsInitialized = -1;
|
||||||
public final boolean isInitialized() {
|
public final boolean isInitialized() {
|
||||||
|
@ -7057,6 +7093,9 @@ public final class DataTransferProtos {
|
||||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||||
output.writeMessage(3, checksumResponse_);
|
output.writeMessage(3, checksumResponse_);
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
output.writeBytes(4, getMessageBytes());
|
||||||
|
}
|
||||||
getUnknownFields().writeTo(output);
|
getUnknownFields().writeTo(output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7078,6 +7117,10 @@ public final class DataTransferProtos {
|
||||||
size += com.google.protobuf.CodedOutputStream
|
size += com.google.protobuf.CodedOutputStream
|
||||||
.computeMessageSize(3, checksumResponse_);
|
.computeMessageSize(3, checksumResponse_);
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
size += com.google.protobuf.CodedOutputStream
|
||||||
|
.computeBytesSize(4, getMessageBytes());
|
||||||
|
}
|
||||||
size += getUnknownFields().getSerializedSize();
|
size += getUnknownFields().getSerializedSize();
|
||||||
memoizedSerializedSize = size;
|
memoizedSerializedSize = size;
|
||||||
return size;
|
return size;
|
||||||
|
@ -7116,6 +7159,11 @@ public final class DataTransferProtos {
|
||||||
result = result && getChecksumResponse()
|
result = result && getChecksumResponse()
|
||||||
.equals(other.getChecksumResponse());
|
.equals(other.getChecksumResponse());
|
||||||
}
|
}
|
||||||
|
result = result && (hasMessage() == other.hasMessage());
|
||||||
|
if (hasMessage()) {
|
||||||
|
result = result && getMessage()
|
||||||
|
.equals(other.getMessage());
|
||||||
|
}
|
||||||
result = result &&
|
result = result &&
|
||||||
getUnknownFields().equals(other.getUnknownFields());
|
getUnknownFields().equals(other.getUnknownFields());
|
||||||
return result;
|
return result;
|
||||||
|
@ -7137,6 +7185,10 @@ public final class DataTransferProtos {
|
||||||
hash = (37 * hash) + CHECKSUMRESPONSE_FIELD_NUMBER;
|
hash = (37 * hash) + CHECKSUMRESPONSE_FIELD_NUMBER;
|
||||||
hash = (53 * hash) + getChecksumResponse().hashCode();
|
hash = (53 * hash) + getChecksumResponse().hashCode();
|
||||||
}
|
}
|
||||||
|
if (hasMessage()) {
|
||||||
|
hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
|
||||||
|
hash = (53 * hash) + getMessage().hashCode();
|
||||||
|
}
|
||||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||||
return hash;
|
return hash;
|
||||||
}
|
}
|
||||||
|
@ -7264,6 +7316,8 @@ public final class DataTransferProtos {
|
||||||
checksumResponseBuilder_.clear();
|
checksumResponseBuilder_.clear();
|
||||||
}
|
}
|
||||||
bitField0_ = (bitField0_ & ~0x00000004);
|
bitField0_ = (bitField0_ & ~0x00000004);
|
||||||
|
message_ = "";
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7318,6 +7372,10 @@ public final class DataTransferProtos {
|
||||||
} else {
|
} else {
|
||||||
result.checksumResponse_ = checksumResponseBuilder_.build();
|
result.checksumResponse_ = checksumResponseBuilder_.build();
|
||||||
}
|
}
|
||||||
|
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
to_bitField0_ |= 0x00000008;
|
||||||
|
}
|
||||||
|
result.message_ = message_;
|
||||||
result.bitField0_ = to_bitField0_;
|
result.bitField0_ = to_bitField0_;
|
||||||
onBuilt();
|
onBuilt();
|
||||||
return result;
|
return result;
|
||||||
|
@ -7343,6 +7401,9 @@ public final class DataTransferProtos {
|
||||||
if (other.hasChecksumResponse()) {
|
if (other.hasChecksumResponse()) {
|
||||||
mergeChecksumResponse(other.getChecksumResponse());
|
mergeChecksumResponse(other.getChecksumResponse());
|
||||||
}
|
}
|
||||||
|
if (other.hasMessage()) {
|
||||||
|
setMessage(other.getMessage());
|
||||||
|
}
|
||||||
this.mergeUnknownFields(other.getUnknownFields());
|
this.mergeUnknownFields(other.getUnknownFields());
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -7409,6 +7470,11 @@ public final class DataTransferProtos {
|
||||||
setChecksumResponse(subBuilder.buildPartial());
|
setChecksumResponse(subBuilder.buildPartial());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case 34: {
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
message_ = input.readBytes();
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7565,6 +7631,42 @@ public final class DataTransferProtos {
|
||||||
return checksumResponseBuilder_;
|
return checksumResponseBuilder_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional string message = 4;
|
||||||
|
private java.lang.Object message_ = "";
|
||||||
|
public boolean hasMessage() {
|
||||||
|
return ((bitField0_ & 0x00000008) == 0x00000008);
|
||||||
|
}
|
||||||
|
public String getMessage() {
|
||||||
|
java.lang.Object ref = message_;
|
||||||
|
if (!(ref instanceof String)) {
|
||||||
|
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
|
||||||
|
message_ = s;
|
||||||
|
return s;
|
||||||
|
} else {
|
||||||
|
return (String) ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public Builder setMessage(String value) {
|
||||||
|
if (value == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
message_ = value;
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
public Builder clearMessage() {
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
|
message_ = getDefaultInstance().getMessage();
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
void setMessage(com.google.protobuf.ByteString value) {
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
message_ = value;
|
||||||
|
onChanged();
|
||||||
|
}
|
||||||
|
|
||||||
// @@protoc_insertion_point(builder_scope:BlockOpResponseProto)
|
// @@protoc_insertion_point(builder_scope:BlockOpResponseProto)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8995,19 +9097,20 @@ public final class DataTransferProtos {
|
||||||
"\030\001 \002(\020\022\r\n\005seqno\030\002 \002(\020\022\031\n\021lastPacketInBlo" +
|
"\030\001 \002(\020\022\r\n\005seqno\030\002 \002(\020\022\031\n\021lastPacketInBlo" +
|
||||||
"ck\030\003 \002(\010\022\017\n\007dataLen\030\004 \002(\017\":\n\020PipelineAck" +
|
"ck\030\003 \002(\010\022\017\n\007dataLen\030\004 \002(\017\":\n\020PipelineAck" +
|
||||||
"Proto\022\r\n\005seqno\030\001 \002(\022\022\027\n\006status\030\002 \003(\0162\007.S" +
|
"Proto\022\r\n\005seqno\030\001 \002(\022\022\027\n\006status\030\002 \003(\0162\007.S" +
|
||||||
"tatus\"~\n\024BlockOpResponseProto\022\027\n\006status\030" +
|
"tatus\"\217\001\n\024BlockOpResponseProto\022\027\n\006status" +
|
||||||
"\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002 \001(\t\0227\n\020" +
|
"\030\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002 \001(\t\0227\n" +
|
||||||
"checksumResponse\030\003 \001(\0132\035.OpBlockChecksum" +
|
"\020checksumResponse\030\003 \001(\0132\035.OpBlockChecksu" +
|
||||||
"ResponseProto\"0\n\025ClientReadStatusProto\022\027" +
|
"mResponseProto\022\017\n\007message\030\004 \001(\t\"0\n\025Clien" +
|
||||||
"\n\006status\030\001 \002(\0162\007.Status\"-\n\022DNTransferAck" +
|
"tReadStatusProto\022\027\n\006status\030\001 \002(\0162\007.Statu" +
|
||||||
"Proto\022\027\n\006status\030\001 \002(\0162\007.Status\"U\n\034OpBloc",
|
"s\"-\n\022DNTransferAckProto\022\027\n\006status\030\001 \002(\0162",
|
||||||
"kChecksumResponseProto\022\023\n\013bytesPerCrc\030\001 " +
|
"\007.Status\"U\n\034OpBlockChecksumResponseProto" +
|
||||||
"\002(\r\022\023\n\013crcPerBlock\030\002 \002(\004\022\013\n\003md5\030\003 \002(\014*\202\001" +
|
"\022\023\n\013bytesPerCrc\030\001 \002(\r\022\023\n\013crcPerBlock\030\002 \002" +
|
||||||
"\n\006Status\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\022\n\016ERRO" +
|
"(\004\022\013\n\003md5\030\003 \002(\014*\202\001\n\006Status\022\013\n\007SUCCESS\020\000\022" +
|
||||||
"R_CHECKSUM\020\002\022\021\n\rERROR_INVALID\020\003\022\020\n\014ERROR" +
|
"\t\n\005ERROR\020\001\022\022\n\016ERROR_CHECKSUM\020\002\022\021\n\rERROR_" +
|
||||||
"_EXISTS\020\004\022\026\n\022ERROR_ACCESS_TOKEN\020\005\022\017\n\013CHE" +
|
"INVALID\020\003\022\020\n\014ERROR_EXISTS\020\004\022\026\n\022ERROR_ACC" +
|
||||||
"CKSUM_OK\020\006B>\n%org.apache.hadoop.hdfs.pro" +
|
"ESS_TOKEN\020\005\022\017\n\013CHECKSUM_OK\020\006B>\n%org.apac" +
|
||||||
"tocol.protoB\022DataTransferProtos\240\001\001"
|
"he.hadoop.hdfs.protocol.protoB\022DataTrans" +
|
||||||
|
"ferProtos\240\001\001"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
@ -9099,7 +9202,7 @@ public final class DataTransferProtos {
|
||||||
internal_static_BlockOpResponseProto_fieldAccessorTable = new
|
internal_static_BlockOpResponseProto_fieldAccessorTable = new
|
||||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||||
internal_static_BlockOpResponseProto_descriptor,
|
internal_static_BlockOpResponseProto_descriptor,
|
||||||
new java.lang.String[] { "Status", "FirstBadLink", "ChecksumResponse", },
|
new java.lang.String[] { "Status", "FirstBadLink", "ChecksumResponse", "Message", },
|
||||||
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.class,
|
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.class,
|
||||||
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder.class);
|
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder.class);
|
||||||
internal_static_ClientReadStatusProto_descriptor =
|
internal_static_ClientReadStatusProto_descriptor =
|
||||||
|
|
|
@ -358,7 +358,8 @@ public class Balancer {
|
||||||
if (response.getStatus() != Status.SUCCESS) {
|
if (response.getStatus() != Status.SUCCESS) {
|
||||||
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
|
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
|
||||||
throw new IOException("block move failed due to access token error");
|
throw new IOException("block move failed due to access token error");
|
||||||
throw new IOException("block move is failed");
|
throw new IOException("block move is failed: " +
|
||||||
|
response.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
|
@ -225,13 +227,14 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
blockSender = new BlockSender(block, blockOffset, length,
|
blockSender = new BlockSender(block, blockOffset, length,
|
||||||
true, true, false, datanode, clientTraceFmt);
|
true, true, false, datanode, clientTraceFmt);
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
LOG.info("opReadBlock " + block + " received exception " + e);
|
String msg = "opReadBlock " + block + " received exception " + e;
|
||||||
sendResponse(s, ERROR, datanode.socketWriteTimeout);
|
LOG.info(msg);
|
||||||
|
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
// send op status
|
// send op status
|
||||||
sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
|
sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
|
||||||
|
|
||||||
long read = blockSender.sendBlock(out, baseStream, null); // send data
|
long read = blockSender.sendBlock(out, baseStream, null); // send data
|
||||||
|
|
||||||
|
@ -452,7 +455,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("TRANSFER: send close-ack");
|
LOG.trace("TRANSFER: send close-ack");
|
||||||
}
|
}
|
||||||
writeResponse(SUCCESS, replyOut);
|
writeResponse(SUCCESS, null, replyOut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -507,7 +510,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
||||||
try {
|
try {
|
||||||
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
|
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
|
||||||
writeResponse(Status.SUCCESS, out);
|
writeResponse(Status.SUCCESS, null, out);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
}
|
}
|
||||||
|
@ -577,16 +580,17 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
LOG.warn("Invalid access token in request from " + remoteAddress
|
LOG.warn("Invalid access token in request from " + remoteAddress
|
||||||
+ " for OP_COPY_BLOCK for block " + block + " : "
|
+ " for OP_COPY_BLOCK for block " + block + " : "
|
||||||
+ e.getLocalizedMessage());
|
+ e.getLocalizedMessage());
|
||||||
sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
|
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
||||||
LOG.info("Not able to copy block " + block.getBlockId() + " to "
|
String msg = "Not able to copy block " + block.getBlockId() + " to "
|
||||||
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
|
||||||
sendResponse(s, ERROR, datanode.socketWriteTimeout);
|
LOG.info(msg);
|
||||||
|
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -606,7 +610,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
|
|
||||||
// send status first
|
// send status first
|
||||||
writeResponse(SUCCESS, reply);
|
writeResponse(SUCCESS, null, reply);
|
||||||
// send block content to the target
|
// send block content to the target
|
||||||
long read = blockSender.sendBlock(reply, baseStream,
|
long read = blockSender.sendBlock(reply, baseStream,
|
||||||
dataXceiverServer.balanceThrottler);
|
dataXceiverServer.balanceThrottler);
|
||||||
|
@ -653,21 +657,24 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
LOG.warn("Invalid access token in request from " + remoteAddress
|
LOG.warn("Invalid access token in request from " + remoteAddress
|
||||||
+ " for OP_REPLACE_BLOCK for block " + block + " : "
|
+ " for OP_REPLACE_BLOCK for block " + block + " : "
|
||||||
+ e.getLocalizedMessage());
|
+ e.getLocalizedMessage());
|
||||||
sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
|
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
|
||||||
|
datanode.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
||||||
LOG.warn("Not able to receive block " + block.getBlockId() + " from "
|
String msg = "Not able to receive block " + block.getBlockId() + " from "
|
||||||
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
|
||||||
sendResponse(s, ERROR, datanode.socketWriteTimeout);
|
LOG.warn(msg);
|
||||||
|
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Socket proxySock = null;
|
Socket proxySock = null;
|
||||||
DataOutputStream proxyOut = null;
|
DataOutputStream proxyOut = null;
|
||||||
Status opStatus = SUCCESS;
|
Status opStatus = SUCCESS;
|
||||||
|
String errMsg = null;
|
||||||
BlockReceiver blockReceiver = null;
|
BlockReceiver blockReceiver = null;
|
||||||
DataInputStream proxyReply = null;
|
DataInputStream proxyReply = null;
|
||||||
|
|
||||||
|
@ -720,7 +727,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
opStatus = ERROR;
|
opStatus = ERROR;
|
||||||
LOG.info("opReplaceBlock " + block + " received exception " + ioe);
|
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
|
||||||
|
LOG.info(errMsg);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
// receive the last byte that indicates the proxy released its thread resource
|
// receive the last byte that indicates the proxy released its thread resource
|
||||||
|
@ -736,7 +744,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
// send response back
|
// send response back
|
||||||
try {
|
try {
|
||||||
sendResponse(s, opStatus, datanode.socketWriteTimeout);
|
sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
||||||
}
|
}
|
||||||
|
@ -759,21 +767,22 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
* @param opStatus status message to write
|
* @param opStatus status message to write
|
||||||
* @param timeout send timeout
|
* @param timeout send timeout
|
||||||
**/
|
**/
|
||||||
private void sendResponse(Socket s, Status status,
|
private void sendResponse(Socket s, Status status, String message,
|
||||||
long timeout) throws IOException {
|
long timeout) throws IOException {
|
||||||
DataOutputStream reply =
|
DataOutputStream reply =
|
||||||
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
|
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
|
||||||
|
|
||||||
writeResponse(status, reply);
|
writeResponse(status, message, reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeResponse(Status status, OutputStream out)
|
private void writeResponse(Status status, String message, OutputStream out)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
|
BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
|
||||||
.setStatus(status)
|
.setStatus(status);
|
||||||
.build();
|
if (message != null) {
|
||||||
|
response.setMessage(message);
|
||||||
response.writeDelimitedTo(out);
|
}
|
||||||
|
response.build().writeDelimitedTo(out);
|
||||||
out.flush();
|
out.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -119,6 +119,9 @@ message BlockOpResponseProto {
|
||||||
|
|
||||||
optional string firstBadLink = 2;
|
optional string firstBadLink = 2;
|
||||||
optional OpBlockChecksumResponseProto checksumResponse = 3;
|
optional OpBlockChecksumResponseProto checksumResponse = 3;
|
||||||
|
|
||||||
|
/** explanatory text which may be useful to log on the client side */
|
||||||
|
optional string message = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -117,10 +117,8 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
throw eof;
|
throw eof;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Received: " +
|
LOG.info("Received: " +new String(retBuf));
|
||||||
StringUtils.byteToHexString(retBuf));
|
LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
|
||||||
LOG.info("Expected: " +
|
|
||||||
StringUtils.byteToHexString(recvBuf.toByteArray()));
|
|
||||||
|
|
||||||
if (eofExpected) {
|
if (eofExpected) {
|
||||||
throw new IOException("Did not recieve IOException when an exception " +
|
throw new IOException("Did not recieve IOException when an exception " +
|
||||||
|
@ -129,10 +127,8 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] needed = recvBuf.toByteArray();
|
byte[] needed = recvBuf.toByteArray();
|
||||||
for (int i=0; i<retBuf.length; i++) {
|
assertEquals(StringUtils.byteToHexString(needed),
|
||||||
System.out.print(retBuf[i]);
|
StringUtils.byteToHexString(retBuf));
|
||||||
assertEquals("checking byte[" + i + "]", needed[i], retBuf[i]);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeSocket(sock);
|
IOUtils.closeSocket(sock);
|
||||||
}
|
}
|
||||||
|
@ -166,18 +162,22 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
sendOut.writeInt(0); // zero checksum
|
sendOut.writeInt(0); // zero checksum
|
||||||
|
|
||||||
//ok finally write a block with 0 len
|
//ok finally write a block with 0 len
|
||||||
sendResponse(Status.SUCCESS, "", recvOut);
|
sendResponse(Status.SUCCESS, "", null, recvOut);
|
||||||
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
|
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
|
||||||
sendRecvData(description, false);
|
sendRecvData(description, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendResponse(Status status, String firstBadLink,
|
private void sendResponse(Status status, String firstBadLink,
|
||||||
|
String message,
|
||||||
DataOutputStream out)
|
DataOutputStream out)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Builder builder = BlockOpResponseProto.newBuilder().setStatus(status);
|
Builder builder = BlockOpResponseProto.newBuilder().setStatus(status);
|
||||||
if (firstBadLink != null) {
|
if (firstBadLink != null) {
|
||||||
builder.setFirstBadLink(firstBadLink);
|
builder.setFirstBadLink(firstBadLink);
|
||||||
}
|
}
|
||||||
|
if (message != null) {
|
||||||
|
builder.setMessage(message);
|
||||||
|
}
|
||||||
builder.build()
|
builder.build()
|
||||||
.writeDelimitedTo(out);
|
.writeDelimitedTo(out);
|
||||||
}
|
}
|
||||||
|
@ -190,11 +190,11 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
new DatanodeInfo[1], null, stage,
|
new DatanodeInfo[1], null, stage,
|
||||||
0, block.getNumBytes(), block.getNumBytes(), newGS);
|
0, block.getNumBytes(), block.getNumBytes(), newGS);
|
||||||
if (eofExcepted) {
|
if (eofExcepted) {
|
||||||
sendResponse(Status.ERROR, null, recvOut);
|
sendResponse(Status.ERROR, null, null, recvOut);
|
||||||
sendRecvData(description, true);
|
sendRecvData(description, true);
|
||||||
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
|
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
|
||||||
//ok finally write a block with 0 len
|
//ok finally write a block with 0 len
|
||||||
sendResponse(Status.SUCCESS, "", recvOut);
|
sendResponse(Status.SUCCESS, "", null, recvOut);
|
||||||
sendRecvData(description, false);
|
sendRecvData(description, false);
|
||||||
} else {
|
} else {
|
||||||
writeZeroLengthPacket(block, description);
|
writeZeroLengthPacket(block, description);
|
||||||
|
@ -383,7 +383,7 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
// bad bytes per checksum
|
// bad bytes per checksum
|
||||||
sendOut.writeInt(-1-random.nextInt(oneMil));
|
sendOut.writeInt(-1-random.nextInt(oneMil));
|
||||||
recvBuf.reset();
|
recvBuf.reset();
|
||||||
sendResponse(Status.ERROR, null, recvOut);
|
sendResponse(Status.ERROR, null, null, recvOut);
|
||||||
sendRecvData("wrong bytesPerChecksum while writing", true);
|
sendRecvData("wrong bytesPerChecksum while writing", true);
|
||||||
|
|
||||||
sendBuf.reset();
|
sendBuf.reset();
|
||||||
|
@ -403,7 +403,7 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
-1 - random.nextInt(oneMil)); // bad datalen
|
-1 - random.nextInt(oneMil)); // bad datalen
|
||||||
hdr.write(sendOut);
|
hdr.write(sendOut);
|
||||||
|
|
||||||
sendResponse(Status.SUCCESS, "", recvOut);
|
sendResponse(Status.SUCCESS, "", null, recvOut);
|
||||||
new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
|
new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
|
||||||
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
|
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
|
||||||
true);
|
true);
|
||||||
|
@ -428,7 +428,7 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
sendOut.writeInt(0); // zero checksum
|
sendOut.writeInt(0); // zero checksum
|
||||||
sendOut.flush();
|
sendOut.flush();
|
||||||
//ok finally write a block with 0 len
|
//ok finally write a block with 0 len
|
||||||
sendResponse(Status.SUCCESS, "", recvOut);
|
sendResponse(Status.SUCCESS, "", null, recvOut);
|
||||||
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
|
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
|
||||||
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
|
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
|
||||||
|
|
||||||
|
@ -462,7 +462,7 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
|
|
||||||
// negative length is ok. Datanode assumes we want to read the whole block.
|
// negative length is ok. Datanode assumes we want to read the whole block.
|
||||||
recvBuf.reset();
|
recvBuf.reset();
|
||||||
sendResponse(Status.SUCCESS, null, recvOut);
|
sendResponse(Status.SUCCESS, null, null, recvOut);
|
||||||
sendBuf.reset();
|
sendBuf.reset();
|
||||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||||
0L, -1L-random.nextInt(oneMil));
|
0L, -1L-random.nextInt(oneMil));
|
||||||
|
@ -471,7 +471,11 @@ public class TestDataTransferProtocol extends TestCase {
|
||||||
|
|
||||||
// length is more than size of block.
|
// length is more than size of block.
|
||||||
recvBuf.reset();
|
recvBuf.reset();
|
||||||
sendResponse(Status.ERROR, null, recvOut);
|
sendResponse(Status.ERROR, null,
|
||||||
|
"opReadBlock " + firstBlock +
|
||||||
|
" received exception java.io.IOException: " +
|
||||||
|
"Offset 0 and length 4097 don't match block " + firstBlock + " ( blockLen 4096 )",
|
||||||
|
recvOut);
|
||||||
sendBuf.reset();
|
sendBuf.reset();
|
||||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||||
0L, fileLen+1);
|
0L, fileLen+1);
|
||||||
|
|
Loading…
Reference in New Issue