diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7ab49ac3471..48c97fc26d9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -477,6 +477,9 @@ Branch YARN-321: Generic ApplicationHistoryService YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via devaraj) + YARN-975. Added a file-system implementation for HistoryStorage. (Zhijie Shen + via vinodkv) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dc195858cb8..009dda3c160 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -931,6 +931,19 @@ public class YarnConfiguration extends Configuration { public static final String YARN_APP_CONTAINER_LOG_BACKUPS = YARN_PREFIX + "app.container.log.backups"; + //////////////////////////////// + // AHS Configs + //////////////////////////////// + + public static final String AHS_PREFIX = YARN_PREFIX + "ahs."; + + /** URI for FileSystemApplicationHistoryStore */ + public static final String FS_HISTORY_STORE_URI = AHS_PREFIX + "fs-history-store.uri"; + + /** T-file compression types used to compress history data.*/ + public static final String FS_HISTORY_STORE_COMPRESSION_TYPE = AHS_PREFIX + "fs-history-store.compression-type"; + public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE = "none"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ba6264e0ae3..b831158460f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1041,6 +1041,23 @@ + + + + URI pointing to the location of the FileSystem path where + the history will be persisted. This must be supplied when using + org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore + as the value for yarn.resourcemanager.ahs.writer.class + yarn.ahs.fs-history-store.uri + ${hadoop.log.dir}/yarn/system/ahstore + + + + T-file compression types used to compress history data. + yarn.ahs.fs-history-store.compression-type + none + + The interval that the yarn client library uses to poll the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java new file mode 100644 index 00000000000..b4d97f314db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java @@ -0,0 +1,860 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.file.tfile.TFile; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto; +import 
org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl; +import 
org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * File system implementation of {@link ApplicationHistoryStore}. In this + * implementation, one application will have just one file in the file system, + * which contains all the history data of one application, and its attempts and + * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to + * be invoked first when writing any history data of one application and it will + * open a file, while {@link #applicationFinished(ApplicationFinishData)} is + * supposed to be last writing operation and will close the file. + */ +@Public +@Unstable +public class FileSystemApplicationHistoryStore extends AbstractService + implements ApplicationHistoryStore { + + private static final Log LOG = LogFactory + .getLog(FileSystemApplicationHistoryStore.class); + + private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot"; + private static final int MIN_BLOCK_SIZE = 256 * 1024; + private static final String START_DATA_SUFFIX = "_start"; + private static final String FINISH_DATA_SUFFIX = "_finish"; + private static final FsPermission ROOT_DIR_UMASK = + FsPermission.createImmutable((short) 0740); + private static final FsPermission HISTORY_FILE_UMASK = + FsPermission.createImmutable((short) 0640); + + private FileSystem fs; + private Path rootDirPath; + + private ConcurrentMap outstandingWriters = + new ConcurrentHashMap(); + + public FileSystemApplicationHistoryStore() { + super(FileSystemApplicationHistoryStore.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Path fsWorkingPath = new Path( + conf.get(YarnConfiguration.FS_HISTORY_STORE_URI)); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + try { + fs = fsWorkingPath.getFileSystem(conf); + fs.mkdirs(rootDirPath); + 
fs.setPermission(rootDirPath, ROOT_DIR_UMASK); + } catch (IOException e) { + LOG.error("Error when initializing FileSystemHistoryStorage", e); + throw e; + } + super.serviceInit(conf); + } + + @Override + public void serviceStop() throws Exception { + try { + for (Entry entry : outstandingWriters + .entrySet()) { + entry.getValue().close(); + } + outstandingWriters.clear(); + } finally { + IOUtils.cleanup(LOG, fs); + } + super.serviceStop(); + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + HistoryFileReader hfReader = getHistoryFileReader(appId); + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationHistoryData historyData = + ApplicationHistoryData.newInstance( + appId, null, null, null, null, Long.MIN_VALUE, Long.MIN_VALUE, + Long.MAX_VALUE, null, FinalApplicationStatus.UNDEFINED, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(appId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ApplicationStartData startData = + parseApplicationStartData(entry.value); + mergeApplicationHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ApplicationFinishData finishData = + parseApplicationFinishData(entry.value); + mergeApplicationHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application " + appId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application " + appId); + } + LOG.info("Completed reading history information of application " + appId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application " + appId); + throw e; + } finally { + 
hfReader.close(); + } + } + + @Override + public Map getAllApplications() + throws IOException { + Map historyDataMap = + new HashMap(); + FileStatus[] files = fs.listStatus(rootDirPath); + for (FileStatus file : files) { + ApplicationId appId = + ConverterUtils.toApplicationId(file.getPath().getName()); + try { + ApplicationHistoryData historyData = getApplication(appId); + if (historyData != null) { + historyDataMap.put(appId, historyData); + } + } catch (IOException e) { + // Eat the exception not to disturb the getting the next + // ApplicationHistoryData + LOG.error("History information of application " + appId + + " is not included into the result due to the exception", e); + } + } + return historyDataMap; + } + + @Override + public Map + getApplicationAttempts(ApplicationId appId) throws IOException { + Map historyDataMap = + new HashMap(); + Map> startFinshDataMap = + new HashMap>(); + HistoryFileReader hfReader = getHistoryFileReader(appId); + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.startsWith(ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + retrieveStartFinishData(appId, entry, startFinshDataMap, true); + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + retrieveStartFinishData(appId, entry, startFinshDataMap, false); + } + } + } + LOG.info("Completed reading history information of all application" + + " attempts of application " + appId); + } catch (IOException e) { + LOG.info("Error when reading history information of some application" + + " attempts of application " + appId); + } finally { + hfReader.close(); + } + for (Map.Entry> entry : startFinshDataMap + .entrySet()) { + ApplicationAttemptHistoryData historyData = + ApplicationAttemptHistoryData.newInstance( + entry.getKey(), null, -1, null, null, null, + FinalApplicationStatus.UNDEFINED, null); + mergeApplicationAttemptHistoryData(historyData, + 
entry.getValue().startData); + mergeApplicationAttemptHistoryData(historyData, + entry.getValue().finishData); + historyDataMap.put(entry.getKey(), historyData); + } + return historyDataMap; + } + + private + void + retrieveStartFinishData( + ApplicationId appId, + HistoryFileReader.Entry entry, + Map> startFinshDataMap, + boolean start) throws IOException { + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(entry.key.id); + if (appAttemptId.getApplicationId().equals(appId)) { + StartFinishDataPair pair = + startFinshDataMap.get(appAttemptId); + if (pair == null) { + pair = + new StartFinishDataPair(); + startFinshDataMap.put(appAttemptId, pair); + } + if (start) { + pair.startData = parseApplicationAttemptStartData(entry.value); + } else { + pair.finishData = parseApplicationAttemptFinishData(entry.value); + } + } + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationAttemptHistoryData historyData = + ApplicationAttemptHistoryData.newInstance( + appAttemptId, null, -1, null, null, null, + FinalApplicationStatus.UNDEFINED, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(appAttemptId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ApplicationAttemptStartData startData = + parseApplicationAttemptStartData(entry.value); + mergeApplicationAttemptHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ApplicationAttemptFinishData finishData = + parseApplicationAttemptFinishData(entry.value); + mergeApplicationAttemptHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if 
(!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application attempt " + + appAttemptId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application attempt " + + appAttemptId); + } + LOG.info("Completed reading history information of application attempt " + + appAttemptId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application attempt" + + appAttemptId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + HistoryFileReader hfReader = + getHistoryFileReader(containerId.getApplicationAttemptId() + .getApplicationId()); + try { + boolean readStartData = false; + boolean readFinishData = false; + ContainerHistoryData historyData = + ContainerHistoryData.newInstance(containerId, null, null, null, + Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE, + null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(containerId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ContainerStartData startData = + parseContainerStartData(entry.value); + mergeContainerHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ContainerFinishData finishData = + parseContainerFinishData(entry.value); + mergeContainerHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for container " + containerId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for container " + containerId); + } + LOG.info("Completed reading history information of container " + + containerId); + 
return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of container " + containerId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + ApplicationAttemptHistoryData attemptHistoryData = + getApplicationAttempt(appAttemptId); + if (attemptHistoryData == null + || attemptHistoryData.getMasterContainerId() == null) { + return null; + } + return getContainer(attemptHistoryData.getMasterContainerId()); + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + Map historyDataMap = + new HashMap(); + Map> startFinshDataMap = + new HashMap>(); + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + retrieveStartFinishData(appAttemptId, entry, startFinshDataMap, + true); + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + retrieveStartFinishData(appAttemptId, entry, startFinshDataMap, + false); + } + } + } + LOG.info("Completed reading history information of all conatiners" + + " of application attempt " + appAttemptId); + } catch (IOException e) { + LOG.info("Error when reading history information of some containers" + + " of application attempt " + appAttemptId); + } finally { + hfReader.close(); + } + for (Map.Entry> entry : startFinshDataMap + .entrySet()) { + ContainerHistoryData historyData = + ContainerHistoryData.newInstance(entry.getKey(), null, null, null, + Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE, + null); + mergeContainerHistoryData(historyData, entry.getValue().startData); + mergeContainerHistoryData(historyData, entry.getValue().finishData); + historyDataMap.put(entry.getKey(), 
historyData); + } + return historyDataMap; + } + + private + void + retrieveStartFinishData( + ApplicationAttemptId appAttemptId, + HistoryFileReader.Entry entry, + Map> startFinshDataMap, + boolean start) throws IOException { + ContainerId containerId = + ConverterUtils.toContainerId(entry.key.id); + if (containerId.getApplicationAttemptId().equals(appAttemptId)) { + StartFinishDataPair pair = + startFinshDataMap.get(containerId); + if (pair == null) { + pair = + new StartFinishDataPair(); + startFinshDataMap.put(containerId, pair); + } + if (start) { + pair.startData = parseContainerStartData(entry.value); + } else { + pair.finishData = parseContainerFinishData(entry.value); + } + } + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + HistoryFileWriter hfWriter = + outstandingWriters.get(appStart.getApplicationId()); + if (hfWriter == null) { + Path applicationHistoryFile = + new Path(rootDirPath, appStart.getApplicationId().toString()); + try { + hfWriter = new HistoryFileWriter(applicationHistoryFile); + LOG.info("Opened history file of application " + + appStart.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when openning history file of application " + + appStart.getApplicationId()); + throw e; + } + outstandingWriters.put(appStart.getApplicationId(), hfWriter); + } else { + throw new IOException("History file of application " + + appStart.getApplicationId() + " is already opened"); + } + assert appStart instanceof ApplicationStartDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId() + .toString(), START_DATA_SUFFIX), + ((ApplicationStartDataPBImpl) appStart) + .getProto().toByteArray()); + LOG.info("Start information of application " + + appStart.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application " + + appStart.getApplicationId()); + throw e; + } + } + + @Override + 
public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appFinish.getApplicationId()); + assert appFinish instanceof ApplicationFinishDataPBImpl; + try { + hfWriter.writeHistoryData( + new HistoryDataKey(appFinish.getApplicationId().toString(), + FINISH_DATA_SUFFIX), + ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray()); + LOG.info("Finish information of application " + + appFinish.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application " + + appFinish.getApplicationId()); + throw e; + } finally { + hfWriter.close(); + outstandingWriters.remove(appFinish.getApplicationId()); + } + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptStart.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl; + try { + hfWriter.writeHistoryData( + new HistoryDataKey(appAttemptStart.getApplicationAttemptId() + .toString(), + START_DATA_SUFFIX), + ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto() + .toByteArray()); + LOG.info("Start information of application attempt " + + appAttemptStart.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application attempt " + + appAttemptStart.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl; + try { + hfWriter.writeHistoryData( + new 
HistoryDataKey(appAttemptFinish.getApplicationAttemptId() + .toString(), + FINISH_DATA_SUFFIX), + ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto() + .toByteArray()); + LOG.info("Finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerStart.getContainerId() + .getApplicationAttemptId() + .getApplicationId()); + assert containerStart instanceof ContainerStartDataPBImpl; + try { + hfWriter.writeHistoryData( + new HistoryDataKey(containerStart.getContainerId().toString(), + START_DATA_SUFFIX), + ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray()); + LOG.info("Start information of container " + + containerStart.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of container " + + containerStart.getContainerId()); + throw e; + } + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerFinish.getContainerId() + .getApplicationAttemptId().getApplicationId()); + assert containerFinish instanceof ContainerFinishDataPBImpl; + try { + hfWriter.writeHistoryData( + new HistoryDataKey(containerFinish.getContainerId().toString(), + FINISH_DATA_SUFFIX), + ((ContainerFinishDataPBImpl) containerFinish).getProto() + .toByteArray()); + LOG.info("Finish information of container " + + containerFinish.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of container " + + containerFinish.getContainerId()); + } + } + + private static 
ApplicationStartData parseApplicationStartData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationStartDataPBImpl( + ApplicationStartDataProto.parseFrom(value)); + } + + private static ApplicationFinishData parseApplicationFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationFinishDataPBImpl( + ApplicationFinishDataProto.parseFrom(value)); + } + + private static ApplicationAttemptStartData parseApplicationAttemptStartData( + byte[] value) throws InvalidProtocolBufferException { + return new ApplicationAttemptStartDataPBImpl( + ApplicationAttemptStartDataProto.parseFrom(value)); + } + + private static ApplicationAttemptFinishData + parseApplicationAttemptFinishData( + byte[] value) throws InvalidProtocolBufferException { + return new ApplicationAttemptFinishDataPBImpl( + ApplicationAttemptFinishDataProto.parseFrom(value)); + } + + private static ContainerStartData parseContainerStartData(byte[] value) + throws InvalidProtocolBufferException { + return new ContainerStartDataPBImpl( + ContainerStartDataProto.parseFrom(value)); + } + + private static ContainerFinishData parseContainerFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ContainerFinishDataPBImpl( + ContainerFinishDataProto.parseFrom(value)); + } + + private static void mergeApplicationHistoryData( + ApplicationHistoryData historyData, + ApplicationStartData startData) { + historyData.setApplicationName(startData.getApplicationName()); + historyData.setApplicationType(startData.getApplicationType()); + historyData.setQueue(startData.getQueue()); + historyData.setUser(startData.getUser()); + historyData.setSubmitTime(startData.getSubmitTime()); + historyData.setStartTime(startData.getStartTime()); + } + + private static void mergeApplicationHistoryData( + ApplicationHistoryData historyData, + ApplicationFinishData finishData) { + historyData.setFinishTime(finishData.getFinishTime()); + 
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + historyData.setYarnApplicationState(finishData.getYarnApplicationState()); + } + + private static void mergeApplicationAttemptHistoryData( + ApplicationAttemptHistoryData historyData, + ApplicationAttemptStartData startData) { + historyData.setHost(startData.getHost()); + historyData.setRPCPort(startData.getRPCPort()); + historyData.setMasterContainerId(startData.getMasterContainerId()); + } + + private static void mergeApplicationAttemptHistoryData( + ApplicationAttemptHistoryData historyData, + ApplicationAttemptFinishData finishData) { + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setTrackingURL(finishData.getTrackingURL()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + historyData.setYarnApplicationAttemptState(finishData + .getYarnApplicationAttemptState()); + } + + private static void mergeContainerHistoryData( + ContainerHistoryData historyData, ContainerStartData startData) { + historyData.setAllocatedResource(startData.getAllocatedResource()); + historyData.setAssignedNode(startData.getAssignedNode()); + historyData.setPriority(startData.getPriority()); + historyData.setStartTime(startData.getStartTime()); + } + + private static void mergeContainerHistoryData( + ContainerHistoryData historyData, ContainerFinishData finishData) { + historyData.setFinishTime(finishData.getFinishTime()); + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setLogURL(finishData.getLogURL()); + historyData.setContainerExitStatus(finishData + .getContainerExitStatus()); + historyData.setContainerState(finishData.getContainerState()); + } + + private HistoryFileWriter getHistoryFileWriter(ApplicationId appId) + throws IOException { + HistoryFileWriter hfWriter = outstandingWriters.get(appId); + if (hfWriter == null) { + throw new 
IOException("History file of application " + appId + + " is not opened"); + } + return hfWriter; + } + + private HistoryFileReader getHistoryFileReader(ApplicationId appId) + throws IOException { + Path applicationHistoryFile = new Path(rootDirPath, appId.toString()); + if (!fs.exists(applicationHistoryFile)) { + throw new IOException("History file for application " + appId + + " is not found"); + } + // The history file is still under writing + if (outstandingWriters.containsKey(appId)) { + throw new IOException("History file for application " + appId + + " is under writing"); + } + return new HistoryFileReader(applicationHistoryFile); + } + + private class HistoryFileReader { + + private class Entry { + + private HistoryDataKey key; + private byte[] value; + + public Entry(HistoryDataKey key, byte[] value) { + this.key = key; + this.value = value; + } + } + + private FSDataInputStream fsdis; + private TFile.Reader reader; + private TFile.Reader.Scanner scanner; + + public HistoryFileReader(Path historyFile) throws IOException { + FSDataInputStream fsdis = fs.open(historyFile); + reader = + new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(), + getConfig()); + reset(); + } + + public boolean hasNext() { + return !scanner.atEnd(); + } + + public Entry next() throws IOException { + TFile.Reader.Scanner.Entry entry = scanner.entry(); + DataInputStream dis = entry.getKeyStream(); + HistoryDataKey key = new HistoryDataKey(); + key.readFields(dis); + dis = entry.getValueStream(); + byte[] value = new byte[entry.getValueLength()]; + dis.read(value); + scanner.advance(); + return new Entry(key, value); + } + + public void reset() throws IOException { + IOUtils.cleanup(LOG, scanner); + scanner = reader.createScanner(); + } + + public void close() { + IOUtils.cleanup(LOG, scanner, reader, fsdis); + } + + } + + private class HistoryFileWriter { + + private FSDataOutputStream fsdos; + private TFile.Writer writer; + + public HistoryFileWriter(Path historyFile) + 
throws IOException { + if (fs.exists(historyFile)) { + fsdos = fs.append(historyFile); + } else { + fsdos = fs.create(historyFile); + } + fs.setPermission(historyFile, HISTORY_FILE_UMASK); + writer = + new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get( + YarnConfiguration.FS_HISTORY_STORE_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE), + null, getConfig()); + } + + public synchronized void close() { + IOUtils.cleanup(LOG, writer, fsdos); + } + + public synchronized void writeHistoryData(HistoryDataKey key, byte[] value) + throws IOException { + DataOutputStream dos = null; + try { + dos = writer.prepareAppendKey(-1); + key.write(dos); + } finally { + IOUtils.cleanup(LOG, dos); + } + try { + dos = writer.prepareAppendValue(value.length); + dos.write(value); + } finally { + IOUtils.cleanup(LOG, dos); + } + } + + } + + private static class HistoryDataKey implements Writable { + + private String id; + + private String suffix; + + public HistoryDataKey() { + this(null, null); + } + + public HistoryDataKey(String id, String suffix) { + this.id = id; + this.suffix = suffix; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(id); + out.writeUTF(suffix); + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readUTF(); + suffix = in.readUTF(); + } + + } + + private static class StartFinishDataPair { + + private S startData; + private F finishData; + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java new file mode 100644 index 00000000000..d4a431f4ecb --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.applicationhistoryservice;

import java.io.IOException;
import java.net.URI;

import junit.framework.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link FileSystemApplicationHistoryStore}: round-trip of
 * application/attempt/container history data through a TFile-backed store on
 * the local file system, rejection of writes after an application has
 * finished, and a size sanity check for a large number of container records.
 */
public class TestFileSystemApplicationHistoryStore extends
    ApplicationHistoryStoreTestUtils {

  private FileSystem fs;
  private Path fsWorkingPath;

  @Before
  public void setup() throws Exception {
    fs = new RawLocalFileSystem();
    Configuration conf = new Configuration();
    fs.initialize(new URI("/"), conf);
    fsWorkingPath = new Path("Test");
    // Start from a clean working directory so stale files cannot skew reads
    fs.delete(fsWorkingPath, true);
    conf.set(YarnConfiguration.FS_HISTORY_STORE_URI, fsWorkingPath.toString());
    store = new FileSystemApplicationHistoryStore();
    store.init(conf);
    store.start();
  }

  @After
  public void tearDown() throws Exception {
    store.stop();
    fs.delete(fsWorkingPath, true);
    fs.close();
  }

  @Test
  public void testReadWriteHistoryData() throws IOException {
    testWriteHistoryData(5);
    testReadHistoryData(5);
  }

  /**
   * Writes {@code num} applications, each with {@code num} attempts, each
   * with {@code num} containers, producing exactly one start and one finish
   * record per entity.
   */
  private void testWriteHistoryData(int num) throws IOException {
    // write application history data
    for (int i = 1; i <= num; ++i) {
      ApplicationId appId = ApplicationId.newInstance(0, i);
      writeApplicationStartData(appId);

      // write application attempt history data
      for (int j = 1; j <= num; ++j) {
        ApplicationAttemptId appAttemptId =
            ApplicationAttemptId.newInstance(appId, j);
        writeApplicationAttemptStartData(appAttemptId);

        // write container history data
        for (int k = 1; k <= num; ++k) {
          ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
          writeContainerStartData(containerId);
          writeContainerFinishData(containerId);
        }
        // BUGFIX: finish each attempt exactly once, after all of its
        // containers. The original called this inside the container loop,
        // writing the attempt-finish record num times per attempt.
        writeApplicationAttemptFinishData(appAttemptId);
      }

      writeApplicationFinishData(appId);
    }
  }

  /**
   * Reads back everything written by {@link #testWriteHistoryData(int)} and
   * verifies counts and per-entity fields.
   */
  private void testReadHistoryData(int num) throws IOException {
    // read application history data
    Assert.assertEquals(num, store.getAllApplications().size());
    for (int i = 1; i <= num; ++i) {
      ApplicationId appId = ApplicationId.newInstance(0, i);
      ApplicationHistoryData appData = store.getApplication(appId);
      Assert.assertNotNull(appData);
      Assert.assertEquals(appId.toString(), appData.getApplicationName());
      Assert.assertEquals(appId.toString(), appData.getDiagnosticsInfo());

      // read application attempt history data
      Assert.assertEquals(
          num, store.getApplicationAttempts(appId).size());
      for (int j = 1; j <= num; ++j) {
        ApplicationAttemptId appAttemptId =
            ApplicationAttemptId.newInstance(appId, j);
        ApplicationAttemptHistoryData attemptData =
            store.getApplicationAttempt(appAttemptId);
        Assert.assertNotNull(attemptData);
        Assert.assertEquals(appAttemptId.toString(), attemptData.getHost());
        Assert.assertEquals(appAttemptId.toString(),
            attemptData.getDiagnosticsInfo());

        // read container history data
        Assert.assertEquals(
            num, store.getContainers(appAttemptId).size());
        for (int k = 1; k <= num; ++k) {
          ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
          ContainerHistoryData containerData = store.getContainer(containerId);
          Assert.assertNotNull(containerData);
          Assert.assertEquals(Priority.newInstance(containerId.getId()),
              containerData.getPriority());
          Assert.assertEquals(containerId.toString(),
              containerData.getDiagnosticsInfo());
        }
        ContainerHistoryData masterContainer =
            store.getAMContainer(appAttemptId);
        Assert.assertNotNull(masterContainer);
        Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1),
            masterContainer.getContainerId());
      }
    }
  }

  /**
   * After an application's finish record is written, any further write for
   * that application (attempt or container, start or finish) must fail with
   * an "is not opened" IOException.
   */
  @Test
  public void testWriteAfterApplicationFinish() throws IOException {
    ApplicationId appId = ApplicationId.newInstance(0, 1);
    writeApplicationStartData(appId);
    writeApplicationFinishData(appId);
    // write application attempt history data
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, 1);
    try {
      writeApplicationAttemptStartData(appAttemptId);
      Assert.fail();
    } catch (IOException e) {
      Assert.assertTrue(e.getMessage().contains("is not opened"));
    }
    try {
      writeApplicationAttemptFinishData(appAttemptId);
      Assert.fail();
    } catch (IOException e) {
      Assert.assertTrue(e.getMessage().contains("is not opened"));
    }
    // write container history data
    ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
    try {
      writeContainerStartData(containerId);
      Assert.fail();
    } catch (IOException e) {
      Assert.assertTrue(e.getMessage().contains("is not opened"));
    }
    try {
      writeContainerFinishData(containerId);
      Assert.fail();
    } catch (IOException e) {
      Assert.assertTrue(e.getMessage().contains("is not opened"));
    }
  }

  /**
   * Writes 100k container records for one attempt and checks that the
   * on-disk growth stays under 20 MB — a regression guard on record size.
   */
  @Test
  public void testMassiveWriteContainerHistoryData() throws IOException {
    long mb = 1024 * 1024;
    long usedDiskBefore = fs.getContentSummary(fsWorkingPath).getLength() / mb;
    ApplicationId appId = ApplicationId.newInstance(0, 1);
    writeApplicationStartData(appId);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, 1);
    for (int i = 1; i <= 100000; ++i) {
      ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
      writeContainerStartData(containerId);
      writeContainerFinishData(containerId);
    }
    writeApplicationFinishData(appId);
    long usedDiskAfter = fs.getContentSummary(fsWorkingPath).getLength() / mb;
    Assert.assertTrue((usedDiskAfter - usedDiskBefore) < 20);
  }

}