diff --git a/client/pom.xml b/client/pom.xml
index 0d0aae60a34..16b6ba0ffae 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -186,22 +186,6 @@
             </plugin>
-            <plugin>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <outputFile>
-                                ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
-                            </outputFile>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
             <plugin>
                 <artifactId>maven-jar-plugin</artifactId>
diff --git a/deploy.sh b/deploy.sh
deleted file mode 100755
index ab19515bfe5..00000000000
--- a/deploy.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash -e
-
-PROJECT=druid
-
-DIST_DIR=dist/tar
-
-SCRIPT_DIR=`dirname $0`
-pushd $SCRIPT_DIR
-SCRIPT_DIR=`pwd`
-popd
-
-VERSION=`cat pom.xml | grep version | head -2 | tail -1 | sed 's_.*<version>\([^<]*\)</version>.*_\1_'`
-TAR_FILE=${SCRIPT_DIR}/${PROJECT}-${VERSION}.tar.gz
-
-${SCRIPT_DIR}/build.sh
-if [ $? -ne "0" ]; then
-  echo "Build failed"
-  exit 2
-fi
-
-echo Deploying ${TAR_FILE}
-s3cmd put ${TAR_FILE} s3://metamx-galaxy-bin/binaries/
diff --git a/indexer/pom.xml b/indexer/pom.xml
index 9c57ed49344..f7cd4df4eba 100644
--- a/indexer/pom.xml
+++ b/indexer/pom.xml
@@ -70,7 +70,7 @@
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
-           <version>0.20.2</version>
+           <version>0.20.205-emr</version>
        </dependency>
        <dependency>
            <groupId>org.mortbay.jetty</groupId>
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
index d1925629bd2..720242466e4 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
@@ -19,12 +19,16 @@
 package com.metamx.druid.indexer;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.db.DbConnector;
 import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
@@ -32,9 +36,7 @@
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -50,7 +52,7 @@ public class DbUpdaterJob implements Jobby
   private final DBI dbi;
 
   // Keep track of published segment identifiers, in case a client is interested.
-  private volatile List<DataSegment> publishedSegments = null;
+  private volatile ImmutableList<DataSegment> publishedSegments = null;
 
   public DbUpdaterJob(
       HadoopDruidIndexerConfig config
@@ -66,7 +68,7 @@ public class DbUpdaterJob implements Jobby
   {
     final Configuration conf = new Configuration();
 
-    List<DataSegment> newPublishedSegments = new LinkedList<DataSegment>();
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
 
     for (String propName : System.getProperties().stringPropertyNames()) {
       if (propName.startsWith("hadoop.")) {
@@ -74,16 +76,13 @@ public class DbUpdaterJob implements Jobby
       }
     }
 
-    final Iterator<Bucket> buckets = config.getAllBuckets().iterator();
-    Bucket bucket = buckets.next();
-    int numRetried = 0;
-    while (true) {
-      try {
-        final Path path = new Path(config.makeSegmentOutputPath(bucket), "descriptor.json");
-        final DataSegment segment = jsonMapper.readValue(
-            path.getFileSystem(conf).open(path),
-            DataSegment.class
-        );
+    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
+
+    try {
+      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
+
+      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
+        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
 
         dbi.withHandle(
             new HandleCallback<Void>()
@@ -91,12 +90,11 @@ public class DbUpdaterJob implements Jobby
               @Override
               public Void withHandle(Handle handle) throws Exception
               {
-                handle.createStatement(
-                    String.format(
-                        "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                handle.createStatement(String.format(
+                    "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
                         spec.getSegmentTable()
-                    )
-                )
+                ))
                       .bind("id", segment.getIdentifier())
                       .bind("dataSource", segment.getDataSource())
                       .bind("created_date", new DateTime().toString())
@@ -113,37 +111,15 @@ public class DbUpdaterJob implements Jobby
               }
             }
         );
 
-        newPublishedSegments.add(segment);
+        publishedSegmentsBuilder.add(segment);
         log.info("Published %s", segment.getIdentifier());
       }
-      catch (Exception e) {
-        if (numRetried < 5) {
-          log.error(e, "Retrying[%d] after exception when loading segment metadata into db", numRetried);
-
-          try {
-            Thread.sleep(15 * 1000);
-          }
-          catch (InterruptedException e1) {
-            Thread.currentThread().interrupt();
-            return false;
-          }
-
-          ++numRetried;
-          continue;
-        }
-        log.error(e, "Failing, retried too many times.");
-        return false;
-      }
-
-      if (buckets.hasNext()) {
-        bucket = buckets.next();
-        numRetried = 0;
-      } else {
-        break;
-      }
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
     }
 
-    publishedSegments = newPublishedSegments;
+    publishedSegments = publishedSegmentsBuilder.build();
 
     return true;
   }
@@ -158,7 +134,7 @@ public class DbUpdaterJob implements Jobby
       log.error("getPublishedSegments called before run!");
       throw new IllegalStateException("DbUpdaterJob has not run yet");
     } else {
-      return Collections.unmodifiableList(publishedSegments);
+      return publishedSegments;
     }
   }
 }
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java
index 33b72d06634..b020a99fa4e 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexer.java
@@ -118,8 +118,7 @@ public class HadoopDruidIndexer
   private static void printHelp()
   {
-    System.out.println("Usage: ");
-    System.out.println("  is the ISO8601 interval of data to run over.");
+    System.out.println("Usage: ");
     System.out.println("  is either a JSON object or the path to a file that contains a JSON object.");
     System.out.println();
     System.out.println("JSON object description:");
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index be4b09fd9c6..403484b9c61 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -34,6 +34,7 @@ import com.metamx.common.MapUtils;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.RegisteringNode;
+import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.v1.serde.Registererer;
 import com.metamx.druid.indexer.data.DataSpec;
 import com.metamx.druid.indexer.data.ToLowercaseDataSpec;
@@ -59,6 +60,8 @@ import org.joda.time.format.ISODateTimeFormat;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
@@ -111,9 +114,7 @@ public class HadoopDruidIndexerConfig
       RegisteringNode.registerHandlers(registererers, Arrays.asList(jsonMapper));
     }
 
-    final HadoopDruidIndexerConfig retVal = jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
-    retVal.verify();
-    return retVal;
+    return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
   }
 
   @SuppressWarnings("unchecked")
@@ -152,7 +153,9 @@ public class HadoopDruidIndexerConfig
 
   public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
   {
-    return fromString(conf.get(CONFIG_PROPERTY));
+    final HadoopDruidIndexerConfig retVal = fromString(conf.get(CONFIG_PROPERTY));
+    retVal.verify();
+    return retVal;
   }
 
   private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
@@ -558,6 +561,16 @@ public class HadoopDruidIndexerConfig
     );
   }
 
+  public Path makeDescriptorInfoDir()
+  {
+    return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
+  }
+
+  public Path makeDescriptorInfoPath(DataSegment segment)
+  {
+    return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
+  }
+
   public Path makeSegmentOutputPath(Bucket bucket)
   {
     final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
index 5d7c281aef9..15354d0be67 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
 package com.metamx.druid.indexer;
 
 import com.google.common.base.Preconditions;
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 9914da18c13..34d743fc9be 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -44,7 +44,6 @@ import com.metamx.druid.index.v1.IndexMerger;
 import com.metamx.druid.index.v1.MMappedIndex;
 import com.metamx.druid.indexer.rollup.DataRollupSpec;
 import com.metamx.druid.input.MapBasedInputRow;
-import com.metamx.druid.jackson.DefaultObjectMapper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -419,6 +418,7 @@ public class IndexGeneratorJob implements Jobby
       int attemptNumber = context.getTaskAttemptID().getId();
       Path indexBasePath = config.makeSegmentOutputPath(bucket);
       Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
+      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
       final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
 
       outputFS.mkdirs(indexBasePath);
@@ -478,7 +478,7 @@ public class IndexGeneratorJob implements Jobby
       // retry 1 minute
       boolean success = false;
       for (int i = 0; i < 6; i++) {
-        if (renameIndexFiles(outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
+        if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
           log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
           success = true;
           break;
@@ -508,7 +508,7 @@ public class IndexGeneratorJob implements Jobby
           outputFS.delete(indexZipFilePath, true);
         } else {
           outputFS.delete(finalIndexZipFilePath, true);
-          if (!renameIndexFiles(outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
+          if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
             throw new ISE(
                 "Files [%s] and [%s] are different, but still cannot rename after retry loop",
                 indexZipFilePath.toUri().getPath(),
@@ -520,6 +520,7 @@ public class IndexGeneratorJob implements Jobby
     }
 
     private boolean renameIndexFiles(
+        FileSystem intermediateFS,
         FileSystem outputFS,
         Path indexBasePath,
         Path indexZipFilePath,
@@ -565,7 +566,18 @@ public class IndexGeneratorJob implements Jobby
         return false;
       }
 
-      final Path descriptorPath = new Path(indexBasePath, "descriptor.json");
+      writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json"));
+      final Path descriptorPath = config.makeDescriptorInfoPath(segment);
+      log.info("Writing descriptor to path[%s]", descriptorPath);
+      intermediateFS.mkdirs(descriptorPath.getParent());
+      writeSegmentDescriptor(intermediateFS, segment, descriptorPath);
+
+      return true;
+    }
+
+    private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath)
+        throws IOException
+    {
       if (outputFS.exists(descriptorPath)) {
         outputFS.delete(descriptorPath, false);
       }
@@ -577,7 +589,6 @@ public class IndexGeneratorJob implements Jobby
       finally {
         descriptorOut.close();
       }
-      return true;
     }
 
     private long copyFile(
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/ZkUpdaterJob.java b/indexer/src/main/java/com/metamx/druid/indexer/ZkUpdaterJob.java
deleted file mode 100644
index cd2ed44bd59..00000000000
--- a/indexer/src/main/java/com/metamx/druid/indexer/ZkUpdaterJob.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.indexer;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.indexer.updater.ZkUpdaterJobSpec;
-import com.metamx.druid.initialization.Initialization;
-import com.metamx.druid.initialization.ZkClientConfig;
-import com.metamx.druid.jackson.DefaultObjectMapper;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.joda.time.DateTime;
-
-/**
- */
-public class ZkUpdaterJob implements Jobby
-{
-  private static final Logger log = new Logger(ZkUpdaterJob.class);
-
-  private static final Joiner JOINER = Joiner.on("/");
-  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
-
-  private final HadoopDruidIndexerConfig config;
-  private final ZkUpdaterJobSpec spec;
-
-  public ZkUpdaterJob(
-      HadoopDruidIndexerConfig config
-  )
-  {
-    this.config = config;
-    this.spec = (ZkUpdaterJobSpec) config.getUpdaterJobSpec();
-  }
-
-  @Override
-  public boolean run()
-  {
-    if (!spec.postToZk()) {
-      return true;
-    }
-
-    Configuration conf = new Configuration();
-
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-
-    final Lifecycle lifecycle = new Lifecycle();
-    ZkClient zkClient = Initialization.makeZkClient(
-        new ZkClientConfig()
-        {
-          @Override
-          public String getZkHosts()
-          {
-            return spec.getZkQuorum();
-          }
-        },
-        lifecycle
-    );
-
-    try {
-      lifecycle.start();
-    }
-    catch (Exception e) {
-      log.error(e, "Exception on lifecycle start?");
-      lifecycle.stop();
-      return false;
-    }
-
-    try {
-      zkClient.waitUntilConnected();
-      final String dataSourceBasePath = JOINER.join(spec.getZkBasePath(), config.getDataSource());
-      if (!zkClient.exists(dataSourceBasePath)) {
-        zkClient.createPersistent(
-            dataSourceBasePath,
-            jsonMapper.writeValueAsString(
-                ImmutableMap.of(
-                    "created", new DateTime().toString()
-                )
-            )
-        );
-      }
-
-      for (Bucket bucket: config.getAllBuckets()) {
-        final Path path = new Path(config.makeSegmentOutputPath(bucket), "descriptor.json");
-        DataSegment segment = jsonMapper.readValue(
-            path.getFileSystem(conf).open(path),
-            DataSegment.class
-        );
-
-        String segmentPath = JOINER.join(dataSourceBasePath, segment.getIdentifier());
-        log.info("Adding index to list of indexes at zkPath[%s].", segmentPath);
-        zkClient.createPersistent(segmentPath, jsonMapper.writeValueAsString(segment));
-      }
-    }
-    catch (Exception e) {
-      log.error(e, "Exception when trying to update zk metadata.");
-      return false;
-    }
-    finally {
-      lifecycle.stop();
-    }
-
-    return true;
-  }
-}
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java b/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java
index 4818e4c99a9..3523a198ba6 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/updater/UpdaterJobSpec.java
@@ -19,8 +19,6 @@
 package com.metamx.druid.indexer.updater;
 
-import com.metamx.druid.indexer.DbUpdaterJob;
-import com.metamx.druid.indexer.ZkUpdaterJob;
 import org.codehaus.jackson.annotate.JsonSubTypes;
 import org.codehaus.jackson.annotate.JsonTypeInfo;
diff --git a/install/druid_setup.sh b/install/druid_setup.sh
index 7ea69f99d18..22dc2ad1b77 100755
--- a/install/druid_setup.sh
+++ b/install/druid_setup.sh
@@ -15,11 +15,11 @@
 fi
 
 DRUID_DIR=$(cd $(dirname $0)/.. ; pwd)
 
-DRUID_SERVER_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-server*selfcontained.jar') |head -1)"
-[ -z "${DRUID_SERVER_JAR}" ] && echo "unable to find druid server jar" && exit 2
-echo "using ${DRUID_SERVER_JAR}"
+DRUID_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-services*selfcontained.jar') |head -1)"
+[ -z "${DRUID_JAR}" ] && echo "unable to find druid server jar" && exit 2
+echo "using ${DRUID_JAR}"
 echo
 
-$RUN_JAVA -cp "${DRUID_SERVER_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $*
+$RUN_JAVA -cp "${DRUID_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $*
 
 [ -e ${DRUID_DIR}/install/druid_setup.log ] && egrep "WARN|ERROR|FATAL" ${DRUID_DIR}/install/druid_setup.log
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 0cd9aaaac43..732f3703c9e 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -192,22 +192,6 @@
             </plugin>
-            <plugin>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <outputFile>
-                                ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
-                            </outputFile>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
             <plugin>
                 <groupId>org.scala-tools</groupId>
                 <artifactId>maven-scala-plugin</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index eed7d78371c..6e88b69dbec 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -228,22 +228,6 @@
             </plugin>
-            <plugin>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <outputFile>
-                                ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
-                            </outputFile>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
             <plugin>
                 <artifactId>maven-jar-plugin</artifactId>
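
Note (illustrative, not part of the patch): the net effect of this change is that IndexGeneratorJob writes each segment's descriptor.json twice (once next to the segment in its output directory, and once into the intermediate segmentDescriptorInfo directory returned by HadoopDruidIndexerConfig.makeDescriptorInfoDir()), and DbUpdaterJob publishes segments by listing that single directory instead of walking every bucket with a retry loop. The sketch below shows that discovery step in isolation, using only the Hadoop FileSystem and Jackson APIs already visible in the diff; the DescriptorInfoReader class name is hypothetical.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.collect.ImmutableList;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;

// Illustrative only: lists the descriptor files written by IndexGeneratorJob into the
// intermediate "segmentDescriptorInfo" directory and deserializes each one, mirroring
// the discovery loop in the new DbUpdaterJob.run().
public class DescriptorInfoReader
{
  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();

  public static List<DataSegment> readDescriptors(Configuration conf, Path descriptorInfoDir) throws IOException
  {
    final ImmutableList.Builder<DataSegment> segments = ImmutableList.builder();
    final FileSystem fs = descriptorInfoDir.getFileSystem(conf);

    // Each file holds a single DataSegment serialized as JSON, named after the segment
    // identifier (with colons stripped) by HadoopDruidIndexerConfig.makeDescriptorInfoPath().
    for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
      segments.add(jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class));
    }
    return segments.build();
  }
}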