From 0f63cb4f0078081736d56d286eb8018497a0f41d Mon Sep 17 00:00:00 2001
From: Eric Tschetter
Date: Tue, 20 Nov 2012 15:30:50 -0600
Subject: [PATCH] 1) Have IndexGeneratorJob write the descriptors for each of
 the segments it creates to a path in the temporary working directory
 (generally HDFS)

2) Have the DbUpdaterJob read descriptors from the temporary working
directory instead of looking in the final segment output location (often the
eventually consistent S3)

3) Together, 1 and 2 fix #30

---
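Note for reviewers (this section is discarded by git-am): the core idea of the
patch is a descriptor handoff through a consistent filesystem. Each reducer
writes a small JSON descriptor for every segment it produces into
segmentDescriptorInfo/ under the job's intermediate working directory (HDFS),
and the metadata-publishing job later enumerates that directory instead of
probing the eventually consistent S3 output location. The sketch below shows
that round trip with the plain Hadoop FileSystem API; the class name, the /tmp
path, and the literal JSON are made up for illustration and are not code from
this patch.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DescriptorHandoffSketch
{
  public static void main(String[] args) throws Exception
  {
    final Configuration conf = new Configuration();

    // Hypothetical intermediate working directory; the real one comes from
    // HadoopDruidIndexerConfig.makeDescriptorInfoDir().
    final Path descriptorInfoDir = new Path("/tmp/druid-indexer/segmentDescriptorInfo");
    final FileSystem fs = descriptorInfoDir.getFileSystem(conf);

    // Writer side (IndexGeneratorJob): one descriptor file per segment,
    // written to the consistent intermediate filesystem.
    fs.mkdirs(descriptorInfoDir);
    final OutputStream out = fs.create(new Path(descriptorInfoDir, "exampleSegment.json"));
    try {
      out.write("{\"identifier\": \"exampleSegment\"}".getBytes("UTF-8"));
    }
    finally {
      out.close();
    }

    // Reader side (DbUpdaterJob): a single directory listing yields every
    // descriptor -- no guessing of final output paths, no S3 round trips.
    for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
      final BufferedReader in =
          new BufferedReader(new InputStreamReader(fs.open(status.getPath()), "UTF-8"));
      try {
        System.out.println(status.getPath() + ": " + in.readLine());
      }
      finally {
        in.close();
      }
    }
  }
}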
-ne "0" ]; then - echo "Build failed" - exit 2 -fi - -echo Deploying ${TAR_FILE} -s3cmd put ${TAR_FILE} s3://metamx-galaxy-bin/binaries/ diff --git a/indexer/pom.xml b/indexer/pom.xml index 9c57ed49344..f7cd4df4eba 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -70,7 +70,7 @@ org.apache.hadoop hadoop-core - 0.20.2 + 0.20.205-emr org.mortbay.jetty diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java index d1925629bd2..720242466e4 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java @@ -19,12 +19,16 @@ package com.metamx.druid.indexer; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.db.DbConnector; import com.metamx.druid.indexer.updater.DbUpdaterJobSpec; import com.metamx.druid.jackson.DefaultObjectMapper; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; @@ -32,9 +36,7 @@ import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; +import java.io.IOException; import java.util.List; /** @@ -50,7 +52,7 @@ public class DbUpdaterJob implements Jobby private final DBI dbi; // Keep track of published segment identifiers, in case a client is interested. - private volatile List publishedSegments = null; + private volatile ImmutableList publishedSegments = null; public DbUpdaterJob( HadoopDruidIndexerConfig config @@ -66,7 +68,7 @@ public class DbUpdaterJob implements Jobby { final Configuration conf = new Configuration(); - List newPublishedSegments = new LinkedList(); + ImmutableList.Builder publishedSegmentsBuilder = ImmutableList.builder(); for (String propName : System.getProperties().stringPropertyNames()) { if (propName.startsWith("hadoop.")) { @@ -74,16 +76,13 @@ public class DbUpdaterJob implements Jobby } } - final Iterator buckets = config.getAllBuckets().iterator(); - Bucket bucket = buckets.next(); - int numRetried = 0; - while (true) { - try { - final Path path = new Path(config.makeSegmentOutputPath(bucket), "descriptor.json"); - final DataSegment segment = jsonMapper.readValue( - path.getFileSystem(conf).open(path), - DataSegment.class - ); + final Path descriptorInfoDir = config.makeDescriptorInfoDir(); + + try { + FileSystem fs = descriptorInfoDir.getFileSystem(conf); + + for (FileStatus status : fs.listStatus(descriptorInfoDir)) { + final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class); dbi.withHandle( new HandleCallback() @@ -91,12 +90,11 @@ public class DbUpdaterJob implements Jobby @Override public Void withHandle(Handle handle) throws Exception { - handle.createStatement( - String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + handle.createStatement(String.format( + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, 
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java
index 33b72d06634..b020a99fa4e 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java
@@ -118,8 +118,7 @@ public class HadoopDruidIndexer
 
   private static void printHelp()
   {
-    System.out.println("Usage: <java invocation> <time_interval> <config_spec>");
-    System.out.println("  <time_interval> is the ISO8601 interval of data to run over.");
+    System.out.println("Usage: <java invocation> <config_spec>");
     System.out.println("  <config_spec> is either a JSON object or the path to a file that contains a JSON object.");
     System.out.println();
     System.out.println("JSON object description:");
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index be4b09fd9c6..403484b9c61 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -34,6 +34,7 @@ import com.metamx.common.MapUtils;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.RegisteringNode;
+import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.v1.serde.Registererer;
 import com.metamx.druid.indexer.data.DataSpec;
 import com.metamx.druid.indexer.data.ToLowercaseDataSpec;
@@ -59,6 +60,8 @@ import org.joda.time.format.ISODateTimeFormat;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
@@ -111,9 +114,7 @@ public class HadoopDruidIndexerConfig
       RegisteringNode.registerHandlers(registererers, Arrays.asList(jsonMapper));
     }
 
-    final HadoopDruidIndexerConfig retVal = jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
-    retVal.verify();
-    return retVal;
+    return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
   }
 
   @SuppressWarnings("unchecked")
@@ -152,7 +153,9 @@ public class HadoopDruidIndexerConfig
 
   public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
   {
-    return fromString(conf.get(CONFIG_PROPERTY));
+    final HadoopDruidIndexerConfig retVal = fromString(conf.get(CONFIG_PROPERTY));
+    retVal.verify();
+    return retVal;
   }
 
   private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
@@ -558,6 +561,16 @@ public class HadoopDruidIndexerConfig
     );
   }
 
+  public Path makeDescriptorInfoDir()
+  {
+    return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
+  }
+
+  public Path makeDescriptorInfoPath(DataSegment segment)
+  {
+    return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
+  }
+
   public Path makeSegmentOutputPath(Bucket bucket)
   {
     final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
index 5d7c281aef9..15354d0be67 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
 package com.metamx.druid.indexer;
 
 import com.google.common.base.Preconditions;
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 9914da18c13..34d743fc9be 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -44,7 +44,6 @@ import com.metamx.druid.index.v1.IndexMerger;
 import com.metamx.druid.index.v1.MMappedIndex;
 import com.metamx.druid.indexer.rollup.DataRollupSpec;
 import com.metamx.druid.input.MapBasedInputRow;
-import com.metamx.druid.jackson.DefaultObjectMapper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -419,6 +418,7 @@
       int attemptNumber = context.getTaskAttemptID().getId();
       Path indexBasePath = config.makeSegmentOutputPath(bucket);
       Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
+      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
       final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
 
       outputFS.mkdirs(indexBasePath);
@@ -478,7 +478,7 @@
       // retry 1 minute
       boolean success = false;
       for (int i = 0; i < 6; i++) {
-        if (renameIndexFiles(outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
+        if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
           log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
           success = true;
           break;
@@ -508,7 +508,7 @@
           outputFS.delete(indexZipFilePath, true);
         } else {
           outputFS.delete(finalIndexZipFilePath, true);
-          if (!renameIndexFiles(outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
+          if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
             throw new ISE(
                 "Files [%s] and [%s] are different, but still cannot rename after retry loop",
                 indexZipFilePath.toUri().getPath(),
@@ -520,6 +520,7 @@
     }
 
     private boolean renameIndexFiles(
+        FileSystem intermediateFS,
        FileSystem outputFS,
        Path indexBasePath,
        Path indexZipFilePath,
@@ -565,7 +566,18 @@
         return false;
       }
 
-      final Path descriptorPath = new Path(indexBasePath, "descriptor.json");
+      writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json"));
+      final Path descriptorPath = config.makeDescriptorInfoPath(segment);
+      log.info("Writing descriptor to path[%s]", descriptorPath);
+      intermediateFS.mkdirs(descriptorPath.getParent());
+      writeSegmentDescriptor(intermediateFS, segment, descriptorPath);
+
+      return true;
+    }
+
+    private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath)
+        throws IOException
+    {
       if (outputFS.exists(descriptorPath)) {
         outputFS.delete(descriptorPath, false);
       }
@@ -577,7 +589,6 @@
       finally {
         descriptorOut.close();
       }
-      return true;
     }
 
     private long copyFile(
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/ZkUpdaterJob.java b/indexer/src/main/java/com/metamx/druid/indexer/ZkUpdaterJob.java
deleted file mode 100644
index cd2ed44bd59..00000000000
--- a/indexer/src/main/java/com/metamx/druid/indexer/ZkUpdaterJob.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.indexer;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.indexer.updater.ZkUpdaterJobSpec;
-import com.metamx.druid.initialization.Initialization;
-import com.metamx.druid.initialization.ZkClientConfig;
-import com.metamx.druid.jackson.DefaultObjectMapper;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.joda.time.DateTime;
-
-/**
- */
-public class ZkUpdaterJob implements Jobby
-{
-  private static final Logger log = new Logger(ZkUpdaterJob.class);
-
-  private static final Joiner JOINER = Joiner.on("/");
-  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
-
-  private final HadoopDruidIndexerConfig config;
-  private final ZkUpdaterJobSpec spec;
-
-  public ZkUpdaterJob(
-      HadoopDruidIndexerConfig config
-  )
-  {
-    this.config = config;
-    this.spec = (ZkUpdaterJobSpec) config.getUpdaterJobSpec();
-  }
-
-  @Override
-  public boolean run()
-  {
-    if (!spec.postToZk()) {
-      return true;
-    }
-
-    Configuration conf = new Configuration();
-
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-
-    final Lifecycle lifecycle = new Lifecycle();
-    ZkClient zkClient = Initialization.makeZkClient(
-        new ZkClientConfig()
-        {
-          @Override
-          public String getZkHosts()
-          {
-            return spec.getZkQuorum();
-          }
-        },
-        lifecycle
-    );
-
-    try {
-      lifecycle.start();
-    }
-    catch (Exception e) {
-      log.error(e, "Exception on lifecycle start?");
-      lifecycle.stop();
-      return false;
-    }
-
-    try {
-      zkClient.waitUntilConnected();
-      final String dataSourceBasePath = JOINER.join(spec.getZkBasePath(), config.getDataSource());
-      if (!zkClient.exists(dataSourceBasePath)) {
-        zkClient.createPersistent(
-            dataSourceBasePath,
-            jsonMapper.writeValueAsString(
-                ImmutableMap.of(
-                    "created", new DateTime().toString()
-                )
-            )
-        );
-      }
-
-      for (Bucket bucket: config.getAllBuckets()) {
-        final Path path = new Path(config.makeSegmentOutputPath(bucket), "descriptor.json");
-        DataSegment segment = jsonMapper.readValue(
-            path.getFileSystem(conf).open(path),
-            DataSegment.class
-        );
-
-        String segmentPath = JOINER.join(dataSourceBasePath, segment.getIdentifier());
-        log.info("Adding index to list of indexes at zkPath[%s].", segmentPath);
-        zkClient.createPersistent(segmentPath, jsonMapper.writeValueAsString(segment));
-      }
-    }
-    catch (Exception e) {
-      log.error(e, "Exception when trying to update zk metadata.");
-      return false;
-    }
-    finally {
-      lifecycle.stop();
-    }
-
-    return true;
-  }
-}
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java b/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java
index 4818e4c99a9..3523a198ba6 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java
@@ -19,8 +19,6 @@
 
 package com.metamx.druid.indexer.updater;
 
-import com.metamx.druid.indexer.DbUpdaterJob;
-import com.metamx.druid.indexer.ZkUpdaterJob;
 import org.codehaus.jackson.annotate.JsonSubTypes;
 import org.codehaus.jackson.annotate.JsonTypeInfo;
 
diff --git a/install/druid_setup.sh b/install/druid_setup.sh
index 7ea69f99d18..22dc2ad1b77 100755
--- a/install/druid_setup.sh
+++ b/install/druid_setup.sh
@@ -15,11 +15,11 @@ fi
 
 DRUID_DIR=$(cd $(dirname $0)/.. ; pwd)
 
-DRUID_SERVER_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-server*selfcontained.jar') |head -1)"
-[ -z "${DRUID_SERVER_JAR}" ] && echo "unable to find druid server jar" && exit 2
-echo "using ${DRUID_SERVER_JAR}"
+DRUID_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-services*selfcontained.jar') |head -1)"
+[ -z "${DRUID_JAR}" ] && echo "unable to find druid server jar" && exit 2
+echo "using ${DRUID_JAR}"
 echo
 
-$RUN_JAVA -cp "${DRUID_SERVER_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $*
+$RUN_JAVA -cp "${DRUID_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $*
 
 [ -e ${DRUID_DIR}/install/druid_setup.log ] && egrep "WARN|ERROR|FATAL" ${DRUID_DIR}/install/druid_setup.log
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 0cd9aaaac43..732f3703c9e 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -192,22 +192,6 @@
                 </execution>
             </executions>
         </plugin>
-        <plugin>
-            <artifactId>maven-shade-plugin</artifactId>
-            <executions>
-                <execution>
-                    <phase>package</phase>
-                    <goals>
-                        <goal>shade</goal>
-                    </goals>
-                    <configuration>
-                        <outputFile>
-                            ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
-                        </outputFile>
-                    </configuration>
-                </execution>
-            </executions>
-        </plugin>
         <plugin>
             <groupId>org.scala-tools</groupId>
             <artifactId>maven-scala-plugin</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index eed7d78371c..6e88b69dbec 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -228,22 +228,6 @@
                 </execution>
             </executions>
         </plugin>
-        <plugin>
-            <artifactId>maven-shade-plugin</artifactId>
-            <executions>
-                <execution>
-                    <phase>package</phase>
-                    <goals>
-                        <goal>shade</goal>
-                    </goals>
-                    <configuration>
-                        <outputFile>
-                            ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
-                        </outputFile>
-                    </configuration>
-                </execution>
-            </executions>
-        </plugin>
         <plugin>
             <artifactId>maven-jar-plugin</artifactId>
             <configuration>
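
Closing note (appended after the diff; ignored by patch tooling): one subtle
detail above is that makeDescriptorInfoPath() strips ':' from the segment
identifier before using it as a file name. Segment identifiers embed ISO8601
timestamps, and HDFS does not accept ':' in path components, so the raw
identifier would not be a legal descriptor file name. The snippet below is an
illustration with a made-up identifier, not code from this patch.

public class DescriptorNameSketch
{
  public static void main(String[] args)
  {
    // Hypothetical segment identifier of the usual
    // dataSource_start_end_version shape.
    final String identifier =
        "wikipedia_2012-11-20T00:00:00.000Z_2012-11-21T00:00:00.000Z_2012-11-20T15:30:50.000Z";

    // Mirrors makeDescriptorInfoPath(): drop the colons, append ".json".
    final String fileName = String.format("%s.json", identifier.replace(":", ""));

    // Prints: wikipedia_2012-11-20T000000.000Z_2012-11-21T000000.000Z_2012-11-20T153050.000Z.json
    System.out.println(fileName);
  }
}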