HDFS-11492. Ozone: Add the ability to handle sendContainerReport Command. Contributed by Anu Engineer.
This commit is contained in:
parent
603f2c18ec
commit
6b5dee9c38
|
@ -18,7 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.container.common.helpers;
|
package org.apache.hadoop.ozone.container.common.helpers;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -49,6 +51,7 @@ public class ContainerData {
|
||||||
public ContainerData(String containerName) {
|
public ContainerData(String containerName) {
|
||||||
this.metadata = new TreeMap<>();
|
this.metadata = new TreeMap<>();
|
||||||
this.containerName = containerName;
|
this.containerName = containerName;
|
||||||
|
this.open = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -213,15 +216,20 @@ public class ContainerData {
|
||||||
* checks if the container is open.
|
* checks if the container is open.
|
||||||
* @return - boolean
|
* @return - boolean
|
||||||
*/
|
*/
|
||||||
public boolean isOpen() {
|
public synchronized boolean isOpen() {
|
||||||
return open;
|
return open;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks this container as closed.
|
* Marks this container as closed.
|
||||||
*/
|
*/
|
||||||
public void closeContainer() {
|
public synchronized void closeContainer() {
|
||||||
this.open = false;
|
setOpen(false);
|
||||||
|
|
||||||
|
// Some thing brain dead for now. name + Time stamp of when we get the close
|
||||||
|
// container message.
|
||||||
|
setHash(DigestUtils.sha256Hex(this.getContainerName() +
|
||||||
|
Long.toString(Time.monotonicNow())));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -242,7 +250,7 @@ public class ContainerData {
|
||||||
* Sets the open or closed values.
|
* Sets the open or closed values.
|
||||||
* @param open
|
* @param open
|
||||||
*/
|
*/
|
||||||
public void setOpen(boolean open) {
|
public synchronized void setOpen(boolean open) {
|
||||||
this.open = open;
|
this.open = open;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,141 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.common.helpers;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Container Report iterates the closed containers and sends a container report
|
||||||
|
* to SCM.
|
||||||
|
* <p>
|
||||||
|
* The protobuf counter part of this class looks like this.
|
||||||
|
* message ContainerInfo {
|
||||||
|
* required string containerName = 1;
|
||||||
|
* repeated bytes finalhash = 2;
|
||||||
|
* optional int64 size = 3;
|
||||||
|
* optional int64 keycount = 4;
|
||||||
|
* }
|
||||||
|
*/
|
||||||
|
public class ContainerReport {
|
||||||
|
private static final int UNKNOWN = -1;
|
||||||
|
private final String containerName;
|
||||||
|
private final String finalhash;
|
||||||
|
private long size;
|
||||||
|
private long keyCount;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs the ContainerReport.
|
||||||
|
*
|
||||||
|
* @param containerName - Container Name.
|
||||||
|
* @param finalhash - Final Hash.
|
||||||
|
*/
|
||||||
|
public ContainerReport(String containerName, String finalhash) {
|
||||||
|
this.containerName = containerName;
|
||||||
|
this.finalhash = finalhash;
|
||||||
|
this.size = UNKNOWN;
|
||||||
|
this.keyCount = UNKNOWN;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a containerReport from protobuf class.
|
||||||
|
*
|
||||||
|
* @param info - ContainerInfo.
|
||||||
|
* @return - ContainerReport.
|
||||||
|
*/
|
||||||
|
public static ContainerReport getFromProtoBuf(ContainerInfo info) {
|
||||||
|
Preconditions.checkNotNull(info);
|
||||||
|
ContainerReport report = new ContainerReport(info.getContainerName(),
|
||||||
|
info.getFinalhash());
|
||||||
|
if (info.hasSize()) {
|
||||||
|
report.setSize(info.getSize());
|
||||||
|
}
|
||||||
|
if (info.hasKeycount()) {
|
||||||
|
report.setKeyCount(info.getKeycount());
|
||||||
|
}
|
||||||
|
return report;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the container name.
|
||||||
|
*
|
||||||
|
* @return - Name
|
||||||
|
*/
|
||||||
|
public String getContainerName() {
|
||||||
|
return containerName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the final signature for this container.
|
||||||
|
*
|
||||||
|
* @return - hash
|
||||||
|
*/
|
||||||
|
public String getFinalhash() {
|
||||||
|
return finalhash;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a positive number it is a valid number, -1 if not known.
|
||||||
|
*
|
||||||
|
* @return size or -1
|
||||||
|
*/
|
||||||
|
public long getSize() {
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the size of the container on disk.
|
||||||
|
*
|
||||||
|
* @param size - int
|
||||||
|
*/
|
||||||
|
public void setSize(long size) {
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets number of keys in the container if known.
|
||||||
|
*
|
||||||
|
* @return - Number of keys or -1 for not known.
|
||||||
|
*/
|
||||||
|
public long getKeyCount() {
|
||||||
|
return keyCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the key count.
|
||||||
|
*
|
||||||
|
* @param keyCount - Key Count
|
||||||
|
*/
|
||||||
|
public void setKeyCount(long keyCount) {
|
||||||
|
this.keyCount = keyCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a containerInfo protobuf message from ContainerReports.
|
||||||
|
*
|
||||||
|
* @return ContainerInfo
|
||||||
|
*/
|
||||||
|
public ContainerInfo getProtoBufMessage() {
|
||||||
|
return ContainerInfo.newBuilder()
|
||||||
|
.setContainerName(this.getContainerName())
|
||||||
|
.setKeycount(this.getKeyCount())
|
||||||
|
.setSize(this.getSize())
|
||||||
|
.setFinalhash(this.getFinalhash())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,6 +58,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentNavigableMap;
|
import java.util.concurrent.ConcurrentNavigableMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
|
@ -611,6 +612,25 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
return nrb.build();
|
return nrb.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets container reports.
|
||||||
|
*
|
||||||
|
* @return List of all closed containers.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<ContainerData> getContainerReports() throws IOException {
|
||||||
|
LOG.debug("Starting container report iteration.");
|
||||||
|
// No need for locking since containerMap is a ConcurrentSkipListMap
|
||||||
|
// And we can never get the exact state since close might happen
|
||||||
|
// after we iterate a point.
|
||||||
|
return containerMap.entrySet().stream()
|
||||||
|
.filter(containerStatus ->
|
||||||
|
containerStatus.getValue().getContainer().isOpen())
|
||||||
|
.map(containerStatus -> containerStatus.getValue().getContainer())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the Key Manager.
|
* Sets the Key Manager.
|
||||||
*
|
*
|
||||||
|
|
|
@ -25,8 +25,7 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.util.RwLock;
|
import org.apache.hadoop.hdfs.util.RwLock;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
|
||||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -151,4 +150,11 @@ public interface ContainerManager extends RwLock {
|
||||||
* @return node report.
|
* @return node report.
|
||||||
*/
|
*/
|
||||||
SCMNodeReport getNodeReport() throws IOException;
|
SCMNodeReport getNodeReport() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets container reports.
|
||||||
|
* @return List of all closed containers.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
List<ContainerData> getContainerReports() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
|
||||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||||
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -31,6 +34,7 @@ import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State Machine Class.
|
* State Machine Class.
|
||||||
|
@ -46,12 +50,15 @@ public class DatanodeStateMachine implements Closeable {
|
||||||
private StateContext context;
|
private StateContext context;
|
||||||
private final OzoneContainer container;
|
private final OzoneContainer container;
|
||||||
private DatanodeID datanodeID = null;
|
private DatanodeID datanodeID = null;
|
||||||
|
private final CommandDispatcher commandDispatcher;
|
||||||
|
private long commandsHandled;
|
||||||
|
private AtomicLong nextHB;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a a datanode state machine.
|
* Constructs a a datanode state machine.
|
||||||
*
|
*
|
||||||
* @param datanodeID - DatanodeID used to identify a datanode
|
* @param datanodeID - DatanodeID used to identify a datanode
|
||||||
* @param conf - Configration.
|
* @param conf - Configuration.
|
||||||
*/
|
*/
|
||||||
public DatanodeStateMachine(DatanodeID datanodeID,
|
public DatanodeStateMachine(DatanodeID datanodeID,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
|
@ -65,6 +72,17 @@ public class DatanodeStateMachine implements Closeable {
|
||||||
OzoneClientUtils.getScmHeartbeatInterval(conf));
|
OzoneClientUtils.getScmHeartbeatInterval(conf));
|
||||||
container = new OzoneContainer(conf);
|
container = new OzoneContainer(conf);
|
||||||
this.datanodeID = datanodeID;
|
this.datanodeID = datanodeID;
|
||||||
|
nextHB = new AtomicLong(Time.monotonicNow());
|
||||||
|
|
||||||
|
|
||||||
|
// When we add new handlers just adding a new handler here should do the
|
||||||
|
// trick.
|
||||||
|
commandDispatcher = CommandDispatcher.newBuilder()
|
||||||
|
.addHandler(new ContainerReportHandler())
|
||||||
|
.setConnectionManager(connectionManager)
|
||||||
|
.setContainer(container)
|
||||||
|
.setContext(context)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public DatanodeStateMachine(Configuration conf)
|
public DatanodeStateMachine(Configuration conf)
|
||||||
|
@ -104,18 +122,19 @@ public class DatanodeStateMachine implements Closeable {
|
||||||
*/
|
*/
|
||||||
private void start() throws IOException {
|
private void start() throws IOException {
|
||||||
long now = 0;
|
long now = 0;
|
||||||
long nextHB = 0;
|
|
||||||
container.start();
|
container.start();
|
||||||
|
initCommandHandlerThread(conf);
|
||||||
while (context.getState() != DatanodeStates.SHUTDOWN) {
|
while (context.getState() != DatanodeStates.SHUTDOWN) {
|
||||||
try {
|
try {
|
||||||
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
|
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
|
||||||
nextHB = Time.monotonicNow() + heartbeatFrequency;
|
nextHB.set(Time.monotonicNow() + heartbeatFrequency);
|
||||||
context.setReportState(container.getNodeReport());
|
context.setReportState(container.getNodeReport());
|
||||||
context.execute(executorService, heartbeatFrequency,
|
context.execute(executorService, heartbeatFrequency,
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
now = Time.monotonicNow();
|
now = Time.monotonicNow();
|
||||||
if (now < nextHB) {
|
if (now < nextHB.get()) {
|
||||||
Thread.sleep(nextHB - now);
|
Thread.sleep(nextHB.get() - now);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Unable to finish the execution.", e);
|
LOG.error("Unable to finish the execution.", e);
|
||||||
|
@ -162,7 +181,7 @@ public class DatanodeStateMachine implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
|
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||||
LOG.error("Unable to shutdown statemachine properly.");
|
LOG.error("Unable to shutdown state machine properly.");
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("Error attempting to shutdown.", e);
|
LOG.error("Error attempting to shutdown.", e);
|
||||||
|
@ -289,4 +308,61 @@ public class DatanodeStateMachine implements Closeable {
|
||||||
&& this.getContext().getExecutionCount() == 0
|
&& this.getContext().getExecutionCount() == 0
|
||||||
&& this.getContext().getState() == DatanodeStates.SHUTDOWN;
|
&& this.getContext().getState() == DatanodeStates.SHUTDOWN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a command handler thread.
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
*/
|
||||||
|
private void initCommandHandlerThread(Configuration conf) {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Task that periodically checks if we have any outstanding commands.
|
||||||
|
* It is assumed that commands can be processed slowly and in order.
|
||||||
|
* This assumption might change in future. Right now due to this assumption
|
||||||
|
* we have single command queue process thread.
|
||||||
|
*/
|
||||||
|
Runnable processCommandQueue = () -> {
|
||||||
|
long now;
|
||||||
|
while (getContext().getState() != DatanodeStates.SHUTDOWN) {
|
||||||
|
SCMCommand command = getContext().getNextCommand();
|
||||||
|
if (command != null) {
|
||||||
|
commandDispatcher.handle(command);
|
||||||
|
commandsHandled++;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
// Sleep till the next HB + 1 second.
|
||||||
|
now = Time.monotonicNow();
|
||||||
|
if (nextHB.get() > now) {
|
||||||
|
Thread.sleep((nextHB.get() - now) + 1000L);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Ignore this exception.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// We will have only one thread for command processing in a datanode.
|
||||||
|
Thread cmdProcessThread = new Thread(processCommandQueue);
|
||||||
|
cmdProcessThread.setDaemon(true);
|
||||||
|
cmdProcessThread.setName("Command processor thread");
|
||||||
|
cmdProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
|
||||||
|
// Let us just restart this thread after logging a critical error.
|
||||||
|
// if this thread is not running we cannot handle commands from SCM.
|
||||||
|
LOG.error("Critical Error : Command processor thread encountered an " +
|
||||||
|
"error. Thread: {}", t.toString(), e);
|
||||||
|
cmdProcessThread.start();
|
||||||
|
});
|
||||||
|
cmdProcessThread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of commands handled by the datanode.
|
||||||
|
* @return count
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getCommandHandled() {
|
||||||
|
return commandsHandled;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,177 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||||
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||||
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatches command to the correct handler.
|
||||||
|
*/
|
||||||
|
public final class CommandDispatcher {
|
||||||
|
static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(CommandDispatcher.class);
|
||||||
|
private final StateContext context;
|
||||||
|
private final Map<Type, CommandHandler> handlerMap;
|
||||||
|
private final OzoneContainer container;
|
||||||
|
private final SCMConnectionManager connectionManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a command Dispatcher.
|
||||||
|
* @param context - Context.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Constructs a command dispatcher.
|
||||||
|
*
|
||||||
|
* @param container - Ozone Container
|
||||||
|
* @param context - Context
|
||||||
|
* @param handlers - Set of handlers.
|
||||||
|
*/
|
||||||
|
private CommandDispatcher(OzoneContainer container, SCMConnectionManager
|
||||||
|
connectionManager, StateContext context,
|
||||||
|
CommandHandler... handlers) {
|
||||||
|
Preconditions.checkNotNull(context);
|
||||||
|
Preconditions.checkNotNull(handlers);
|
||||||
|
Preconditions.checkArgument(handlers.length > 0);
|
||||||
|
Preconditions.checkNotNull(container);
|
||||||
|
Preconditions.checkNotNull(connectionManager);
|
||||||
|
this.context = context;
|
||||||
|
this.container = container;
|
||||||
|
this.connectionManager = connectionManager;
|
||||||
|
handlerMap = new HashMap<>();
|
||||||
|
for (CommandHandler h : handlers) {
|
||||||
|
if(handlerMap.containsKey(h.getCommandType())){
|
||||||
|
LOG.error("Duplicate handler for the same command. Exiting. Handle " +
|
||||||
|
"key : { }", h.getCommandType().getDescriptorForType().getName());
|
||||||
|
throw new IllegalArgumentException("Duplicate handler for the same " +
|
||||||
|
"command.");
|
||||||
|
}
|
||||||
|
handlerMap.put(h.getCommandType(), h);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatch the command to the correct handler.
|
||||||
|
*
|
||||||
|
* @param command - SCM Command.
|
||||||
|
*/
|
||||||
|
public void handle(SCMCommand command) {
|
||||||
|
Preconditions.checkNotNull(command);
|
||||||
|
CommandHandler handler = handlerMap.get(command.getType());
|
||||||
|
if (handler != null) {
|
||||||
|
handler.handle(command, container, context, connectionManager);
|
||||||
|
} else {
|
||||||
|
LOG.error("Unknown SCM Command queued. There is no handler for this " +
|
||||||
|
"command. Command: {}", command.getType().getDescriptorForType()
|
||||||
|
.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Builder newBuilder() {
|
||||||
|
return new Builder();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to construct command dispatcher.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
private final List<CommandHandler> handlerList;
|
||||||
|
private OzoneContainer container;
|
||||||
|
private StateContext context;
|
||||||
|
private SCMConnectionManager connectionManager;
|
||||||
|
|
||||||
|
public Builder() {
|
||||||
|
handlerList = new LinkedList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a handler.
|
||||||
|
*
|
||||||
|
* @param handler - handler
|
||||||
|
* @return Builder
|
||||||
|
*/
|
||||||
|
public Builder addHandler(CommandHandler handler) {
|
||||||
|
Preconditions.checkNotNull(handler);
|
||||||
|
handlerList.add(handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the OzoneContainer.
|
||||||
|
*
|
||||||
|
* @param container - ozone container.
|
||||||
|
* @return Builder
|
||||||
|
*/
|
||||||
|
public Builder setContainer(OzoneContainer container) {
|
||||||
|
Preconditions.checkNotNull(container);
|
||||||
|
this.container = container;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the Connection Manager.
|
||||||
|
*
|
||||||
|
* @param connectionManager
|
||||||
|
* @return this
|
||||||
|
*/
|
||||||
|
public Builder setConnectionManager(SCMConnectionManager
|
||||||
|
connectionManager) {
|
||||||
|
Preconditions.checkNotNull(connectionManager);
|
||||||
|
this.connectionManager = connectionManager;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the Context.
|
||||||
|
*
|
||||||
|
* @param context - StateContext
|
||||||
|
* @return this
|
||||||
|
*/
|
||||||
|
public Builder setContext(StateContext context) {
|
||||||
|
Preconditions.checkNotNull(context);
|
||||||
|
this.context = context;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds a command Dispatcher.
|
||||||
|
* @return Command Dispatcher.
|
||||||
|
*/
|
||||||
|
public CommandDispatcher build() {
|
||||||
|
Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
|
||||||
|
" manager.");
|
||||||
|
Preconditions.checkNotNull(this.container, "Missing container.");
|
||||||
|
Preconditions.checkNotNull(this.context, "Missing context.");
|
||||||
|
Preconditions.checkArgument(this.handlerList.size() > 0);
|
||||||
|
return new CommandDispatcher(this.container, this.connectionManager,
|
||||||
|
this.context, handlerList.toArray(
|
||||||
|
new CommandHandler[handlerList.size()]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||||
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||||
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generic interface for handlers.
|
||||||
|
*/
|
||||||
|
public interface CommandHandler {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles a given SCM command.
|
||||||
|
* @param command - SCM Command
|
||||||
|
* @param container - Ozone Container.
|
||||||
|
* @param context - Current Context.
|
||||||
|
* @param connectionManager - The SCMs that we are talking to.
|
||||||
|
*/
|
||||||
|
void handle(SCMCommand command, OzoneContainer container,
|
||||||
|
StateContext context, SCMConnectionManager connectionManager);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the command type that this command handler handles.
|
||||||
|
* @return Type
|
||||||
|
*/
|
||||||
|
Type getCommandType();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns number of times this handler has been invoked.
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
int getInvocationCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the average time this function takes to run.
|
||||||
|
* @return long
|
||||||
|
*/
|
||||||
|
long getAverageRunTime();
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,122 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||||
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||||
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Container Report handler.
|
||||||
|
*/
|
||||||
|
public class ContainerReportHandler implements CommandHandler {
|
||||||
|
static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ContainerReportHandler.class);
|
||||||
|
private int invocationCount;
|
||||||
|
private long totalTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a ContainerReport handler.
|
||||||
|
*/
|
||||||
|
public ContainerReportHandler() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles a given SCM command.
|
||||||
|
*
|
||||||
|
* @param command - SCM Command
|
||||||
|
* @param container - Ozone Container.
|
||||||
|
* @param context - Current Context.
|
||||||
|
* @param connectionManager - The SCMs that we are talking to.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void handle(SCMCommand command, OzoneContainer container,
|
||||||
|
StateContext context, SCMConnectionManager connectionManager) {
|
||||||
|
LOG.debug("Processing Container Report.");
|
||||||
|
invocationCount++;
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
|
try {
|
||||||
|
ContainerReportsProto.Builder contianerReportsBuilder =
|
||||||
|
ContainerReportsProto.newBuilder();
|
||||||
|
List<ContainerData> closedContainerList = container.getContainerReports();
|
||||||
|
for (ContainerData cd : closedContainerList) {
|
||||||
|
ContainerReport report =
|
||||||
|
new ContainerReport(cd.getContainerName(), cd.getHash());
|
||||||
|
contianerReportsBuilder.addReports(report.getProtoBufMessage());
|
||||||
|
}
|
||||||
|
contianerReportsBuilder.setType(ContainerReportsProto.reportType
|
||||||
|
.fullReport);
|
||||||
|
|
||||||
|
// TODO : We send this report to all SCMs.Check if it is enough only to
|
||||||
|
// send to the leader once we have RAFT enabled SCMs.
|
||||||
|
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
|
||||||
|
endPoint.getEndPoint().sendContainerReport(
|
||||||
|
contianerReportsBuilder.build());
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.error("Unable to process the Container Report command.", ex);
|
||||||
|
} finally {
|
||||||
|
long endTime = Time.monotonicNow();
|
||||||
|
totalTime += endTime - startTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the command type that this command handler handles.
|
||||||
|
*
|
||||||
|
* @return Type
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Type getCommandType() {
|
||||||
|
return Type.sendContainerReport;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns number of times this handler has been invoked.
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int getInvocationCount() {
|
||||||
|
return invocationCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the average time this function takes to run.
|
||||||
|
*
|
||||||
|
* @return long
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long getAverageRunTime() {
|
||||||
|
if (invocationCount > 0) {
|
||||||
|
return totalTime / invocationCount;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
|
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
|
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
|
||||||
|
@ -176,4 +177,13 @@ public class OzoneContainer {
|
||||||
public int getContainerServerPort() {
|
public int getContainerServerPort() {
|
||||||
return server.getIPCPort();
|
return server.getIPCPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of closed containers.
|
||||||
|
* @return - List of closed containers.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public List<ContainerData> getContainerReports() throws IOException {
|
||||||
|
return this.manager.getContainerReports();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
||||||
|
@ -61,4 +62,12 @@ public interface StorageContainerDatanodeProtocol {
|
||||||
SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
|
SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
|
||||||
String[] scmAddresses) throws IOException;
|
String[] scmAddresses) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a container report.
|
||||||
|
* @param reports -- Container report
|
||||||
|
* @return HeartbeatRespose.nullcommand.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
|
||||||
|
throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.ReportState;
|
.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -31,14 +32,12 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
|
.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -159,4 +158,23 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a container report.
|
||||||
|
*
|
||||||
|
* @param reports -- Container report
|
||||||
|
* @return HeartbeatRespose.nullcommand.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public SCMHeartbeatResponseProto sendContainerReport(
|
||||||
|
ContainerReportsProto reports) throws IOException {
|
||||||
|
final SCMHeartbeatResponseProto resp;
|
||||||
|
try {
|
||||||
|
resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports);
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -84,4 +85,16 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SCMHeartbeatResponseProto
|
||||||
|
sendContainerReport(RpcController controller,
|
||||||
|
ContainerReportsProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
return impl.sendContainerReport(request);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -34,11 +34,12 @@ import org.apache.hadoop.scm.client.ScmClient;
|
||||||
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
||||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||||
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
|
|
||||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos;
|
.StorageContainerDatanodeProtocolProtos;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.ReportState;
|
.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -297,16 +298,24 @@ public class StorageContainerManager
|
||||||
Type type = cmd.getType();
|
Type type = cmd.getType();
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case nullCmd:
|
case nullCmd:
|
||||||
Preconditions.checkState(cmd.getClass() == NullCommand.class);
|
return getNullCmdResponse();
|
||||||
return SCMCommandResponseProto.newBuilder().setCmdType(cmd.getType())
|
|
||||||
.setNullCommand(
|
|
||||||
NullCmdResponseProto.parseFrom(cmd.getProtoBufMessage()))
|
|
||||||
.build();
|
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Not implemented");
|
throw new IllegalArgumentException("Not implemented");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a null command response.
|
||||||
|
* @return
|
||||||
|
* @throws InvalidProtocolBufferException
|
||||||
|
*/
|
||||||
|
private static SCMCommandResponseProto getNullCmdResponse() {
|
||||||
|
return SCMCommandResponseProto.newBuilder()
|
||||||
|
.setCmdType(Type.nullCmd)
|
||||||
|
.setNullCommand(NullCmdResponseProto.getDefaultInstance())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static SCMRegisteredCmdResponseProto getRegisteredResponse(
|
public static SCMRegisteredCmdResponseProto getRegisteredResponse(
|
||||||
SCMCommand cmd, SCMNodeAddressList addressList) {
|
SCMCommand cmd, SCMNodeAddressList addressList) {
|
||||||
|
@ -480,6 +489,22 @@ public class StorageContainerManager
|
||||||
return getRegisteredResponse(scmNodeManager.register(datanodeID), null);
|
return getRegisteredResponse(scmNodeManager.register(datanodeID), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a container report.
|
||||||
|
*
|
||||||
|
* @param reports -- Container report
|
||||||
|
* @return HeartbeatRespose.nullcommand.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public SCMHeartbeatResponseProto
|
||||||
|
sendContainerReport(ContainerReportsProto reports) throws IOException {
|
||||||
|
// TODO : fix this in the server side code changes for handling this request
|
||||||
|
// correctly.
|
||||||
|
return SCMHeartbeatResponseProto.newBuilder()
|
||||||
|
.addCommands(getNullCmdResponse()).build();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the Number of Datanodes that are communicating with SCM.
|
* Returns the Number of Datanodes that are communicating with SCM.
|
||||||
*
|
*
|
||||||
|
|
|
@ -96,7 +96,7 @@ A container report contains the following information.
|
||||||
*/
|
*/
|
||||||
message ContainerInfo {
|
message ContainerInfo {
|
||||||
required string containerName = 1;
|
required string containerName = 1;
|
||||||
repeated bytes finalhash = 2;
|
required string finalhash = 2;
|
||||||
optional int64 size = 3;
|
optional int64 size = 3;
|
||||||
optional int64 keycount = 4;
|
optional int64 keycount = 4;
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,7 @@ message ContainerInfo {
|
||||||
A set of container reports, max count is generally set to
|
A set of container reports, max count is generally set to
|
||||||
8192 since that keeps the size of the reports under 1 MB.
|
8192 since that keeps the size of the reports under 1 MB.
|
||||||
*/
|
*/
|
||||||
message ContainerReports {
|
message ContainerReportsProto {
|
||||||
enum reportType {
|
enum reportType {
|
||||||
fullReport = 0;
|
fullReport = 0;
|
||||||
deltaReport = 1;
|
deltaReport = 1;
|
||||||
|
@ -306,4 +306,11 @@ service StorageContainerDatanodeProtocolService {
|
||||||
* extremely light weight and contains no data payload.
|
* extremely light weight and contains no data payload.
|
||||||
*/
|
*/
|
||||||
rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
|
rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
|
||||||
|
|
||||||
|
/**
|
||||||
|
send container reports sends the container report to SCM. This will
|
||||||
|
return a null command as response.
|
||||||
|
*/
|
||||||
|
rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.container.common;
|
package org.apache.hadoop.ozone.container.common;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||||
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
||||||
|
@ -37,6 +38,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
|
||||||
private AtomicInteger heartbeatCount = new AtomicInteger(0);
|
private AtomicInteger heartbeatCount = new AtomicInteger(0);
|
||||||
private AtomicInteger rpcCount = new AtomicInteger(0);
|
private AtomicInteger rpcCount = new AtomicInteger(0);
|
||||||
private ReportState reportState;
|
private ReportState reportState;
|
||||||
|
private AtomicInteger containerReportsCount = new AtomicInteger(0);
|
||||||
|
private AtomicInteger closedContainerCount = new AtomicInteger(0);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the number of heartbeats made to this class.
|
* Returns the number of heartbeats made to this class.
|
||||||
|
@ -74,6 +77,22 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
|
||||||
this.rpcResponseDelay = rpcResponseDelay;
|
this.rpcResponseDelay = rpcResponseDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of container reports server has seen.
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
public int getContainerReportsCount() {
|
||||||
|
return containerReportsCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of closed containers that have been reported so far.
|
||||||
|
* @return - count of closed containers.
|
||||||
|
*/
|
||||||
|
public int getClosedContainerCount() {
|
||||||
|
return closedContainerCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns SCM version.
|
* Returns SCM version.
|
||||||
*
|
*
|
||||||
|
@ -118,6 +137,12 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
|
||||||
heartbeatCount.incrementAndGet();
|
heartbeatCount.incrementAndGet();
|
||||||
this.reportState = reportState;
|
this.reportState = reportState;
|
||||||
sleepIfNeeded();
|
sleepIfNeeded();
|
||||||
|
return getNullRespose();
|
||||||
|
}
|
||||||
|
|
||||||
|
private StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
|
||||||
|
getNullRespose() throws
|
||||||
|
com.google.protobuf.InvalidProtocolBufferException {
|
||||||
StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
|
StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
|
||||||
cmdResponse = StorageContainerDatanodeProtocolProtos
|
cmdResponse = StorageContainerDatanodeProtocolProtos
|
||||||
.SCMCommandResponseProto
|
.SCMCommandResponseProto
|
||||||
|
@ -155,6 +180,23 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
|
||||||
.SCMRegisteredCmdResponseProto.ErrorCode.success).build();
|
.SCMRegisteredCmdResponseProto.ErrorCode.success).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a container report.
|
||||||
|
*
|
||||||
|
* @param reports -- Container report
|
||||||
|
* @return HeartbeatResponse.nullcommand.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
|
||||||
|
sendContainerReport(StorageContainerDatanodeProtocolProtos
|
||||||
|
.ContainerReportsProto reports) throws IOException {
|
||||||
|
Preconditions.checkNotNull(reports);
|
||||||
|
containerReportsCount.incrementAndGet();
|
||||||
|
closedContainerCount.addAndGet(reports.getReportsCount());
|
||||||
|
return getNullRespose();
|
||||||
|
}
|
||||||
|
|
||||||
public ReportState getReportState() {
|
public ReportState getReportState() {
|
||||||
return this.reportState;
|
return this.reportState;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,10 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.container.common;
|
package org.apache.hadoop.ozone.container.common;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
|
||||||
import org.apache.hadoop.ozone.container.common.statemachine
|
import org.apache.hadoop.ozone.container.common.statemachine
|
||||||
.DatanodeStateMachine;
|
.DatanodeStateMachine;
|
||||||
import org.apache.hadoop.ozone.container.common.statemachine
|
import org.apache.hadoop.ozone.container.common.statemachine
|
||||||
|
@ -344,4 +346,51 @@ public class TestEndPoint {
|
||||||
scmServerImpl.setRpcResponseDelay(0);
|
scmServerImpl.setRpcResponseDelay(0);
|
||||||
Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
|
Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new container report.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
ContainerReport getRandomContainerReport() {
|
||||||
|
return new ContainerReport(UUID.randomUUID().toString()
|
||||||
|
,DigestUtils.sha256Hex("Random"));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates dummy container reports.
|
||||||
|
* @param count - The number of closed containers to create.
|
||||||
|
* @return ContainerReportsProto
|
||||||
|
*/
|
||||||
|
StorageContainerDatanodeProtocolProtos.ContainerReportsProto
|
||||||
|
createDummyContainerReports(int count) {
|
||||||
|
StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
|
||||||
|
reportsBuilder = StorageContainerDatanodeProtocolProtos
|
||||||
|
.ContainerReportsProto.newBuilder();
|
||||||
|
for (int x = 0; x < count; x++) {
|
||||||
|
reportsBuilder.addReports(getRandomContainerReport()
|
||||||
|
.getProtoBufMessage());
|
||||||
|
}
|
||||||
|
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
|
||||||
|
.ContainerReportsProto.reportType.fullReport);
|
||||||
|
return reportsBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that rpcEndpoint sendContainerReport works as expected.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testContainerReportSend() throws Exception {
|
||||||
|
final int count = 1000;
|
||||||
|
try (EndpointStateMachine rpcEndPoint =
|
||||||
|
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
|
||||||
|
serverAddress, 1000)) {
|
||||||
|
SCMHeartbeatResponseProto responseProto = rpcEndPoint
|
||||||
|
.getEndPoint().sendContainerReport(createDummyContainerReports(
|
||||||
|
count));
|
||||||
|
Assert.assertNotNull(responseProto);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
|
||||||
|
Assert.assertEquals(count, scmServerImpl.getClosedContainerCount());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue