diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
index 91f7cbe5500..e07f1db04c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.ozone.container.common.helpers;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.util.Time;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -49,6 +51,7 @@ public class ContainerData {
   public ContainerData(String containerName) {
     this.metadata = new TreeMap<>();
     this.containerName = containerName;
+    this.open = true;
   }
 
   /**
@@ -213,15 +216,20 @@ public class ContainerData {
    * checks if the container is open.
    * @return - boolean
    */
-  public boolean isOpen() {
+  public synchronized boolean isOpen() {
     return open;
   }
 
   /**
   * Marks this container as closed.
   */
-  public void closeContainer() {
-    this.open = false;
+  public synchronized void closeContainer() {
+    setOpen(false);
+
+    // Placeholder hash for now: the container name plus the timestamp of
+    // when we received the close container message.
+    setHash(DigestUtils.sha256Hex(this.getContainerName() +
+        Long.toString(Time.monotonicNow())));
   }
 
   /**
@@ -242,7 +250,7 @@ public class ContainerData {
    * Sets the open or closed values.
    * @param open
    */
-  public void setOpen(boolean open) {
+  public synchronized void setOpen(boolean open) {
     this.open = open;
   }
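For intuition, here is a minimal, self-contained sketch of the interim hash that `closeContainer()` now computes. The container name and the use of `System.nanoTime()` in place of Hadoop's `Time.monotonicNow()` are illustrative stand-ins, not part of the patch:

```java
import org.apache.commons.codec.digest.DigestUtils;

public class CloseHashSketch {
  public static void main(String[] args) {
    // Mirrors closeContainer(): the placeholder "final hash" is a SHA-256
    // over the container name plus a monotonic timestamp.
    String containerName = "container-0001"; // hypothetical name
    long timestamp = System.nanoTime();      // stand-in for Time.monotonicNow()
    String finalHash = DigestUtils.sha256Hex(
        containerName + Long.toString(timestamp));
    System.out.println(containerName + " -> " + finalHash);
  }
}
```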
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
new file mode 100644
index 00000000000..9e95c601999
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
+
+/**
+ * Report for a single closed container. The closed containers are iterated
+ * and a set of these reports is sent to SCM.
+ *
+ * The protobuf counterpart of this class looks like this:
+ * message ContainerInfo {
+ *   required string containerName = 1;
+ *   required string finalhash = 2;
+ *   optional int64 size = 3;
+ *   optional int64 keycount = 4;
+ * }
+ */
+public class ContainerReport {
+  private static final int UNKNOWN = -1;
+  private final String containerName;
+  private final String finalhash;
+  private long size;
+  private long keyCount;
+
+  /**
+   * Constructs the ContainerReport.
+   *
+   * @param containerName - Container Name.
+   * @param finalhash - Final Hash.
+   */
+  public ContainerReport(String containerName, String finalhash) {
+    this.containerName = containerName;
+    this.finalhash = finalhash;
+    this.size = UNKNOWN;
+    this.keyCount = UNKNOWN;
+  }
+
+  /**
+   * Gets a ContainerReport from the protobuf message.
+   *
+   * @param info - ContainerInfo.
+   * @return - ContainerReport.
+   */
+  public static ContainerReport getFromProtoBuf(ContainerInfo info) {
+    Preconditions.checkNotNull(info);
+    ContainerReport report = new ContainerReport(info.getContainerName(),
+        info.getFinalhash());
+    if (info.hasSize()) {
+      report.setSize(info.getSize());
+    }
+    if (info.hasKeycount()) {
+      report.setKeyCount(info.getKeycount());
+    }
+    return report;
+  }
+
+  /**
+   * Gets the container name.
+   *
+   * @return - Name
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Returns the final signature for this container.
+   *
+   * @return - hash
+   */
+  public String getFinalhash() {
+    return finalhash;
+  }
+
+  /**
+   * Returns the size of the container if it is known, -1 if it is not.
+   *
+   * @return size or -1
+   */
+  public long getSize() {
+    return size;
+  }
+
+  /**
+   * Sets the size of the container on disk.
+   *
+   * @param size - long
+   */
+  public void setSize(long size) {
+    this.size = size;
+  }
+
+  /**
+   * Gets the number of keys in the container if it is known.
+   *
+   * @return - Number of keys or -1 if not known.
+   */
+  public long getKeyCount() {
+    return keyCount;
+  }
+
+  /**
+   * Sets the key count.
+   *
+   * @param keyCount - Key Count
+   */
+  public void setKeyCount(long keyCount) {
+    this.keyCount = keyCount;
+  }
+
+  /**
+   * Gets a ContainerInfo protobuf message from this ContainerReport.
+   *
+   * @return ContainerInfo
+   */
+  public ContainerInfo getProtoBufMessage() {
+    return ContainerInfo.newBuilder()
+        .setContainerName(this.getContainerName())
+        .setKeycount(this.getKeyCount())
+        .setSize(this.getSize())
+        .setFinalhash(this.getFinalhash())
+        .build();
+  }
+}
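A hedged sketch of the round trip this class supports, using only the methods defined above (the name, hash, and counts are made up; run with `-ea` for the asserts):

```java
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;

public class ContainerReportRoundTrip {
  public static void main(String[] args) {
    // Build a report for a hypothetical closed container.
    ContainerReport report = new ContainerReport("container-0001", "abc123");
    report.setSize(1024L);
    report.setKeyCount(10L);

    // Convert to the wire message and back.
    ContainerInfo info = report.getProtoBufMessage();
    ContainerReport copy = ContainerReport.getFromProtoBuf(info);

    // The round trip preserves all four fields.
    assert copy.getContainerName().equals("container-0001");
    assert copy.getFinalhash().equals("abc123");
    assert copy.getSize() == 1024L;
    assert copy.getKeyCount() == 10L;
  }
}
```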
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index fe22042a601..2cb295f5716 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
@@ -611,6 +612,25 @@ public class ContainerManagerImpl implements ContainerManager {
     return nrb.build();
   }
 
+  /**
+   * Gets container reports.
+   *
+   * @return List of all closed containers.
+   * @throws IOException
+   */
+  @Override
+  public List<ContainerData> getContainerReports() throws IOException {
+    LOG.debug("Starting container report iteration.");
+    // No need for locking since containerMap is a ConcurrentSkipListMap,
+    // and we can never get the exact state anyway since a close might
+    // happen after we iterate past a given point.
+    return containerMap.entrySet().stream()
+        .filter(containerStatus ->
+            !containerStatus.getValue().getContainer().isOpen())
+        .map(containerStatus -> containerStatus.getValue().getContainer())
+        .collect(Collectors.toList());
+  }
+
   /**
    * Sets the Key Manager.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index 19ce6598ecf..e41cc3c88cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -25,8 +25,7 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
@@ -151,4 +150,11 @@ public interface ContainerManager extends RwLock {
    * @return node report.
    */
   SCMNodeReport getNodeReport() throws IOException;
+
+  /**
+   * Gets container reports.
+   * @return List of all closed containers.
+   * @throws IOException
+   */
+  List<ContainerData> getContainerReports() throws IOException;
 }
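The report path above is a lock-free stream over the concurrent container map. A self-contained sketch of the same filtering idiom, with the container types reduced to a stub:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

public class ClosedContainerFilter {
  // Simplified stand-in for ContainerStatus/ContainerData.
  static class Container {
    final String name;
    final boolean open;
    Container(String name, boolean open) {
      this.name = name;
      this.open = open;
    }
  }

  public static void main(String[] args) {
    // A ConcurrentSkipListMap can be iterated without explicit locking,
    // which is why getContainerReports() takes no lock.
    Map<String, Container> containerMap = new ConcurrentSkipListMap<>();
    containerMap.put("c1", new Container("c1", true));
    containerMap.put("c2", new Container("c2", false));

    List<String> closed = containerMap.values().stream()
        .filter(c -> !c.open)            // keep only closed containers
        .map(c -> c.name)
        .collect(Collectors.toList());
    System.out.println(closed); // [c2]
  }
}
```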
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 600884421f6..5cac7b012e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -21,7 +21,10 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.slf4j.Logger;
@@ -31,6 +34,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * State Machine Class.
@@ -46,12 +50,15 @@ public class DatanodeStateMachine implements Closeable {
   private StateContext context;
   private final OzoneContainer container;
   private DatanodeID datanodeID = null;
+  private final CommandDispatcher commandDispatcher;
+  private long commandsHandled;
+  private AtomicLong nextHB;
 
   /**
-   * Constructs a a datanode state machine.
+   * Constructs a datanode state machine.
    *
    * @param datanodeID - DatanodeID used to identify a datanode
-   * @param conf - Configration.
+   * @param conf - Configuration.
    */
   public DatanodeStateMachine(DatanodeID datanodeID,
       Configuration conf) throws IOException {
@@ -65,6 +72,17 @@ public class DatanodeStateMachine implements Closeable {
         OzoneClientUtils.getScmHeartbeatInterval(conf));
     container = new OzoneContainer(conf);
     this.datanodeID = datanodeID;
+    nextHB = new AtomicLong(Time.monotonicNow());
+
+    // When we add new command handlers, registering them here should be
+    // all that is needed.
+    commandDispatcher = CommandDispatcher.newBuilder()
+        .addHandler(new ContainerReportHandler())
+        .setConnectionManager(connectionManager)
+        .setContainer(container)
+        .setContext(context)
+        .build();
   }
 
   public DatanodeStateMachine(Configuration conf)
@@ -104,18 +122,19 @@
    */
   private void start() throws IOException {
     long now = 0;
-    long nextHB = 0;
+    container.start();
+    initCommandHandlerThread(conf);
     while (context.getState() != DatanodeStates.SHUTDOWN) {
       try {
         LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
-        nextHB = Time.monotonicNow() + heartbeatFrequency;
+        nextHB.set(Time.monotonicNow() + heartbeatFrequency);
         context.setReportState(container.getNodeReport());
         context.execute(executorService, heartbeatFrequency,
             TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();
-        if (now < nextHB) {
-          Thread.sleep(nextHB - now);
+        if (now < nextHB.get()) {
+          Thread.sleep(nextHB.get() - now);
         }
       } catch (Exception e) {
         LOG.error("Unable to finish the execution.", e);
@@ -162,7 +181,7 @@
       }
 
       if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        LOG.error("Unable to shutdown statemachine properly.");
+        LOG.error("Unable to shutdown state machine properly.");
       }
     } catch (InterruptedException e) {
       LOG.error("Error attempting to shutdown.", e);
@@ -289,4 +308,61 @@
         && this.getContext().getExecutionCount() == 0
         && this.getContext().getState() == DatanodeStates.SHUTDOWN;
   }
+
+  /**
+   * Creates the command handler thread.
+   *
+   * @param conf - Configuration.
+   */
+  private void initCommandHandlerThread(Configuration conf) {
+
+    /**
+     * Task that periodically checks if we have any outstanding commands.
+     * It is assumed that commands can be processed slowly and in order.
+     * This assumption might change in the future; right now, because of
+     * it, we have a single command queue processing thread.
+     */
+    Runnable processCommandQueue = () -> {
+      long now;
+      while (getContext().getState() != DatanodeStates.SHUTDOWN) {
+        SCMCommand command = getContext().getNextCommand();
+        if (command != null) {
+          commandDispatcher.handle(command);
+          commandsHandled++;
+        } else {
+          try {
+            // Sleep till the next HB + 1 second.
+            now = Time.monotonicNow();
+            if (nextHB.get() > now) {
+              Thread.sleep((nextHB.get() - now) + 1000L);
+            }
+          } catch (InterruptedException e) {
+            // Ignore this exception.
+          }
+        }
+      }
+    };
+
+    // We will have only one thread for command processing in a datanode.
+    startCommandHandlerThread(processCommandQueue);
+  }
+
+  /**
+   * Starts the command processor thread. If the thread dies with an
+   * uncaught exception we log a critical error and spawn a replacement,
+   * since a terminated Thread object cannot be started again and without
+   * this thread we cannot handle commands from SCM.
+   */
+  private void startCommandHandlerThread(Runnable processCommandQueue) {
+    Thread cmdProcessThread = new Thread(processCommandQueue);
+    cmdProcessThread.setDaemon(true);
+    cmdProcessThread.setName("Command processor thread");
+    cmdProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      LOG.error("Critical Error : Command processor thread encountered an " +
+          "error. Thread: {}", t.toString(), e);
+      startCommandHandlerThread(processCommandQueue);
+    });
+    cmdProcessThread.start();
+  }
+
+  /**
+   * Returns the number of commands handled by the datanode.
+   * @return count
+   */
+  @VisibleForTesting
+  public long getCommandHandled() {
+    return commandsHandled;
+  }
 }
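One subtlety in the restart path above: a `java.lang.Thread` that has terminated can never be `start()`ed again, which is why the handler respawns a fresh thread rather than reusing the old object. A standalone sketch of that pattern, with a task that fails once and then recovers:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RespawningWorker {
  private static final AtomicInteger attempts = new AtomicInteger();

  // Start (or restart) a daemon worker. Each restart builds a new Thread,
  // since a terminated Thread object cannot be started twice.
  static void startWorker(Runnable task) {
    Thread worker = new Thread(task);
    worker.setDaemon(true);
    worker.setName("worker");
    worker.setUncaughtExceptionHandler((t, e) -> {
      System.err.println("worker died: " + e.getMessage() + "; respawning");
      startWorker(task);
    });
    worker.start();
  }

  public static void main(String[] args) throws InterruptedException {
    startWorker(() -> {
      if (attempts.incrementAndGet() == 1) {
        throw new RuntimeException("boom"); // first run fails
      }
      System.out.println("recovered on attempt " + attempts.get());
    });
    Thread.sleep(200); // give the worker time to fail and recover
  }
}
```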
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
new file mode 100644
index 00000000000..ca619a3d4c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Dispatches command to the correct handler.
+ */
+public final class CommandDispatcher {
+  static final Logger LOG =
+      LoggerFactory.getLogger(CommandDispatcher.class);
+  private final StateContext context;
+  private final Map<Type, CommandHandler> handlerMap;
+  private final OzoneContainer container;
+  private final SCMConnectionManager connectionManager;
+
+  /**
+   * Constructs a command dispatcher.
+   *
+   * @param container - Ozone Container
+   * @param connectionManager - SCM connection manager
+   * @param context - Context
+   * @param handlers - Set of handlers.
+   */
+  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
+      connectionManager, StateContext context,
+      CommandHandler... handlers) {
+    Preconditions.checkNotNull(context);
+    Preconditions.checkNotNull(handlers);
+    Preconditions.checkArgument(handlers.length > 0);
+    Preconditions.checkNotNull(container);
+    Preconditions.checkNotNull(connectionManager);
+    this.context = context;
+    this.container = container;
+    this.connectionManager = connectionManager;
+    handlerMap = new HashMap<>();
+    for (CommandHandler h : handlers) {
+      if (handlerMap.containsKey(h.getCommandType())) {
+        LOG.error("Duplicate handler for the same command. Exiting. Handler " +
+            "key : {}", h.getCommandType().getDescriptorForType().getName());
+        throw new IllegalArgumentException("Duplicate handler for the same " +
+            "command.");
+      }
+      handlerMap.put(h.getCommandType(), h);
+    }
+  }
+
+  /**
+   * Dispatch the command to the correct handler.
+   *
+   * @param command - SCM Command.
+   */
+  public void handle(SCMCommand command) {
+    Preconditions.checkNotNull(command);
+    CommandHandler handler = handlerMap.get(command.getType());
+    if (handler != null) {
+      handler.handle(command, container, context, connectionManager);
+    } else {
+      LOG.error("Unknown SCM Command queued. There is no handler for this " +
+          "command. Command: {}", command.getType().getDescriptorForType()
+          .getName());
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Helper class to construct command dispatcher.
+   */
+  public static class Builder {
+    private final List<CommandHandler> handlerList;
+    private OzoneContainer container;
+    private StateContext context;
+    private SCMConnectionManager connectionManager;
+
+    public Builder() {
+      handlerList = new LinkedList<>();
+    }
+
+    /**
+     * Adds a handler.
+     *
+     * @param handler - handler
+     * @return Builder
+     */
+    public Builder addHandler(CommandHandler handler) {
+      Preconditions.checkNotNull(handler);
+      handlerList.add(handler);
+      return this;
+    }
+
+    /**
+     * Add the OzoneContainer.
+     *
+     * @param container - ozone container.
+     * @return Builder
+     */
+    public Builder setContainer(OzoneContainer container) {
+      Preconditions.checkNotNull(container);
+      this.container = container;
+      return this;
+    }
+
+    /**
+     * Set the Connection Manager.
+     *
+     * @param connectionManager
+     * @return this
+     */
+    public Builder setConnectionManager(SCMConnectionManager
+        connectionManager) {
+      Preconditions.checkNotNull(connectionManager);
+      this.connectionManager = connectionManager;
+      return this;
+    }
+
+    /**
+     * Sets the Context.
+     *
+     * @param context - StateContext
+     * @return this
+     */
+    public Builder setContext(StateContext context) {
+      Preconditions.checkNotNull(context);
+      this.context = context;
+      return this;
+    }
+
+    /**
+     * Builds a command Dispatcher.
+     * @return Command Dispatcher.
+     */
+    public CommandDispatcher build() {
+      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
+          " manager.");
+      Preconditions.checkNotNull(this.container, "Missing container.");
+      Preconditions.checkNotNull(this.context, "Missing context.");
+      Preconditions.checkArgument(this.handlerList.size() > 0);
+      return new CommandDispatcher(this.container, this.connectionManager,
+          this.context, handlerList.toArray(
+              new CommandHandler[handlerList.size()]));
+    }
+  }
+}
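The dispatcher is essentially a type-keyed handler map with duplicate detection. A self-contained sketch of that core idea (the enum and handler interface are simplified stand-ins for the protobuf `Type` and `CommandHandler`):

```java
import java.util.HashMap;
import java.util.Map;

public class MiniDispatcher {
  enum Type { sendContainerReport, nullCmd }

  interface Handler {
    void handle(String payload);
  }

  private final Map<Type, Handler> handlerMap = new HashMap<>();

  // One handler per command type; duplicates are rejected, as in
  // CommandDispatcher's constructor.
  MiniDispatcher register(Type type, Handler handler) {
    if (handlerMap.putIfAbsent(type, handler) != null) {
      throw new IllegalArgumentException("Duplicate handler for " + type);
    }
    return this;
  }

  void handle(Type type, String payload) {
    Handler handler = handlerMap.get(type);
    if (handler != null) {
      handler.handle(payload);
    } else {
      System.err.println("No handler for command: " + type); // mirrors LOG.error
    }
  }

  public static void main(String[] args) {
    MiniDispatcher dispatcher = new MiniDispatcher()
        .register(Type.sendContainerReport,
            p -> System.out.println("reporting: " + p));
    dispatcher.handle(Type.sendContainerReport, "container-0001");
    dispatcher.handle(Type.nullCmd, "ignored"); // no handler registered
  }
}
```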
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
new file mode 100644
index 00000000000..b30d4810ebc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+/**
+ * Generic interface for handlers.
+ */
+public interface CommandHandler {
+
+  /**
+   * Handles a given SCM command.
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager);
+
+  /**
+   * Returns the command type that this command handler handles.
+   * @return Type
+   */
+  Type getCommandType();
+
+  /**
+   * Returns the number of times this handler has been invoked.
+   * @return int
+   */
+  int getInvocationCount();
+
+  /**
+   * Returns the average time this handler takes to run.
+   * @return long
+   */
+  long getAverageRunTime();
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
new file mode 100644
index 00000000000..a30783d6ce7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Container Report handler.
+ */
+public class ContainerReportHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportHandler.class);
+  private int invocationCount;
+  private long totalTime;
+
+  /**
+   * Constructs a ContainerReport handler.
+   */
+  public ContainerReportHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    LOG.debug("Processing Container Report.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+    try {
+      ContainerReportsProto.Builder containerReportsBuilder =
+          ContainerReportsProto.newBuilder();
+      List<ContainerData> closedContainerList =
+          container.getContainerReports();
+      for (ContainerData cd : closedContainerList) {
+        ContainerReport report =
+            new ContainerReport(cd.getContainerName(), cd.getHash());
+        containerReportsBuilder.addReports(report.getProtoBufMessage());
+      }
+      containerReportsBuilder.setType(ContainerReportsProto.reportType
+          .fullReport);
+
+      // TODO : We send this report to all SCMs. Check if it is enough to
+      // send it only to the leader once we have RAFT enabled SCMs.
+      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+        endPoint.getEndPoint().sendContainerReport(
+            containerReportsBuilder.build());
+      }
+    } catch (IOException ex) {
+      LOG.error("Unable to process the Container Report command.", ex);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getCommandType() {
+    return Type.sendContainerReport;
+  }
+
+  /**
+   * Returns the number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return invocationCount;
+  }
+
+  /**
+   * Returns the average time this handler takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}
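The handler's metrics are a simple running total guarded against division by zero. A standalone sketch of the same bookkeeping, with illustrative timings:

```java
public class HandlerMetrics {
  private int invocationCount;
  private long totalTime;

  // Mirrors the try/finally bookkeeping in handle().
  void record(long elapsedMillis) {
    invocationCount++;
    totalTime += elapsedMillis;
  }

  // Mirrors getAverageRunTime(): zero until the first invocation completes.
  long averageRunTime() {
    return invocationCount > 0 ? totalTime / invocationCount : 0;
  }

  public static void main(String[] args) {
    HandlerMetrics metrics = new HandlerMetrics();
    metrics.record(30);
    metrics.record(50);
    System.out.println(metrics.averageRunTime()); // 40
  }
}
```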
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
new file mode 100644
index 00000000000..1e9c8dc5eee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 5db1ce8b0f7..de68a994353 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
@@ -176,4 +177,13 @@ public class OzoneContainer {
   public int getContainerServerPort() {
     return server.getIPCPort();
   }
+
+  /**
+   * Returns the list of closed containers.
+   * @return - List of closed containers.
+   * @throws IOException
+   */
+  public List<ContainerData> getContainerReports() throws IOException {
+    return this.manager.getContainerReports();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 76f359ca75e..75bd771a52e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -61,4 +62,12 @@ public interface StorageContainerDatanodeProtocol {
   SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
       String[] scmAddresses) throws IOException;
 
+  /**
+   * Send a container report.
+   * @param reports -- Container report.
+   * @return HeartbeatResponse.nullcommand.
+   * @throws IOException
+   */
+  SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
+      throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 78b46cfc632..e0e3bee8c3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
@@ -31,14 +32,12 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -159,4 +158,23 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     return response;
   }
 
+  /**
+   * Send a container report.
+   *
+   * @param reports -- Container report.
+   * @return HeartbeatResponse.nullcommand.
+   * @throws IOException
+   */
+  @Override
+  public SCMHeartbeatResponseProto sendContainerReport(
+      ContainerReportsProto reports) throws IOException {
+    final SCMHeartbeatResponseProto resp;
+    try {
+      resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    return resp;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index ca13305af54..116cc382819 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 
 import java.io.IOException;
 
@@ -84,4 +85,16 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public SCMHeartbeatResponseProto
+      sendContainerReport(RpcController controller,
+          ContainerReportsProto request)
+      throws ServiceException {
+    try {
+      return impl.sendContainerReport(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
\ No newline at end of file
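Both translators follow the standard Hadoop protobuf-RPC pattern: the server side wraps `IOException` into `ServiceException` for the RPC layer, and the client side unwraps it back into an `IOException` for callers. A schematic sketch of that symmetry, with the RPC plumbing reduced to direct calls (the exception class here is a stand-in for `com.google.protobuf.ServiceException`):

```java
import java.io.IOException;

public class TranslatorPattern {
  static class ServiceException extends Exception {
    ServiceException(Throwable cause) { super(cause); }
  }

  // Stand-in for the actual implementation, e.g. impl.sendContainerReport().
  static String impl() throws IOException {
    return "null-command response";
  }

  // Server-side translator: IOException -> ServiceException.
  static String serverCall() throws ServiceException {
    try {
      return impl();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  // Client-side translator: ServiceException -> IOException, in the spirit
  // of ProtobufHelper.getRemoteException().
  static String clientCall() throws IOException {
    try {
      return serverCall();
    } catch (ServiceException e) {
      throw new IOException(e.getCause());
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(clientCall());
  }
}
```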
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index 53ddb9c9917..91a7376c047 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -34,11 +34,12 @@ import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.protocol.LocatedContainer;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.ozone.protocol.commands.NullCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
@@ -297,16 +298,24 @@ public class StorageContainerManager
     Type type = cmd.getType();
     switch (type) {
     case nullCmd:
-      Preconditions.checkState(cmd.getClass() == NullCommand.class);
-      return SCMCommandResponseProto.newBuilder().setCmdType(cmd.getType())
-          .setNullCommand(
-              NullCmdResponseProto.parseFrom(cmd.getProtoBufMessage()))
-          .build();
+      return getNullCmdResponse();
     default:
       throw new IllegalArgumentException("Not implemented");
     }
   }
 
+  /**
+   * Returns a null command response.
+   * @return SCMCommandResponseProto with a null command set.
+   */
+  private static SCMCommandResponseProto getNullCmdResponse() {
+    return SCMCommandResponseProto.newBuilder()
+        .setCmdType(Type.nullCmd)
+        .setNullCommand(NullCmdResponseProto.getDefaultInstance())
+        .build();
+  }
+
   @VisibleForTesting
   public static SCMRegisteredCmdResponseProto getRegisteredResponse(
       SCMCommand cmd, SCMNodeAddressList addressList) {
@@ -480,6 +489,22 @@ public class StorageContainerManager
     return getRegisteredResponse(scmNodeManager.register(datanodeID), null);
   }
 
+  /**
+   * Send a container report.
+   *
+   * @param reports -- Container report.
+   * @return HeartbeatResponse.nullcommand.
+   * @throws IOException
+   */
+  @Override
+  public SCMHeartbeatResponseProto
+      sendContainerReport(ContainerReportsProto reports) throws IOException {
+    // TODO : Handle this request correctly in the server side code changes
+    // that follow.
+    return SCMHeartbeatResponseProto.newBuilder()
+        .addCommands(getNullCmdResponse()).build();
+  }
+
   /**
    * Returns the Number of Datanodes that are communicating with SCM.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 38be4bea082..545f05f2bb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -96,7 +96,7 @@ A container report contains the following information.
 */
 message ContainerInfo {
   required string containerName = 1;
-  repeated bytes finalhash = 2;
+  required string finalhash = 2;
   optional int64 size = 3;
   optional int64 keycount = 4;
 }
@@ -105,7 +105,7 @@ message ContainerInfo {
 A set of container reports, max count is generally set to 8192 since that
 keeps the size of the reports under 1 MB.
 */
-message ContainerReports {
+message ContainerReportsProto {
   enum reportType {
     fullReport = 0;
     deltaReport = 1;
@@ -306,4 +306,11 @@ service StorageContainerDatanodeProtocolService {
   * extremely light weight and contains no data payload.
   */
   rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
+
+  /**
+  sendContainerReport sends the container report to SCM. This returns
+  a null command as the response.
+  */
+  rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
+
 }
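With the schema changes in place, building a full report from the generated classes looks roughly like this (a sketch against the generated protobuf API; the values are made up):

```java
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;

public class BuildFullReport {
  public static void main(String[] args) {
    ContainerInfo info = ContainerInfo.newBuilder()
        .setContainerName("container-0001") // required
        .setFinalhash("abc123")             // required; now a string
        .setSize(1024L)                     // optional
        .setKeycount(10L)                   // optional
        .build();

    ContainerReportsProto reports = ContainerReportsProto.newBuilder()
        .addReports(info)
        .setType(ContainerReportsProto.reportType.fullReport)
        .build();
    System.out.println(reports.getReportsCount()); // 1
  }
}
```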
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index a9787cefa40..5e8d13b5fa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.container.common;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
@@ -37,6 +38,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   private AtomicInteger heartbeatCount = new AtomicInteger(0);
   private AtomicInteger rpcCount = new AtomicInteger(0);
   private ReportState reportState;
+  private AtomicInteger containerReportsCount = new AtomicInteger(0);
+  private AtomicInteger closedContainerCount = new AtomicInteger(0);
 
   /**
    * Returns the number of heartbeats made to this class.
@@ -74,6 +77,22 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     this.rpcResponseDelay = rpcResponseDelay;
   }
 
+  /**
+   * Returns the number of container reports the server has seen.
+   * @return int
+   */
+  public int getContainerReportsCount() {
+    return containerReportsCount.get();
+  }
+
+  /**
+   * Returns the number of closed containers that have been reported so far.
+   * @return - count of closed containers.
+   */
+  public int getClosedContainerCount() {
+    return closedContainerCount.get();
+  }
+
   /**
    * Returns SCM version.
    *
@@ -118,6 +137,12 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     heartbeatCount.incrementAndGet();
     this.reportState = reportState;
     sleepIfNeeded();
+    return getNullResponse();
+  }
+
+  private StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+      getNullResponse() throws
+      com.google.protobuf.InvalidProtocolBufferException {
     StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
         cmdResponse = StorageContainerDatanodeProtocolProtos
         .SCMCommandResponseProto
@@ -155,6 +180,23 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
         .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
   }
 
+  /**
+   * Send a container report.
+   *
+   * @param reports -- Container report.
+   * @return HeartbeatResponse.nullcommand.
+   * @throws IOException
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+      sendContainerReport(StorageContainerDatanodeProtocolProtos
+      .ContainerReportsProto reports) throws IOException {
+    Preconditions.checkNotNull(reports);
+    containerReportsCount.incrementAndGet();
+    closedContainerCount.addAndGet(reports.getReportsCount());
+    return getNullResponse();
+  }
+
   public ReportState getReportState() {
     return this.reportState;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index c8e897c3eb5..3c2c8ca077c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -16,10 +16,12 @@
  */
 package org.apache.hadoop.ozone.container.common;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
 import org.apache.hadoop.ozone.container.common.statemachine
     .DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine
@@ -344,4 +346,51 @@ public class TestEndPoint {
     scmServerImpl.setRpcResponseDelay(0);
     Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
   }
+
+  /**
+   * Returns a new container report.
+   * @return ContainerReport.
+   */
+  ContainerReport getRandomContainerReport() {
+    return new ContainerReport(UUID.randomUUID().toString(),
+        DigestUtils.sha256Hex("Random"));
+  }
+
+  /**
+   * Creates dummy container reports.
+   * @param count - The number of closed containers to create.
+   * @return ContainerReportsProto
+   */
+  StorageContainerDatanodeProtocolProtos.ContainerReportsProto
+      createDummyContainerReports(int count) {
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
+        reportsBuilder = StorageContainerDatanodeProtocolProtos
+        .ContainerReportsProto.newBuilder();
+    for (int x = 0; x < count; x++) {
+      reportsBuilder.addReports(getRandomContainerReport()
+          .getProtoBufMessage());
+    }
+    reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
+        .ContainerReportsProto.reportType.fullReport);
+    return reportsBuilder.build();
+  }
+
+  /**
+   * Tests that rpcEndpoint sendContainerReport works as expected.
+   * @throws Exception
+   */
+  @Test
+  public void testContainerReportSend() throws Exception {
+    final int count = 1000;
+    try (EndpointStateMachine rpcEndPoint =
+        SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+            serverAddress, 1000)) {
+      SCMHeartbeatResponseProto responseProto = rpcEndPoint
+          .getEndPoint().sendContainerReport(createDummyContainerReports(
+              count));
+      Assert.assertNotNull(responseProto);
+    }
+    Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
+    Assert.assertEquals(count, scmServerImpl.getClosedContainerCount());
+  }
 }