Cassandra Data Segment Pusher, initial commit.

This commit is contained in:
Brian O'Neill 2013-05-01 15:11:50 -04:00
parent fd4b14ece2
commit bf01399a2a
7 changed files with 448 additions and 1 deletions

View File

@ -75,7 +75,12 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>1.0.0-beta2</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>

View File

@ -0,0 +1,69 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.util.Map;
/**
*/
public class CassandraDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(CassandraDataSegmentKiller.class);
private final RestS3Service s3Client;
@Inject
public CassandraDataSegmentKiller(
RestS3Service s3Client
)
{
this.s3Client = s3Client;
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
try {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier());
}
}
}

View File

@ -0,0 +1,170 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import com.metamx.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.zip.GZIPInputStream;
/**
*/
public class CassandraDataSegmentPuller implements DataSegmentPuller
{
private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
private static final String BUCKET = "bucket";
private static final String KEY = "key";
private final RestS3Service s3Client;
@Inject
public CassandraDataSegmentPuller(
RestS3Service s3Client
)
{
this.s3Client = s3Client;
}
@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
{
S3Coords s3Coords = new S3Coords(segment);
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
if (!isObjectInBucket(s3Coords)) {
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("outDir[%s] must be a directory.", outDir);
}
long startTime = System.currentTimeMillis();
S3Object s3Obj = null;
try {
s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
InputStream in = null;
try {
in = s3Obj.getDataInputStream();
final String key = s3Obj.getKey();
if (key.endsWith(".zip")) {
CompressionUtils.unzip(in, outDir);
} else if (key.endsWith(".gz")) {
final File outFile = new File(outDir, toFilename(key, ".gz"));
ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
} else {
ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
}
log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
}
catch (IOException e) {
FileUtils.deleteDirectory(outDir);
throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
}
finally {
Closeables.closeQuietly(in);
}
}
catch (Exception e) {
throw new SegmentLoadingException(e, e.getMessage());
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
}
}
private String toFilename(String key, final String suffix)
{
String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
return filename;
}
private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
{
try {
return s3Client.isObjectInBucket(coords.bucket, coords.path);
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
S3Coords coords = new S3Coords(segment);
try {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
return objDetails.getLastModifiedDate().getTime();
}
catch (S3ServiceException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
private static class S3Coords
{
String bucket;
String path;
public S3Coords(DataSegment segment)
{
Map<String, Object> loadSpec = segment.getLoadSpec();
bucket = MapUtils.getString(loadSpec, BUCKET);
path = MapUtils.getString(loadSpec, KEY);
if (path.startsWith("/")) {
path = path.substring(1);
}
}
public String toString()
{
return String.format("s3://%s/%s", bucket, path);
}
}
}

View File

@ -0,0 +1,89 @@
package com.metamx.druid.loading;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
/**
*/
public class CassandraDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
private final CassandraDataSegmentPusherConfig config;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final ObjectMapper jsonMapper;
private Cluster cluster;
private Session session;
private String keyspace = null;
private String table = null;
public CassandraDataSegmentPusher(
CassandraDataSegmentPusherConfig config,
ObjectMapper jsonMapper)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.keyspace = this.config.getKeyspace();
this.table = this.config.getTable();
cluster = Cluster.builder().addContactPoints(this.config.getHost()).build();
session = cluster.connect();
session.execute("USE " + keyspace);
}
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
DataSegmentPusherUtil.getStorageDir(segment)
);
// Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip");
long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
int version = IndexIO.getVersionFromDir(indexFilesDir);
// Create descriptor
File descriptorFile = File.createTempFile("druid", "descriptor.json");
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
String statement = "INSERT INTO " + keyspace + "." + table + "(key, version, descriptor, index) VALUES (?,?,?,?)";
PreparedStatement ps = session.prepare(statement);
byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile));
byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile));
BoundStatement bs = ps.bind(key, version, descriptorData, indexData);
session.execute(bs);
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of("type", "c*", "key", key)
)
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
log.info("Deleting zipped index File[%s]", compressedIndexFile);
compressedIndexFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
return segment;
}
}

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import org.skife.config.Config;
/**
*/
public abstract class CassandraDataSegmentPusherConfig
{
@Config("druid.pusher.cassandra.host")
public abstract String getHost();
@Config("druid.pusher.cassandra.keyspace")
public abstract String getKeyspace();
@Config("druid.pusher.cassandra.table")
public abstract String getTable();
}

View File

@ -0,0 +1,78 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.metamx.common.MapUtils;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.Segment;
import org.joda.time.Interval;
import java.io.File;
import java.util.Map;
/**
*/
public class CassandraDataSegmentLoader implements SegmentLoader
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
}
@Override
public Segment getSegment(final DataSegment segment) throws SegmentLoadingException
{
return new Segment()
{
@Override
public String getIdentifier()
{
return segment.getIdentifier();
}
@Override
public Interval getDataInterval()
{
return segment.getInterval();
}
@Override
public QueryableIndex asQueryableIndex()
{
throw new UnsupportedOperationException();
}
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
}
};
}
@Override
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException
{
}
}