1) Have IndexGeneratorJob write the descriptors for each of the segments it creates to a path in the temporary working directory (generally HDFS)

2) Have the DbUpdaterJob read descriptors from the temporary working directory instead of looking in the final segment output location (often the eventually consistent S3)

3) Items 1 and 2 together fix #30; a minimal sketch of the resulting descriptor flow follows the commit metadata below.
Eric Tschetter 2012-11-20 15:30:50 -06:00
parent 9cbffa287f
commit 0f63cb4f00
13 changed files with 82 additions and 266 deletions
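
For readers skimming the diffs below, here is a minimal, self-contained sketch of the hand-off that 1) and 2) describe: the indexer job writes one descriptor file per segment into an intermediate HDFS directory, and the DB updater job later lists that directory and deserializes everything it finds. The class name DescriptorHandOffSketch, the Map payload, and the method signatures are illustrative assumptions for this sketch only; the actual commit uses DataSegment, HadoopDruidIndexerConfig.makeDescriptorInfoDir()/makeDescriptorInfoPath(), and the jobs' existing ObjectMapper, as shown in the hunks that follow.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; names here are not the ones used by the commit.
public class DescriptorHandOffSketch
{
  private static final ObjectMapper jsonMapper = new ObjectMapper();

  // Writer side (IndexGeneratorJob): persist one descriptor per segment under the
  // intermediate working directory (HDFS), not next to index.zip in the segment
  // output location (S3), so readers never race eventual consistency.
  public static void writeDescriptor(
      Configuration conf,
      Path descriptorInfoDir,
      String segmentId,
      Map<String, Object> descriptor
  ) throws IOException
  {
    final FileSystem fs = descriptorInfoDir.getFileSystem(conf);
    final Path descriptorPath = new Path(descriptorInfoDir, segmentId.replace(":", "") + ".json");
    fs.mkdirs(descriptorPath.getParent());
    if (fs.exists(descriptorPath)) {
      // overwrite a descriptor left behind by an earlier task attempt
      fs.delete(descriptorPath, false);
    }
    final FSDataOutputStream out = fs.create(descriptorPath);
    try {
      jsonMapper.writeValue(out, descriptor);
    }
    finally {
      out.close();
    }
  }

  // Reader side (DbUpdaterJob): list the intermediate directory and deserialize every
  // descriptor it finds, instead of probing per-bucket segment output paths.
  public static List<Map<String, Object>> readDescriptors(Configuration conf, Path descriptorInfoDir)
      throws IOException
  {
    final FileSystem fs = descriptorInfoDir.getFileSystem(conf);
    final List<Map<String, Object>> descriptors = new ArrayList<Map<String, Object>>();
    for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
      descriptors.add((Map<String, Object>) jsonMapper.readValue(fs.open(status.getPath()), Map.class));
    }
    return descriptors;
  }
}

With the descriptors living on HDFS, the DbUpdaterJob no longer needs the retry/sleep loop the old code used to ride out eventually consistent reads of descriptor.json from the segment output location, which is why that loop disappears in the DbUpdaterJob hunks below.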

View File

@@ -186,22 +186,6 @@
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>

View File

@@ -1,22 +0,0 @@
#!/bin/bash -e
PROJECT=druid
DIST_DIR=dist/tar
SCRIPT_DIR=`dirname $0`
pushd $SCRIPT_DIR
SCRIPT_DIR=`pwd`
popd
VERSION=`cat pom.xml | grep version | head -2 | tail -1 | sed 's_.*<version>\([^<]*\)</version>.*_\1_'`
TAR_FILE=${SCRIPT_DIR}/${PROJECT}-${VERSION}.tar.gz
${SCRIPT_DIR}/build.sh
if [ $? -ne "0" ]; then
echo "Build failed"
exit 2
fi
echo Deploying ${TAR_FILE}
s3cmd put ${TAR_FILE} s3://metamx-galaxy-bin/binaries/

View File

@@ -70,7 +70,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
<version>0.20.205-emr</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>

View File

@@ -19,12 +19,16 @@
package com.metamx.druid.indexer;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
@@ -32,9 +36,7 @@ import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.io.IOException;
import java.util.List;
/**
@@ -50,7 +52,7 @@ public class DbUpdaterJob implements Jobby
private final DBI dbi;
// Keep track of published segment identifiers, in case a client is interested.
private volatile List<DataSegment> publishedSegments = null;
private volatile ImmutableList<DataSegment> publishedSegments = null;
public DbUpdaterJob(
HadoopDruidIndexerConfig config
@@ -66,7 +68,7 @@ public class DbUpdaterJob implements Jobby
{
final Configuration conf = new Configuration();
List<DataSegment> newPublishedSegments = new LinkedList<DataSegment>();
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
@@ -74,16 +76,13 @@
}
}
final Iterator<Bucket> buckets = config.getAllBuckets().iterator();
Bucket bucket = buckets.next();
int numRetried = 0;
while (true) {
try {
final Path path = new Path(config.makeSegmentOutputPath(bucket), "descriptor.json");
final DataSegment segment = jsonMapper.readValue(
path.getFileSystem(conf).open(path),
DataSegment.class
);
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
try {
FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
dbi.withHandle(
new HandleCallback<Void>()
@@ -91,12 +90,11 @@ public class DbUpdaterJob implements Jobby
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
handle.createStatement(String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
spec.getSegmentTable()
)
)
))
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", new DateTime().toString())
@@ -113,37 +111,15 @@
}
);
newPublishedSegments.add(segment);
publishedSegmentsBuilder.add(segment);
log.info("Published %s", segment.getIdentifier());
}
catch (Exception e) {
if (numRetried < 5) {
log.error(e, "Retrying[%d] after exception when loading segment metadata into db", numRetried);
try {
Thread.sleep(15 * 1000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
return false;
}
++numRetried;
continue;
}
log.error(e, "Failing, retried too many times.");
return false;
}
if (buckets.hasNext()) {
bucket = buckets.next();
numRetried = 0;
} else {
break;
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
publishedSegments = newPublishedSegments;
publishedSegments = publishedSegmentsBuilder.build();
return true;
}
@@ -158,7 +134,7 @@ public class DbUpdaterJob implements Jobby
log.error("getPublishedSegments called before run!");
throw new IllegalStateException("DbUpdaterJob has not run yet");
} else {
return Collections.unmodifiableList(publishedSegments);
return publishedSegments;
}
}
}

View File

@@ -118,8 +118,7 @@ public class HadoopDruidIndexer
private static void printHelp()
{
System.out.println("Usage: <java invocation> <time_interval> <config_spec>");
System.out.println("<time_interval> is the ISO8601 interval of data to run over.");
System.out.println("Usage: <java invocation> <config_spec>");
System.out.println("<config_spec> is either a JSON object or the path to a file that contains a JSON object.");
System.out.println();
System.out.println("JSON object description:");

View File

@@ -34,6 +34,7 @@ import com.metamx.common.MapUtils;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.indexer.data.DataSpec;
import com.metamx.druid.indexer.data.ToLowercaseDataSpec;
@@ -59,6 +60,8 @@ import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
@@ -111,9 +114,7 @@ public class HadoopDruidIndexerConfig
RegisteringNode.registerHandlers(registererers, Arrays.asList(jsonMapper));
}
final HadoopDruidIndexerConfig retVal = jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
retVal.verify();
return retVal;
return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
}
@SuppressWarnings("unchecked")
@@ -152,7 +153,9 @@ public class HadoopDruidIndexerConfig
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{
return fromString(conf.get(CONFIG_PROPERTY));
final HadoopDruidIndexerConfig retVal = fromString(conf.get(CONFIG_PROPERTY));
retVal.verify();
return retVal;
}
private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
@@ -558,6 +561,16 @@ public class HadoopDruidIndexerConfig
);
}
public Path makeDescriptorInfoDir()
{
return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
}
public Path makeDescriptorInfoPath(DataSegment segment)
{
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
}
public Path makeSegmentOutputPath(Bucket bucket)
{
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();

View File

@@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexer;
import com.google.common.base.Preconditions;

View File

@@ -44,7 +44,6 @@ import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -419,6 +418,7 @@ public class IndexGeneratorJob implements Jobby
int attemptNumber = context.getTaskAttemptID().getId();
Path indexBasePath = config.makeSegmentOutputPath(bucket);
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
outputFS.mkdirs(indexBasePath);
@@ -478,7 +478,7 @@ public class IndexGeneratorJob implements Jobby
// retry 1 minute
boolean success = false;
for (int i = 0; i < 6; i++) {
if (renameIndexFiles(outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
success = true;
break;
@@ -508,7 +508,7 @@ public class IndexGeneratorJob implements Jobby
outputFS.delete(indexZipFilePath, true);
} else {
outputFS.delete(finalIndexZipFilePath, true);
if (!renameIndexFiles(outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
throw new ISE(
"Files [%s] and [%s] are different, but still cannot rename after retry loop",
indexZipFilePath.toUri().getPath(),
@@ -520,6 +520,7 @@ public class IndexGeneratorJob implements Jobby
}
private boolean renameIndexFiles(
FileSystem intermediateFS,
FileSystem outputFS,
Path indexBasePath,
Path indexZipFilePath,
@@ -565,7 +566,18 @@ public class IndexGeneratorJob implements Jobby
return false;
}
final Path descriptorPath = new Path(indexBasePath, "descriptor.json");
writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json"));
final Path descriptorPath = config.makeDescriptorInfoPath(segment);
log.info("Writing descriptor to path[%s]", descriptorPath);
intermediateFS.mkdirs(descriptorPath.getParent());
writeSegmentDescriptor(intermediateFS, segment, descriptorPath);
return true;
}
private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath)
throws IOException
{
if (outputFS.exists(descriptorPath)) {
outputFS.delete(descriptorPath, false);
}
@@ -577,7 +589,6 @@ public class IndexGeneratorJob implements Jobby
finally {
descriptorOut.close();
}
return true;
}
private long copyFile(

View File

@@ -1,130 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexer;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.updater.ZkUpdaterJobSpec;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
/**
*/
public class ZkUpdaterJob implements Jobby
{
private static final Logger log = new Logger(ZkUpdaterJob.class);
private static final Joiner JOINER = Joiner.on("/");
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final HadoopDruidIndexerConfig config;
private final ZkUpdaterJobSpec spec;
public ZkUpdaterJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
this.spec = (ZkUpdaterJobSpec) config.getUpdaterJobSpec();
}
@Override
public boolean run()
{
if (!spec.postToZk()) {
return true;
}
Configuration conf = new Configuration();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
final Lifecycle lifecycle = new Lifecycle();
ZkClient zkClient = Initialization.makeZkClient(
new ZkClientConfig()
{
@Override
public String getZkHosts()
{
return spec.getZkQuorum();
}
},
lifecycle
);
try {
lifecycle.start();
}
catch (Exception e) {
log.error(e, "Exception on lifecycle start?");
lifecycle.stop();
return false;
}
try {
zkClient.waitUntilConnected();
final String dataSourceBasePath = JOINER.join(spec.getZkBasePath(), config.getDataSource());
if (! zkClient.exists(dataSourceBasePath)) {
zkClient.createPersistent(
dataSourceBasePath,
jsonMapper.writeValueAsString(
ImmutableMap.of(
"created", new DateTime().toString()
)
)
);
}
for (Bucket bucket: config.getAllBuckets()) {
final Path path = new Path(config.makeSegmentOutputPath(bucket), "descriptor.json");
DataSegment segment = jsonMapper.readValue(
path.getFileSystem(conf).open(path),
DataSegment.class
);
String segmentPath = JOINER.join(dataSourceBasePath, segment.getIdentifier());
log.info("Adding index to list of indexes at zkPath[%s].", segmentPath);
zkClient.createPersistent(segmentPath, jsonMapper.writeValueAsString(segment));
}
}
catch (Exception e) {
log.error(e, "Exception when trying to update zk metadata.");
return false;
}
finally {
lifecycle.stop();
}
return true;
}
}

View File

@@ -19,8 +19,6 @@
package com.metamx.druid.indexer.updater;
import com.metamx.druid.indexer.DbUpdaterJob;
import com.metamx.druid.indexer.ZkUpdaterJob;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;

View File

@@ -15,11 +15,11 @@ fi
DRUID_DIR=$(cd $(dirname $0)/.. ; pwd)
DRUID_SERVER_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-server*selfcontained.jar') |head -1)"
[ -z "${DRUID_SERVER_JAR}" ] && echo "unable to find druid server jar" && exit 2
echo "using ${DRUID_SERVER_JAR}"
DRUID_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-services*selfcontained.jar') |head -1)"
[ -z "${DRUID_JAR}" ] && echo "unable to find druid server jar" && exit 2
echo "using ${DRUID_JAR}"
echo
$RUN_JAVA -cp "${DRUID_SERVER_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $*
$RUN_JAVA -cp "${DRUID_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $*
[ -e ${DRUID_DIR}/install/druid_setup.log ] && egrep "WARN|ERROR|FATAL" ${DRUID_DIR}/install/druid_setup.log

View File

@@ -192,22 +192,6 @@
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>

View File

@@ -228,22 +228,6 @@
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>