HDFS-11062. Ozone:SCM: Remove null command. Contributed by Yuanbo Liu.

Xiaoyu Yao 2017-04-11 13:33:40 -07:00
parent 349a19b2ad
commit a2ff806d2e
8 changed files with 44 additions and 153 deletions

View File

@@ -125,11 +125,6 @@ public class HeartbeatEndpointTask
   private void processResponse(SCMHeartbeatResponseProto response) {
     for (SCMCommandResponseProto commandResponseProto : response
        .getCommandsList()) {
-      if (commandResponseProto.getCmdType() ==
-          StorageContainerDatanodeProtocolProtos.Type.nullCmd) {
-        //this.context.addCommand(NullCommand.newBuilder().build());
-        LOG.debug("Discarding a null command from SCM.");
-      }
       if (commandResponseProto.getCmdType() ==
           StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) {
         this.context.addCommand(SendContainerCommand.getFromProtobuf(
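
With the nullCmd branch gone, "nothing to do" is expressed as a heartbeat response whose command list is simply empty, rather than a response carrying a placeholder command. A minimal standalone illustration of that after-state (a hypothetical example class, not part of this commit; it uses only the generated protobuf builders referenced in the hunks below):

import java.util.Collections;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;

/** Hypothetical illustration class, not part of this commit. */
public class EmptyHeartbeatExample {
  public static void main(String[] args) {
    // SCM now answers a heartbeat with zero commands when its queue is empty,
    // instead of wrapping a NullCmdResponseProto into the response.
    SCMHeartbeatResponseProto response =
        SCMHeartbeatResponseProto.newBuilder()
            .addAllCommands(Collections.<SCMCommandResponseProto>emptyList())
            .build();

    // The datanode's processResponse loop simply finds nothing to act on.
    System.out.println("commands: " + response.getCommandsCount()); // prints 0
  }
}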

View File

@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.protocol.commands;
-
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
-
-/**
- * For each command that SCM can return we have a class in this commands
- * directory. This is the Null command, that tells datanode that no action is
- * needed from it.
- */
-public class NullCommand extends SCMCommand<NullCmdResponseProto> {
-  /**
-   * Returns the type of this command.
-   *
-   * @return Type
-   */
-  @Override
-  public Type getType() {
-    return Type.nullCmd;
-  }
-
-  /**
-   * Gets the protobuf message of this object.
-   *
-   * @return A protobuf message.
-   */
-  @Override
-  public byte[] getProtoBufMessage() {
-    return NullCmdResponseProto.newBuilder().build().toByteArray();
-  }
-
-  /**
-   * Returns a NullCommand class from NullCommandResponse Proto.
-   * @param unused - unused
-   * @return NullCommand
-   */
-  public static NullCommand getFromProtobuf(final NullCmdResponseProto
-      unused) {
-    return new NullCommand();
-  }
-
-  /**
-   * returns a new builder.
-   * @return Builder
-   */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * A Builder class this is the standard pattern we are using for all commands.
-   */
-  public static class Builder {
-    /**
-     * Return a null command.
-     * @return - NullCommand.
-     */
-    public NullCommand build() {
-      return new NullCommand();
-    }
-  }
-}

View File

@@ -42,8 +42,6 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -58,6 +56,8 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto
@@ -296,26 +296,27 @@ public class StorageContainerManager
   public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
       throws InvalidProtocolBufferException {
     Type type = cmd.getType();
+    SCMCommandResponseProto.Builder builder =
+        SCMCommandResponseProto.newBuilder();
     switch (type) {
-    case nullCmd:
-      return getNullCmdResponse();
+    case registeredCommand:
+      return builder.setCmdType(Type.registeredCommand)
+          .setRegisteredProto(
+              SCMRegisteredCmdResponseProto.getDefaultInstance())
+          .build();
+    case versionCommand:
+      return builder.setCmdType(Type.versionCommand)
+          .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
+          .build();
+    case sendContainerReport:
+      return builder.setCmdType(Type.sendContainerReport)
+          .setSendReport(SendContainerReportProto.getDefaultInstance())
+          .build();
     default:
       throw new IllegalArgumentException("Not implemented");
     }
   }
 
-  /**
-   * Returns a null command response.
-   * @return
-   * @throws InvalidProtocolBufferException
-   */
-  private static SCMCommandResponseProto getNullCmdResponse() {
-    return SCMCommandResponseProto.newBuilder()
-        .setCmdType(Type.nullCmd)
-        .setNullCommand(NullCmdResponseProto.getDefaultInstance())
-        .build();
-  }
-
   @VisibleForTesting
   public static SCMRegisteredCmdResponseProto getRegisteredResponse(
       SCMCommand cmd, SCMNodeAddressList addressList) {
@@ -465,11 +466,11 @@ public class StorageContainerManager
       SCMNodeReport nodeReport, ReportState reportState) throws IOException {
     List<SCMCommand> commands =
         getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
-    List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();
+    List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
     for (SCMCommand cmd : commands) {
-      cmdReponses.add(getCommandResponse(cmd));
+      cmdResponses.add(getCommandResponse(cmd));
     }
-    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdReponses)
+    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
         .build();
   }
@@ -501,8 +502,9 @@ public class StorageContainerManager
       sendContainerReport(ContainerReportsProto reports) throws IOException {
     // TODO : fix this in the server side code changes for handling this request
     // correctly.
-    return SCMHeartbeatResponseProto.newBuilder()
-        .addCommands(getNullCmdResponse()).build();
+    List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
+    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
+        .build();
   }
 
   /**

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.scm.node;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.commands.NullCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.util.HashMap;
@@ -39,7 +38,7 @@ public class CommandQueue {
   private final Map<DatanodeID, List<SCMCommand>> commandMap;
   private final Lock lock;
 
-  // This map is used as default return value containing one null command.
+  // This map is used as default return value.
   private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
 
   /**
@@ -48,12 +47,11 @@ public class CommandQueue {
   public CommandQueue() {
     commandMap = new HashMap<>();
     lock = new ReentrantLock();
-    DEFAULT_LIST.add(NullCommand.newBuilder().build());
   }
 
   /**
    * Returns a list of Commands for the datanode to execute, if we have no
-   * commands returns a list with Null Command otherwise the current set of
+   * commands returns a empty list otherwise the current set of
    * commands are returned and command map set to empty list again.
    *
    * @param datanodeID DatanodeID
@@ -67,8 +65,7 @@ public class CommandQueue {
     if (commandMap.containsKey(datanodeID)) {
       List temp = commandMap.get(datanodeID);
       if (temp.size() > 0) {
-        LinkedList<SCMCommand> emptyList = new LinkedList<>();
-        commandMap.put(datanodeID, emptyList);
+        commandMap.put(datanodeID, DEFAULT_LIST);
        return temp;
       }
     }
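
For orientation, a rough sketch of how CommandQueue.getCommands reads after this change. Only the inner lines come from the hunk above; the method signature, the lock handling, and the final fall-through to DEFAULT_LIST are assumptions inferred from the visible fields, not verbatim code from this commit.

  // Hypothetical reconstruction of CommandQueue.getCommands (post-change).
  public List<SCMCommand> getCommands(final DatanodeID datanodeID) {
    lock.lock();
    try {
      if (commandMap.containsKey(datanodeID)) {
        List temp = commandMap.get(datanodeID);
        if (temp.size() > 0) {
          // Hand back the pending commands and reset this datanode's entry.
          commandMap.put(datanodeID, DEFAULT_LIST);
          return temp;
        }
      }
      // No pending work: the shared empty DEFAULT_LIST replaces the old
      // single-NullCommand default.
      return DEFAULT_LIST;
    } finally {
      lock.unlock();
    }
  }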

View File

@@ -181,12 +181,6 @@ message ContainerNodeIDProto {
 }
 
-/**
- * Empty Command Response
- */
-message NullCmdResponseProto {
-}
-
 /**
 This command tells the data node to send in the container report when possible
@@ -198,7 +192,6 @@ message SendContainerReportProto {
 Type of commands supported by SCM to datanode protocol.
 */
 enum Type {
-  nullCmd = 1;
   versionCommand = 2;
   registeredCommand = 3;
   sendContainerReport = 4;
@@ -209,10 +202,9 @@ enum Type {
 */
 message SCMCommandResponseProto {
   required Type cmdType = 2; // Type of the command
-  optional NullCmdResponseProto nullCommand = 3;
-  optional SCMRegisteredCmdResponseProto registeredProto = 4;
-  optional SCMVersionResponseProto versionProto = 5;
-  optional SendContainerReportProto sendReport = 6;
+  optional SCMRegisteredCmdResponseProto registeredProto = 3;
+  optional SCMVersionResponseProto versionProto = 4;
+  optional SendContainerReportProto sendReport = 5;
 }

View File

@@ -20,13 +20,16 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
-import org.apache.hadoop.ozone.protocol.commands.NullCommand;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.scm.VersionInfo;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -137,25 +140,10 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     heartbeatCount.incrementAndGet();
     this.reportState = reportState;
     sleepIfNeeded();
-    return getNullRespose();
-  }
-
-  private StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
-      getNullRespose() throws
-      com.google.protobuf.InvalidProtocolBufferException {
-    StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
-        cmdResponse = StorageContainerDatanodeProtocolProtos
-        .SCMCommandResponseProto
-        .newBuilder().setCmdType(StorageContainerDatanodeProtocolProtos
-            .Type.nullCmd)
-        .setNullCommand(
-            StorageContainerDatanodeProtocolProtos.NullCmdResponseProto
-                .parseFrom(
-                    NullCommand.newBuilder().build().getProtoBufMessage()))
-        .build();
-    return StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
-        .newBuilder()
-        .addCommands(cmdResponse).build();
+    List<SCMCommandResponseProto>
+        cmdResponses = new LinkedList<>();
+    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
+        .build();
   }
 
   /**
@@ -188,13 +176,16 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
    * @throws IOException
    */
   @Override
-  public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+  public SCMHeartbeatResponseProto
       sendContainerReport(StorageContainerDatanodeProtocolProtos
       .ContainerReportsProto reports) throws IOException {
     Preconditions.checkNotNull(reports);
     containerReportsCount.incrementAndGet();
     closedContainerCount.addAndGet(reports.getReportsCount());
-    return getNullRespose();
+    List<SCMCommandResponseProto>
+        cmdResponses = new LinkedList<>();
+    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
+        .build();
   }
 
   public ReportState getReportState() {

View File

@@ -47,8 +47,6 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.scm.VersionInfo;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.util.Time;
@@ -286,10 +284,7 @@ public class TestEndPoint {
       SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
           .sendHeartbeat(dataNode, nrb.build(), defaultReportState);
       Assert.assertNotNull(responseProto);
-      Assert.assertEquals(1, responseProto.getCommandsCount());
-      Assert.assertNotNull(responseProto.getCommandsList().get(0));
-      Assert.assertEquals(responseProto.getCommandsList().get(0).getCmdType(),
-          Type.nullCmd);
+      Assert.assertEquals(0, responseProto.getCommandsCount());
     }
   }

View File

@@ -176,8 +176,8 @@ public class TestContainerPlacement {
     thrown.expect(IOException.class);
     thrown.expectMessage(
-        startsWith("Unable to find enough nodes that meet the space " +
-            "requirement in healthy node set."));
+        startsWith("Unable to find enough nodes that meet "
+            + "the space requirement"));
     String container2 = UUID.randomUUID().toString();
     containerManager.allocateContainer(container2,
         ScmClient.ReplicationFactor.THREE);