From b24fe0648348d325d14931f80cee8a170fb3358a Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Fri, 30 Oct 2015 16:05:23 -0500 Subject: [PATCH] Addendum to MAPREDUCE-6451 --- .../mapred/lib/DynamicInputChunkContext.java | 113 ++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java new file mode 100644 index 00000000000..043ff1ca354 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.mapred.lib; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.tools.DistCpConstants; + +import java.io.IOException; + +/** + * Class to initialize the DynamicInputChunk invariants. + */ +class DynamicInputChunkContext { + + private static Log LOG = LogFactory.getLog(DynamicInputChunkContext.class); + private Configuration configuration; + private Path chunkRootPath = null; + private String chunkFilePrefix; + private FileSystem fs; + private int numChunksLeft = -1; // Un-initialized before 1st dir-scan. + + public DynamicInputChunkContext(Configuration config) + throws IOException { + this.configuration = config; + Path listingFilePath = new Path(getListingFilePath(configuration)); + chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir"); + fs = chunkRootPath.getFileSystem(configuration); + chunkFilePrefix = listingFilePath.getName() + ".chunk."; + } + + public Configuration getConfiguration() { + return configuration; + } + + public Path getChunkRootPath() { + return chunkRootPath; + } + + public String getChunkFilePrefix() { + return chunkFilePrefix; + } + + public FileSystem getFs() { + return fs; + } + + private static String getListingFilePath(Configuration configuration) { + final String listingFileString = configuration.get( + DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""); + assert !listingFileString.equals("") : "Listing file not found."; + return listingFileString; + } + + public int getNumChunksLeft() { + return numChunksLeft; + } + + public DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + + String taskId + = taskAttemptContext.getTaskAttemptID().getTaskID().toString(); + Path acquiredFilePath = new Path(getChunkRootPath(), taskId); + + if (fs.exists(acquiredFilePath)) { + LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath); + return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, this); + } + + for (FileStatus chunkFile : getListOfChunkFiles()) { + if (fs.rename(chunkFile.getPath(), acquiredFilePath)) { + LOG.info(taskId + " acquired " + chunkFile.getPath()); + return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, + this); + } + } + return null; + } + + public DynamicInputChunk createChunkForWrite(String chunkId) + throws IOException { + return new DynamicInputChunk(chunkId, this); + } + + public FileStatus [] getListOfChunkFiles() throws IOException { + Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*"); + FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern); + numChunksLeft = chunkFiles.length; + return chunkFiles; + } +}